001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.io.ByteArrayInputStream; 020import java.io.ByteArrayOutputStream; 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.File; 024import java.nio.charset.Charset; 025import java.util.HashMap; 026import java.util.Map; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.Future; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import javax.crypto.Cipher; 036import javax.crypto.SecretKey; 037 038import org.apache.flume.Event; 039import org.apache.flume.event.SimpleEvent; 040import org.apache.logging.log4j.LoggingException; 041import org.apache.logging.log4j.core.appender.ManagerFactory; 042import org.apache.logging.log4j.core.config.Property; 043import org.apache.logging.log4j.core.config.plugins.PluginManager; 044import org.apache.logging.log4j.core.config.plugins.PluginType; 045import org.apache.logging.log4j.core.helpers.FileUtils; 046import org.apache.logging.log4j.core.helpers.SecretKeyProvider; 047import org.apache.logging.log4j.core.helpers.Strings; 048 049import com.sleepycat.je.Cursor; 050import com.sleepycat.je.CursorConfig; 051import com.sleepycat.je.Database; 052import com.sleepycat.je.DatabaseConfig; 053import com.sleepycat.je.DatabaseEntry; 054import com.sleepycat.je.Environment; 055import com.sleepycat.je.EnvironmentConfig; 056import com.sleepycat.je.LockConflictException; 057import com.sleepycat.je.LockMode; 058import com.sleepycat.je.OperationStatus; 059import com.sleepycat.je.StatsConfig; 060import com.sleepycat.je.Transaction; 061 062/** 063 * Manager that persists data to Berkeley DB before passing it on to Flume. 064 */ 065public class FlumePersistentManager extends FlumeAvroManager { 066 067 /** Attribute name for the key provider. */ 068 public static final String KEY_PROVIDER = "keyProvider"; 069 070 private static final Charset UTF8 = Charset.forName("UTF-8"); 071 072 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData"; 073 074 private static final int SHUTDOWN_WAIT = 60; 075 076 private static final int MILLIS_PER_SECOND = 1000; 077 078 private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500; 079 080 private static BDBManagerFactory factory = new BDBManagerFactory(); 081 082 private final Database database; 083 084 private final Environment environment; 085 086 private final WriterThread worker; 087 088 private final Gate gate = new Gate(); 089 090 private final SecretKey secretKey; 091 092 private final int delay; 093 094 private final int lockTimeoutRetryCount; 095 096 private final ExecutorService threadPool; 097 098 private AtomicLong dbCount = new AtomicLong(); 099 100 /** 101 * Constructor 102 * @param name The unique name of this manager. 103 * @param shortName Original name for the Manager. 104 * @param agents An array of Agents. 105 * @param batchSize The number of events to include in a batch. 106 * @param retries The number of times to retry connecting before giving up. 107 * @param connectionTimeout The amount of time to wait for a connection to be established. 108 * @param requestTimeout The amount of time to wair for a response to a request. 109 * @param delay The amount of time to wait between retries. 110 * @param database The database to write to. 111 * @param environment The database environment. 112 * @param secretKey The SecretKey to use for encryption. 113 * @param lockTimeoutRetryCount The number of times to retry a lock timeout. 114 */ 115 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents, 116 final int batchSize, final int retries, final int connectionTimeout, 117 final int requestTimeout, final int delay, final Database database, 118 final Environment environment, final SecretKey secretKey, 119 final int lockTimeoutRetryCount) { 120 super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout); 121 this.delay = delay; 122 this.database = database; 123 this.environment = environment; 124 dbCount.set(database.count()); 125 this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount, 126 lockTimeoutRetryCount); 127 this.worker.start(); 128 this.secretKey = secretKey; 129 this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory()); 130 this.lockTimeoutRetryCount = lockTimeoutRetryCount; 131 } 132 133 134 /** 135 * Returns a FlumeAvroManager. 136 * @param name The name of the manager. 137 * @param agents The agents to use. 138 * @param properties Properties to pass to the Manager. 139 * @param batchSize The number of events to include in a batch. 140 * @param retries The number of times to retry connecting before giving up. 141 * @param connectionTimeout The amount of time to wait to establish a connection. 142 * @param requestTimeout The amount of time to wait for a response to a request. 143 * @param delay Amount of time to delay before delivering a batch. 144 * @param lockTimeoutRetryCount The number of times to retry after a lock timeout. 145 * @param dataDir The location of the Berkeley database. 146 * @return A FlumeAvroManager. 147 */ 148 public static FlumePersistentManager getManager(final String name, final Agent[] agents, 149 final Property[] properties, int batchSize, final int retries, 150 final int connectionTimeout, final int requestTimeout, 151 final int delay, final int lockTimeoutRetryCount, 152 final String dataDir) { 153 if (agents == null || agents.length == 0) { 154 throw new IllegalArgumentException("At least one agent is required"); 155 } 156 157 if (batchSize <= 0) { 158 batchSize = 1; 159 } 160 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir; 161 162 final StringBuilder sb = new StringBuilder("FlumePersistent["); 163 boolean first = true; 164 for (final Agent agent : agents) { 165 if (!first) { 166 sb.append(","); 167 } 168 sb.append(agent.getHost()).append(":").append(agent.getPort()); 169 first = false; 170 } 171 sb.append("]"); 172 sb.append(" ").append(dataDirectory); 173 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, 174 connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties)); 175 } 176 177 @Override 178 public void send(final Event event) { 179 if (worker.isShutdown()) { 180 throw new LoggingException("Unable to record event"); 181 } 182 183 final Map<String, String> headers = event.getHeaders(); 184 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8); 185 try { 186 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 187 final DataOutputStream daos = new DataOutputStream(baos); 188 daos.writeInt(event.getBody().length); 189 daos.write(event.getBody(), 0, event.getBody().length); 190 daos.writeInt(event.getHeaders().size()); 191 for (final Map.Entry<String, String> entry : headers.entrySet()) { 192 daos.writeUTF(entry.getKey()); 193 daos.writeUTF(entry.getValue()); 194 } 195 byte[] eventData = baos.toByteArray(); 196 if (secretKey != null) { 197 final Cipher cipher = Cipher.getInstance("AES"); 198 cipher.init(Cipher.ENCRYPT_MODE, secretKey); 199 eventData = cipher.doFinal(eventData); 200 } 201 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database, 202 gate, dbCount, getBatchSize(), lockTimeoutRetryCount)); 203 boolean interrupted = false; 204 int count = 0; 205 do { 206 try { 207 future.get(); 208 } catch (final InterruptedException ie) { 209 interrupted = true; 210 ++count; 211 } 212 } while (interrupted && count <= 1); 213 214 } catch (final Exception ex) { 215 throw new LoggingException("Exception occurred writing log event", ex); 216 } 217 } 218 219 @Override 220 protected void releaseSub() { 221 LOGGER.debug("Shutting down FlumePersistentManager"); 222 worker.shutdown(); 223 try { 224 worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND); 225 } catch (InterruptedException ie) { 226 // Ignore the exception and shutdown. 227 } 228 threadPool.shutdown(); 229 try { 230 threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS); 231 } catch (final InterruptedException ie) { 232 LOGGER.warn("PersistentManager Thread pool failed to shut down"); 233 } 234 try { 235 worker.join(); 236 } catch (final InterruptedException ex) { 237 LOGGER.debug("Interrupted while waiting for worker to complete"); 238 } 239 try { 240 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig())); 241 database.close(); 242 } catch (final Exception ex) { 243 LOGGER.warn("Failed to close database", ex); 244 } 245 try { 246 environment.cleanLog(); 247 environment.close(); 248 } catch (final Exception ex) { 249 LOGGER.warn("Failed to close environment", ex); 250 } 251 super.releaseSub(); 252 } 253 254 private void doSend(final SimpleEvent event) { 255 LOGGER.debug("Sending event to Flume"); 256 super.send(event); 257 } 258 259 /** 260 * Thread for writing to Berkeley DB to avoid having interrupts close the database. 261 */ 262 private static class BDBWriter implements Callable<Integer> { 263 private final byte[] eventData; 264 private final byte[] keyData; 265 private final Environment environment; 266 private final Database database; 267 private final Gate gate; 268 private final AtomicLong dbCount; 269 private final long batchSize; 270 private final int lockTimeoutRetryCount; 271 272 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment, 273 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize, 274 final int lockTimeoutRetryCount) { 275 this.keyData = keyData; 276 this.eventData = eventData; 277 this.environment = environment; 278 this.database = database; 279 this.gate = gate; 280 this.dbCount = dbCount; 281 this.batchSize = batchSize; 282 this.lockTimeoutRetryCount = lockTimeoutRetryCount; 283 } 284 285 @Override 286 public Integer call() throws Exception { 287 final DatabaseEntry key = new DatabaseEntry(keyData); 288 final DatabaseEntry data = new DatabaseEntry(eventData); 289 Exception exception = null; 290 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { 291 Transaction txn = null; 292 try { 293 txn = environment.beginTransaction(null, null); 294 try { 295 database.put(txn, key, data); 296 txn.commit(); 297 txn = null; 298 if (dbCount.incrementAndGet() >= batchSize) { 299 gate.open(); 300 } 301 exception = null; 302 break; 303 } catch (final LockConflictException lce) { 304 exception = lce; 305 // Fall through and retry. 306 } catch (final Exception ex) { 307 if (txn != null) { 308 txn.abort(); 309 } 310 throw ex; 311 } finally { 312 if (txn != null) { 313 txn.abort(); 314 txn = null; 315 } 316 } 317 } catch (LockConflictException lce) { 318 exception = lce; 319 if (txn != null) { 320 try { 321 txn.abort(); 322 txn = null; 323 } catch (Exception ex) { 324 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict."); 325 } 326 } 327 328 } 329 try { 330 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); 331 } catch (InterruptedException ie) { 332 // Ignore the error 333 } 334 } 335 if (exception != null) { 336 throw exception; 337 } 338 return eventData.length; 339 } 340 } 341 342 /** 343 * Factory data. 344 */ 345 private static class FactoryData { 346 private final String name; 347 private final Agent[] agents; 348 private final int batchSize; 349 private final String dataDir; 350 private final int retries; 351 private final int connectionTimeout; 352 private final int requestTimeout; 353 private final int delay; 354 private final int lockTimeoutRetryCount; 355 private final Property[] properties; 356 357 /** 358 * Constructor. 359 * @param name The name of the Appender. 360 * @param agents The agents. 361 * @param batchSize The number of events to include in a batch. 362 * @param dataDir The directory for data. 363 */ 364 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, 365 final int connectionTimeout, final int requestTimeout, final int delay, 366 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) { 367 this.name = name; 368 this.agents = agents; 369 this.batchSize = batchSize; 370 this.dataDir = dataDir; 371 this.retries = retries; 372 this.connectionTimeout = connectionTimeout; 373 this.requestTimeout = requestTimeout; 374 this.delay = delay; 375 this.lockTimeoutRetryCount = lockTimeoutRetryCount; 376 this.properties = properties; 377 } 378 } 379 380 /** 381 * Avro Manager Factory. 382 */ 383 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> { 384 385 /** 386 * Create the FlumeKratiManager. 387 * @param name The name of the entity to manage. 388 * @param data The data required to create the entity. 389 * @return The FlumeKratiManager. 390 */ 391 @Override 392 public FlumePersistentManager createManager(final String name, final FactoryData data) { 393 SecretKey secretKey = null; 394 Database database = null; 395 Environment environment = null; 396 397 final Map<String, String> properties = new HashMap<String, String>(); 398 if (data.properties != null) { 399 for (final Property property : data.properties) { 400 properties.put(property.getName(), property.getValue()); 401 } 402 } 403 404 try { 405 final File dir = new File(data.dataDir); 406 FileUtils.mkdir(dir, true); 407 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig(); 408 dbEnvConfig.setTransactional(true); 409 dbEnvConfig.setAllowCreate(true); 410 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS); 411 environment = new Environment(dir, dbEnvConfig); 412 final DatabaseConfig dbConfig = new DatabaseConfig(); 413 dbConfig.setTransactional(true); 414 dbConfig.setAllowCreate(true); 415 database = environment.openDatabase(null, name, dbConfig); 416 } catch (final Exception ex) { 417 LOGGER.error("Could not create FlumePersistentManager", ex); 418 // For consistency, close database as well as environment even though it should never happen since the 419 // database is that last thing in the block above, but this does guard against a future line being 420 // inserted at the end that would bomb (like some debug logging). 421 if (database != null) { 422 database.close(); 423 database = null; 424 } 425 if (environment != null) { 426 environment.close(); 427 environment = null; 428 } 429 return null; 430 } 431 432 try { 433 String key = null; 434 for (final Map.Entry<String, String> entry : properties.entrySet()) { 435 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) { 436 key = entry.getValue(); 437 break; 438 } 439 } 440 if (key != null) { 441 final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class); 442 manager.collectPlugins(); 443 final Map<String, PluginType<?>> plugins = manager.getPlugins(); 444 if (plugins != null) { 445 boolean found = false; 446 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) { 447 if (entry.getKey().equalsIgnoreCase(key)) { 448 found = true; 449 final Class<?> cl = entry.getValue().getPluginClass(); 450 try { 451 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance(); 452 secretKey = provider.getSecretKey(); 453 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName()); 454 } catch (final Exception ex) { 455 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled", 456 cl.getName()); 457 } 458 break; 459 } 460 } 461 if (!found) { 462 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); 463 } 464 } else { 465 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); 466 } 467 } 468 } catch (final Exception ex) { 469 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex); 470 } 471 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, 472 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey, 473 data.lockTimeoutRetryCount); 474 } 475 } 476 477 /** 478 * Thread that sends data to Flume and pulls it from Berkeley DB. 479 */ 480 private static class WriterThread extends Thread { 481 private volatile boolean shutdown = false; 482 private final Database database; 483 private final Environment environment; 484 private final FlumePersistentManager manager; 485 private final Gate gate; 486 private final SecretKey secretKey; 487 private final int batchSize; 488 private final AtomicLong dbCounter; 489 private final int lockTimeoutRetryCount; 490 491 public WriterThread(final Database database, final Environment environment, 492 final FlumePersistentManager manager, final Gate gate, final int batchsize, 493 final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) { 494 this.database = database; 495 this.environment = environment; 496 this.manager = manager; 497 this.gate = gate; 498 this.batchSize = batchsize; 499 this.secretKey = secretKey; 500 this.setDaemon(true); 501 this.dbCounter = dbCount; 502 this.lockTimeoutRetryCount = lockTimeoutRetryCount; 503 } 504 505 public void shutdown() { 506 LOGGER.debug("Writer thread shutting down"); 507 this.shutdown = true; 508 gate.open(); 509 } 510 511 public boolean isShutdown() { 512 return shutdown; 513 } 514 515 @Override 516 public void run() { 517 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay); 518 long nextBatch = System.currentTimeMillis() + manager.delay; 519 while (!shutdown) { 520 long now = System.currentTimeMillis(); 521 long dbCount = database.count(); 522 dbCounter.set(dbCount); 523 if (dbCount >= batchSize || dbCount > 0 && nextBatch <= now) { 524 nextBatch = now + manager.delay; 525 try { 526 boolean errors = false; 527 DatabaseEntry key = new DatabaseEntry(); 528 final DatabaseEntry data = new DatabaseEntry(); 529 530 gate.close(); 531 OperationStatus status; 532 if (batchSize > 1) { 533 try { 534 errors = sendBatch(key, data); 535 } catch (final Exception ex) { 536 break; 537 } 538 } else { 539 Exception exception = null; 540 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { 541 exception = null; 542 Transaction txn = null; 543 Cursor cursor = null; 544 try { 545 txn = environment.beginTransaction(null, null); 546 cursor = database.openCursor(txn, null); 547 try { 548 status = cursor.getFirst(key, data, LockMode.RMW); 549 while (status == OperationStatus.SUCCESS) { 550 final SimpleEvent event = createEvent(data); 551 if (event != null) { 552 try { 553 manager.doSend(event); 554 } catch (final Exception ioe) { 555 errors = true; 556 LOGGER.error("Error sending event", ioe); 557 break; 558 } 559 try { 560 cursor.delete(); 561 } catch (final Exception ex) { 562 LOGGER.error("Unable to delete event", ex); 563 } 564 } 565 status = cursor.getNext(key, data, LockMode.RMW); 566 } 567 if (cursor != null) { 568 cursor.close(); 569 cursor = null; 570 } 571 txn.commit(); 572 txn = null; 573 dbCounter.decrementAndGet(); 574 exception = null; 575 break; 576 } catch (final LockConflictException lce) { 577 exception = lce; 578 // Fall through and retry. 579 } catch (final Exception ex) { 580 LOGGER.error("Error reading or writing to database", ex); 581 shutdown = true; 582 break; 583 } finally { 584 if (cursor != null) { 585 cursor.close(); 586 cursor = null; 587 } 588 if (txn != null) { 589 txn.abort(); 590 txn = null; 591 } 592 } 593 } catch (LockConflictException lce) { 594 exception = lce; 595 if (cursor != null) { 596 try { 597 cursor.close(); 598 cursor = null; 599 } catch (Exception ex) { 600 LOGGER.trace("Ignored exception closing cursor during lock conflict."); 601 } 602 } 603 if (txn != null) { 604 try { 605 txn.abort(); 606 txn = null; 607 } catch (Exception ex) { 608 LOGGER.trace("Ignored exception aborting tx during lock conflict."); 609 } 610 } 611 } 612 try { 613 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); 614 } catch (InterruptedException ie) { 615 // Ignore the error 616 } 617 } 618 if (exception != null) { 619 LOGGER.error("Unable to read or update data base", exception); 620 } 621 } 622 if (errors) { 623 Thread.sleep(manager.delay); 624 continue; 625 } 626 } catch (final Exception ex) { 627 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex); 628 } 629 } else { 630 if (nextBatch <= now) { 631 nextBatch = now + manager.delay; 632 } 633 try { 634 final long interval = nextBatch - now; 635 gate.waitForOpen(interval); 636 } catch (final InterruptedException ie) { 637 LOGGER.warn("WriterThread interrupted, continuing"); 638 } catch (final Exception ex) { 639 LOGGER.error("WriterThread encountered an exception waiting for work", ex); 640 break; 641 } 642 } 643 } 644 645 if (batchSize > 1 && database.count() > 0) { 646 DatabaseEntry key = new DatabaseEntry(); 647 final DatabaseEntry data = new DatabaseEntry(); 648 try { 649 sendBatch(key, data); 650 } catch (final Exception ex) { 651 LOGGER.warn("Unable to write final batch"); 652 } 653 } 654 LOGGER.trace("WriterThread exiting"); 655 } 656 657 private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception { 658 boolean errors = false; 659 OperationStatus status; 660 Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT); 661 try { 662 status = cursor.getFirst(key, data, null); 663 664 final BatchEvent batch = new BatchEvent(); 665 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) { 666 final SimpleEvent event = createEvent(data); 667 if (event != null) { 668 batch.addEvent(event); 669 } 670 status = cursor.getNext(key, data, null); 671 } 672 try { 673 manager.send(batch); 674 } catch (final Exception ioe) { 675 LOGGER.error("Error sending events", ioe); 676 errors = true; 677 } 678 if (!errors) { 679 cursor.close(); 680 cursor = null; 681 Transaction txn = null; 682 Exception exception = null; 683 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { 684 try { 685 txn = environment.beginTransaction(null, null); 686 try { 687 for (final Event event : batch.getEvents()) { 688 try { 689 final Map<String, String> headers = event.getHeaders(); 690 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8)); 691 database.delete(txn, key); 692 } catch (final Exception ex) { 693 LOGGER.error("Error deleting key from database", ex); 694 } 695 } 696 txn.commit(); 697 long count = dbCounter.get(); 698 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) { 699 count = dbCounter.get(); 700 } 701 exception = null; 702 break; 703 } catch (final LockConflictException lce) { 704 exception = lce; 705 if (cursor != null) { 706 try { 707 cursor.close(); 708 cursor = null; 709 } catch (Exception ex) { 710 LOGGER.trace("Ignored exception closing cursor during lock conflict."); 711 } 712 } 713 if (txn != null) { 714 try { 715 txn.abort(); 716 txn = null; 717 } catch (Exception ex) { 718 LOGGER.trace("Ignored exception aborting transaction during lock conflict."); 719 } 720 } 721 } catch (final Exception ex) { 722 LOGGER.error("Unable to commit transaction", ex); 723 if (txn != null) { 724 txn.abort(); 725 } 726 } 727 } catch (LockConflictException lce) { 728 exception = lce; 729 if (cursor != null) { 730 try { 731 cursor.close(); 732 cursor = null; 733 } catch (Exception ex) { 734 LOGGER.trace("Ignored exception closing cursor during lock conflict."); 735 } 736 } 737 if (txn != null) { 738 try { 739 txn.abort(); 740 txn = null; 741 } catch (Exception ex) { 742 LOGGER.trace("Ignored exception aborting transaction during lock conflict."); 743 } 744 } 745 } finally { 746 if (cursor != null) { 747 cursor.close(); 748 cursor = null; 749 } 750 if (txn != null) { 751 txn.abort(); 752 txn = null; 753 } 754 } 755 try { 756 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); 757 } catch (InterruptedException ie) { 758 // Ignore the error 759 } 760 } 761 if (exception != null) { 762 LOGGER.error("Unable to delete events from data base", exception); 763 } 764 } 765 } catch (final Exception ex) { 766 LOGGER.error("Error reading database", ex); 767 shutdown = true; 768 throw ex; 769 } finally { 770 if (cursor != null) { 771 cursor.close(); 772 } 773 } 774 775 return errors; 776 } 777 778 private SimpleEvent createEvent(final DatabaseEntry data) { 779 final SimpleEvent event = new SimpleEvent(); 780 try { 781 byte[] eventData = data.getData(); 782 if (secretKey != null) { 783 final Cipher cipher = Cipher.getInstance("AES"); 784 cipher.init(Cipher.DECRYPT_MODE, secretKey); 785 eventData = cipher.doFinal(eventData); 786 } 787 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData); 788 final DataInputStream dais = new DataInputStream(bais); 789 int length = dais.readInt(); 790 final byte[] bytes = new byte[length]; 791 dais.read(bytes, 0, length); 792 event.setBody(bytes); 793 length = dais.readInt(); 794 final Map<String, String> map = new HashMap<String, String>(length); 795 for (int i = 0; i < length; ++i) { 796 final String headerKey = dais.readUTF(); 797 final String value = dais.readUTF(); 798 map.put(headerKey, value); 799 } 800 event.setHeaders(map); 801 return event; 802 } catch (final Exception ex) { 803 LOGGER.error("Error retrieving event", ex); 804 return null; 805 } 806 } 807 808 } 809 810 /** 811 * Factory that creates Daemon threads that can be properly shut down. 812 */ 813 private static class DaemonThreadFactory implements ThreadFactory { 814 private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); 815 private final ThreadGroup group; 816 private final AtomicInteger threadNumber = new AtomicInteger(1); 817 private final String namePrefix; 818 819 public DaemonThreadFactory() { 820 final SecurityManager securityManager = System.getSecurityManager(); 821 group = securityManager != null ? securityManager.getThreadGroup() : 822 Thread.currentThread().getThreadGroup(); 823 namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-"; 824 } 825 826 @Override 827 public Thread newThread(final Runnable r) { 828 final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); 829 thread.setDaemon(true); 830 if (thread.getPriority() != Thread.NORM_PRIORITY) { 831 thread.setPriority(Thread.NORM_PRIORITY); 832 } 833 return thread; 834 } 835 } 836 837 /** 838 * An internal class. 839 */ 840 private static class Gate { 841 842 private boolean isOpen = false; 843 844 public boolean isOpen() { 845 return isOpen; 846 } 847 848 public synchronized void open() { 849 isOpen = true; 850 notifyAll(); 851 } 852 853 public synchronized void close() { 854 isOpen = false; 855 } 856 857 public synchronized void waitForOpen(long timeout) throws InterruptedException { 858 wait(timeout); 859 } 860 } 861}