/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flume.appender;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.apache.logging.log4j.core.config.plugins.util.PluginType;
import org.apache.logging.log4j.core.util.FileUtils;
import org.apache.logging.log4j.core.util.SecretKeyProvider;
import org.apache.logging.log4j.util.Strings;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;

/**
 * Manager that persists data to Berkeley DB before passing it on to Flume.
 * <p>
 * Events handed to {@link #send(Event)} are serialized (and encrypted when a {@link SecretKey} is configured),
 * written to the database by a pool thread, and later read back, forwarded to Flume and deleted by a single
 * background {@code WriterThread}.
 * </p>
 */
public class FlumePersistentManager extends FlumeAvroManager {

    /** Attribute name for the key provider. */
    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Directory used when no data directory is configured. */
    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    /** Time, in seconds, to wait for the worker thread and the thread pool during shutdown. */
    private static final int SHUTDOWN_WAIT = 60;

    private static final int MILLIS_PER_SECOND = 1000;

    /** Pause, in milliseconds, between attempts of a database operation that is being retried. */
    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;

    /** Number of times {@link #send(Event)} waits for the database write when the wait is interrupted. */
    private static final int MAX_WRITE_WAIT_ATTEMPTS = 2;

    private static BDBManagerFactory factory = new BDBManagerFactory();

    private final Database database;

    private final Environment environment;

    private final WriterThread worker;

    private final Gate gate = new Gate();

    private final SecretKey secretKey;

    private final int lockTimeoutRetryCount;

    private final ExecutorService threadPool;

    private final AtomicLong dbCount = new AtomicLong();

    /**
     * Constructor
     * @param name The unique name of this manager.
     * @param shortName Original name for the Manager.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait for a connection to be established.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delay The amount of time to wait between retries.
     * @param database The database to write to.
     * @param environment The database environment.
     * @param secretKey The SecretKey to use for encryption.
     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     final Environment environment, final SecretKey secretKey,
                                     final int lockTimeoutRetryCount) {
        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
        this.database = database;
        this.environment = environment;
        this.secretKey = secretKey;
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        dbCount.set(database.count());
        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
            lockTimeoutRetryCount);
        // The worker holds a reference to this manager, so it is started only after every field is assigned.
        this.worker.start();
    }


    /**
     * Returns a FlumePersistentManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties to pass to the Manager.
     * @param batchSize The number of events to include in a batch. Values less than 1 are treated as 1.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait to establish a connection.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delayMillis Amount of time to delay before delivering a batch.
     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
     * @param dataDir The location of the Berkeley database. When null or empty a default directory is used.
     * @return A FlumePersistentManager.
     * @throws IllegalArgumentException if no agents are supplied.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
                                                    final Property[] properties, int batchSize, final int retries,
                                                    final int connectionTimeout, final int requestTimeout,
                                                    final int delayMillis, final int lockTimeoutRetryCount,
                                                    final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;

        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(',');
            }
            sb.append(agent.getHost()).append(':').append(agent.getPort());
            first = false;
        }
        sb.append(']');
        sb.append(' ').append(dataDirectory);
        // Pass the resolved directory (not the raw argument) so the default applies when none was configured.
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
    }

    /**
     * Serializes the event, encrypts it when a secret key is configured, and stores it in the database under
     * the event's GUID header. The database write runs on a pool thread; this method waits for it to finish.
     * @param event The event to persist.
     * @throws LoggingException if the worker thread has shut down or the event could not be written.
     */
    @Override
    public void send(final Event event) {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

        final Map<String, String> headers = event.getHeaders();
        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
        boolean interrupted = false;
        try {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final DataOutputStream daos = new DataOutputStream(baos);
            // Layout: body length, body bytes, header count, then each header as a key/value UTF pair.
            final byte[] body = event.getBody();
            daos.writeInt(body.length);
            daos.write(body, 0, body.length);
            daos.writeInt(headers.size());
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                final Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
            // Wait for the write. An interrupted wait is retried once; the loop always ends after a
            // successful get() or after the last attempt, so it can never spin.
            for (int attempt = 0; attempt < MAX_WRITE_WAIT_ATTEMPTS; ++attempt) {
                try {
                    future.get();
                    break;
                } catch (final InterruptedException ie) {
                    interrupted = true;
                }
            }
        } catch (final Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        } finally {
            if (interrupted) {
                // Re-assert the interrupt so the caller can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stops the worker thread and the thread pool, closes the database and its environment and then releases
     * the resources of the parent manager.
     */
    @Override
    protected void releaseSub() {
        boolean interrupted = false;
        LOGGER.debug("Shutting down FlumePersistentManager");
        worker.shutdown();
        try {
            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
        } catch (final InterruptedException ie) {
            // Continue shutting down; the interrupt is re-asserted once cleanup has finished.
            interrupted = true;
        }
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
        } catch (final InterruptedException ie) {
            interrupted = true;
            LOGGER.warn("PersistentManager Thread pool failed to shut down");
        }
        try {
            worker.join();
        } catch (final InterruptedException ex) {
            interrupted = true;
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close environment", ex);
        }
        try {
            super.releaseSub();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Sends a single event to Flume through the parent manager, bypassing persistence.
     * @param event The event to send.
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
     */
    private static class BDBWriter implements Callable<Integer> {
        private final byte[] eventData;
        private final byte[] keyData;
        private final Environment environment;
        private final Database database;
        private final Gate gate;
        private final AtomicLong dbCount;
        private final long batchSize;
        private final int lockTimeoutRetryCount;

        /**
         * Constructor.
         * @param keyData The bytes of the record key.
         * @param eventData The serialized event to store.
         * @param environment The database environment.
         * @param database The database to write to.
         * @param gate The gate to open once enough events are stored.
         * @param dbCount The running count of stored events.
         * @param batchSize The count at which the gate is opened.
         * @param lockTimeoutRetryCount The number of attempts made when a lock conflict occurs.
         */
        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
                         final int lockTimeoutRetryCount) {
            this.keyData = keyData;
            this.eventData = eventData;
            this.environment = environment;
            this.database = database;
            this.gate = gate;
            this.dbCount = dbCount;
            this.batchSize = batchSize;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Stores the record in its own transaction, retrying when a lock conflict occurs.
         * @return The number of bytes of event data.
         * @throws Exception the last lock conflict if every attempt failed, or any other database error.
         */
        @Override
        public Integer call() throws Exception {
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            Exception exception = null;
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    txn = environment.beginTransaction(null, null);
                    try {
                        database.put(txn, key, data);
                        txn.commit();
                        txn = null;
                        if (dbCount.incrementAndGet() >= batchSize) {
                            gate.open();
                        }
                        exception = null;
                        break;
                    } catch (final LockConflictException lce) {
                        exception = lce;
                        // Fall through and retry.
                    } finally {
                        // Any failure (lock conflict or otherwise) leaves txn non-null: abort it exactly once.
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                    }
                } catch (final LockConflictException lce) {
                    exception = lce;
                    if (txn != null) {
                        try {
                            txn.abort();
                            txn = null;
                        } catch (final Exception ex) {
                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
                        }
                    }

                }
                try {
                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                } catch (final InterruptedException ie) {
                    // Ignore the error
                }
            }
            if (exception != null) {
                throw exception;
            }
            return eventData.length;
        }
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delayMillis;
        private final int lockTimeoutRetryCount;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectionTimeout The amount of time to wait to establish a connection.
         * @param requestTimeout The amount of time to wait for a response to a request.
         * @param delayMillis Amount of time to delay before delivering a batch.
         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
         * @param dataDir The directory for data.
         * @param properties Properties to pass to the Manager.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delayMillis = delayMillis;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
            this.properties = properties;
        }
    }

    /**
     * Factory that opens the Berkeley DB environment and database and creates the manager.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumePersistentManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumePersistentManager, or null if the database could not be opened.
         */
        @Override
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            SecretKey secretKey = null;
            Database database = null;
            Environment environment = null;

            final Map<String, String> properties = new HashMap<>();
            if (data.properties != null) {
                for (final Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

            try {
                final File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(true);
                dbEnvConfig.setAllowCreate(true);
                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
                environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                // For consistency, close database as well as environment even though it should never happen since the
                // database is that last thing in the block above, but this does guard against a future line being
                // inserted at the end that would bomb (like some debug logging).
                if (database != null) {
                    database.close();
                    database = null;
                }
                if (environment != null) {
                    environment.close();
                    environment = null;
                }
                return null;
            }

            try {
                // The key provider attribute name is matched case-insensitively.
                String key = null;
                for (final Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                        break;
                    }
                }
                if (key != null) {
                    final PluginManager manager = new PluginManager("KeyProvider");
                    manager.collectPlugins();
                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
                            if (entry.getKey().equalsIgnoreCase(key)) {
                                found = true;
                                final Class<?> cl = entry.getValue().getPluginClass();
                                try {
                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                    secretKey = provider.getSecretKey();
                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                                } catch (final Exception ex) {
                                    // Include the cause so the reason encryption was disabled can be diagnosed.
                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName(), ex);
                                }
                                break;
                            }
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
                data.lockTimeoutRetryCount);
        }
    }

    /**
     * Thread that sends data to Flume and pulls it from Berkeley DB.
     */
    private static class WriterThread extends Thread {
        private volatile boolean shutdown = false;
        private final Database database;
        private final Environment environment;
        private final FlumePersistentManager manager;
        private final Gate gate;
        private final SecretKey secretKey;
        private final int batchSize;
        private final AtomicLong dbCounter;
        private final int lockTimeoutRetryCount;

        /**
         * Constructor. The thread is created as a daemon thread and is not started here.
         * @param database The database to read from.
         * @param environment The database environment.
         * @param manager The manager used to deliver events to Flume.
         * @param gate The gate this thread waits on for work.
         * @param batchsize The number of events to include in a batch.
         * @param secretKey The SecretKey used for decryption, or null when events are not encrypted.
         * @param dbCount The running count of stored events.
         * @param lockTimeoutRetryCount The number of attempts made when a lock conflict occurs.
         */
        public WriterThread(final Database database, final Environment environment,
                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.gate = gate;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
            this.dbCounter = dbCount;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Requests that the thread stop and wakes it if it is waiting on the gate.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }

        /**
         * Indicates whether shutdown was requested or the thread stopped itself after a database error.
         * @return true if the thread is shutting down or has shut down.
         */
        public boolean isShutdown() {
            return shutdown;
        }

        /**
         * Main loop: whenever a full batch is stored, or the delay has elapsed with at least one stored event,
         * the events are sent to Flume and deleted; otherwise the thread waits on the gate.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
            while (!shutdown) {
                final long nowMillis = System.currentTimeMillis();
                // Resynchronize the shared counter with the actual number of stored records.
                final long dbCount = database.count();
                dbCounter.set(dbCount);
                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                    try {
                        boolean errors = false;
                        final DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                // sendBatch has already logged the failure and set the shutdown flag.
                                break;
                            }
                        } else {
                            // Batch size of one: send and delete each record individually in one transaction.
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        txn = null;
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (final LockConflictException lce) {
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (final InterruptedException ie) {
                                    // Ignore the error
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update data base", exception);
                            }
                        }
                        if (errors) {
                            // Delivery failed: back off for one delay period before trying again.
                            Thread.sleep(manager.getDelayMillis());
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    if (nextBatchMillis <= nowMillis) {
                        nextBatchMillis = nowMillis + manager.getDelayMillis();
                    }
                    try {
                        final long interval = nextBatchMillis - nowMillis;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            // On the way out, make one attempt to deliver whatever is still stored.
            if (batchSize > 1 && database.count() > 0) {
                final DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

        /**
         * Reads up to {@code batchSize} events from the database, sends them to Flume as one batch and, when
         * the send succeeds, deletes them in a single transaction.
         * @param key Entry used to receive record keys while reading.
         * @param data Entry used to receive record data while reading.
         * @return true if sending the batch to Flume failed, false otherwise.
         * @throws Exception if the database could not be read; the shutdown flag is set before rethrowing.
         */
        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
            boolean errors = false;
            OperationStatus status;
            Cursor cursor = null;
            try {
                BatchEvent batch = new BatchEvent();
                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                    // Each attempt re-reads from the first record, so start from an empty batch; otherwise
                    // events collected before a lock conflict would be added (and sent) a second time.
                    batch = new BatchEvent();
                    try {
                        cursor = database.openCursor(null, CursorConfig.DEFAULT);
                        status = cursor.getFirst(key, data, null);

                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                            final SimpleEvent event = createEvent(data);
                            if (event != null) {
                                batch.addEvent(event);
                            }
                            status = cursor.getNext(key, data, null);
                        }
                        break;
                    } catch (final LockConflictException lce) {
                        if (cursor != null) {
                            try {
                                cursor.close();
                                cursor = null;
                            } catch (final Exception ex) {
                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                            }
                        }
                    }
                }

                try {
                    manager.send(batch);
                } catch (final Exception ioe) {
                    LOGGER.error("Error sending events", ioe);
                    errors = true;
                }
                if (!errors) {
                    if (cursor != null) {
                        cursor.close();
                        cursor = null;
                    }
                    Transaction txn = null;
                    Exception exception = null;
                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                        try {
                            txn = environment.beginTransaction(null, null);
                            try {
                                for (final Event event : batch.getEvents()) {
                                    try {
                                        final Map<String, String> headers = event.getHeaders();
                                        final DatabaseEntry deleteKey =
                                            new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                        database.delete(txn, deleteKey);
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error deleting key from database", ex);
                                    }
                                }
                                txn.commit();
                                // Committed: clear the handle so the finally block does not abort it.
                                txn = null;
                                dbCounter.addAndGet(-batch.getEvents().size());
                                exception = null;
                                break;
                            } catch (final LockConflictException lce) {
                                exception = lce;
                                if (cursor != null) {
                                    try {
                                        cursor.close();
                                        cursor = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                    }
                                }
                                if (txn != null) {
                                    try {
                                        txn.abort();
                                        txn = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                    }
                                }
                            } catch (final Exception ex) {
                                LOGGER.error("Unable to commit transaction", ex);
                                if (txn != null) {
                                    txn.abort();
                                    txn = null;
                                }
                            }
                        } catch (final LockConflictException lce) {
                            exception = lce;
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                    cursor = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                }
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                    txn = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                }
                            }
                        } finally {
                            if (cursor != null) {
                                cursor.close();
                                cursor = null;
                            }
                            if (txn != null) {
                                txn.abort();
                                txn = null;
                            }
                        }
                        try {
                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                        } catch (final InterruptedException ie) {
                            // Ignore the error
                        }
                    }
                    if (exception != null) {
                        LOGGER.error("Unable to delete events from data base", exception);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.error("Error reading database", ex);
                shutdown = true;
                throw ex;
            } finally {
                if (cursor != null) {
                    cursor.close();
                }
            }

            return errors;
        }

        /**
         * Rebuilds an event from a stored record, decrypting it first when a secret key is configured.
         * @param data The database entry holding the serialized event.
         * @return The event, or null if the record could not be decrypted or parsed.
         */
        private SimpleEvent createEvent(final DatabaseEntry data) {
            final SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    final Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                final DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                final byte[] bytes = new byte[length];
                // readFully fails on a truncated record instead of silently leaving part of the body unfilled.
                dais.readFully(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                final Map<String, String> map = new HashMap<>(length);
                for (int i = 0; i < length; ++i) {
                    final String headerKey = dais.readUTF();
                    final String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (final Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }

    /**
     * Factory that creates Daemon threads that can be properly shut down.
     */
    private static class DaemonThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * Constructor. Uses the security manager's thread group when a security manager is installed and
         * the current thread's group otherwise.
         */
        public DaemonThreadFactory() {
            final SecurityManager securityManager = System.getSecurityManager();
            group = securityManager != null ? securityManager.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        /**
         * Creates a daemon thread with normal priority.
         * @param r The task the thread will run.
         * @return The new, unstarted thread.
         */
        @Override
        public Thread newThread(final Runnable r) {
            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            thread.setDaemon(true);
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }

    /**
     * Simple monitor the writer thread waits on until it is notified that work is available or a timeout
     * elapses.
     */
    private static class Gate {

        private boolean isOpen = false;

        /**
         * Indicates whether the gate is currently open.
         * @return true if the gate is open.
         */
        public synchronized boolean isOpen() {
            return isOpen;
        }

        /**
         * Marks the gate open and wakes every thread waiting on it.
         */
        public synchronized void open() {
            isOpen = true;
            notifyAll();
        }

        /**
         * Marks the gate closed.
         */
        public synchronized void close() {
            isOpen = false;
        }

        /**
         * Waits until notified by {@link #open()} or until the timeout elapses. The wait is entered
         * regardless of the current state of the gate.
         * @param timeout The maximum time to wait, in milliseconds.
         * @throws InterruptedException if the waiting thread is interrupted.
         */
        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
            wait(timeout);
        }
    }
}