/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flume.appender;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.apache.logging.log4j.core.config.plugins.util.PluginType;
import org.apache.logging.log4j.core.util.FileUtils;
import org.apache.logging.log4j.core.util.SecretKeyProvider;
import org.apache.logging.log4j.util.Strings;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;

/**
 * Manager that persists data to Berkeley DB before passing it on to Flume.
 * <p>
 * Events handed to {@link #send(Event)} are serialized (and encrypted when a secret key is configured),
 * written to the database by a task running on an internal thread pool, and later read back, sent and
 * deleted by a background {@code WriterThread}.
 * </p>
 */
public class FlumePersistentManager extends FlumeAvroManager {

    /** Attribute name for the key provider. */
    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = Charset.forName("UTF-8");

    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    /** Seconds to wait for the worker thread and the thread pool when shutting down. */
    private static final int SHUTDOWN_WAIT = 60;

    private static final int MILLIS_PER_SECOND = 1000;

    /** Pause between attempts of the lock-conflict retry loops. */
    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;

    private static BDBManagerFactory factory = new BDBManagerFactory();

    private final Database database;

    private final Environment environment;

    private final WriterThread worker;

    private final Gate gate = new Gate();

    private final SecretKey secretKey;

    private final int delay;

    private final int lockTimeoutRetryCount;

    private final ExecutorService threadPool;

    private final AtomicLong dbCount = new AtomicLong();

    /**
     * Constructor
     * @param name The unique name of this manager.
     * @param shortName Original name for the Manager.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait for a connection to be established.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delay The amount of time to wait between retries.
     * @param database The database to write to.
     * @param environment The database environment.
     * @param secretKey The SecretKey to use for encryption.
     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     final Environment environment, final SecretKey secretKey,
                                     final int lockTimeoutRetryCount) {
        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
        // delay must be assigned before the worker starts: WriterThread.run() reads manager.delay.
        this.delay = delay;
        this.database = database;
        this.environment = environment;
        dbCount.set(database.count());
        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
            lockTimeoutRetryCount);
        this.worker.start();
        this.secretKey = secretKey;
        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
    }


    /**
     * Returns a FlumePersistentManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties to pass to the Manager.
     * @param batchSize The number of events to include in a batch. Values less than 1 are treated as 1.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait to establish a connection.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delay Amount of time to delay before delivering a batch.
     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
     * @param dataDir The location of the Berkeley database. When null or empty the default
     *                directory {@code .log4j/flumeData} is used.
     * @return A FlumePersistentManager.
     * @throws IllegalArgumentException if no agents are supplied.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
                                                    final Property[] properties, int batchSize, final int retries,
                                                    final int connectionTimeout, final int requestTimeout,
                                                    final int delay, final int lockTimeoutRetryCount,
                                                    final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;

        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(',');
            }
            sb.append(agent.getHost()).append(':').append(agent.getPort());
            first = false;
        }
        sb.append(']');
        sb.append(' ').append(dataDirectory);
        // Pass the resolved directory (not the raw argument) so a null or empty dataDir falls back to the
        // default instead of reaching new File(...) in the factory.
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDirectory, properties));
    }

    /**
     * Serializes the event, encrypts it when a secret key is configured, and writes it to the database
     * through a task submitted to the internal thread pool, waiting for that task to complete.
     * <p>
     * If the calling thread is interrupted while waiting, the wait is retried once; the thread's interrupt
     * status is restored before this method returns.
     * </p>
     * @param event The event to persist. Its headers are expected to contain the {@code FlumeEvent.GUID}
     *              entry, which is used as the database key.
     * @throws LoggingException if the worker has been shut down or the event could not be written.
     */
    @Override
    public void send(final Event event) {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

        final Map<String, String> headers = event.getHeaders();
        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
        try {
            // Record layout: body length, body bytes, header count, then (key, value) UTF pairs.
            // WriterThread.createEvent reads the same layout back.
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final DataOutputStream daos = new DataOutputStream(baos);
            daos.writeInt(event.getBody().length);
            daos.write(event.getBody(), 0, event.getBody().length);
            daos.writeInt(event.getHeaders().size());
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                final Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
            boolean interrupted = false;
            int count = 0;
            boolean retry;
            do {
                // Decide per attempt whether to wait again; a sticky flag here would loop forever once a
                // single interrupt had been followed by a successful get().
                retry = false;
                try {
                    future.get();
                } catch (final InterruptedException ie) {
                    interrupted = true;
                    retry = ++count <= 1;
                }
            } while (retry);
            if (interrupted) {
                // Restore the interrupt status only after waiting is finished, otherwise the retry
                // would be interrupted immediately.
                Thread.currentThread().interrupt();
            }

        } catch (final Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

    /**
     * Shuts down the worker thread and the thread pool, then closes the database and its environment
     * before delegating to the superclass.
     */
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        worker.shutdown();
        try {
            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
        } catch (InterruptedException ie) {
            // Ignore the exception and shutdown.
        }
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
        } catch (final InterruptedException ie) {
            LOGGER.warn("PersistentManager Thread pool failed to shut down");
        }
        try {
            worker.join();
        } catch (final InterruptedException ex) {
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close environment", ex);
        }
        super.releaseSub();
    }

    /**
     * Sends a single event through the superclass; used by the WriterThread.
     * @param event The event to send.
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
     */
    private static class BDBWriter implements Callable<Integer> {
        private final byte[] eventData;
        private final byte[] keyData;
        private final Environment environment;
        private final Database database;
        private final Gate gate;
        private final AtomicLong dbCount;
        private final long batchSize;
        private final int lockTimeoutRetryCount;

        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
                         final int lockTimeoutRetryCount) {
            this.keyData = keyData;
            this.eventData = eventData;
            this.environment = environment;
            this.database = database;
            this.gate = gate;
            this.dbCount = dbCount;
            this.batchSize = batchSize;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Writes the key/data pair inside a transaction, retrying on lock conflicts, and opens the gate
         * once the shared counter reaches the batch size.
         * @return The length of the stored event data.
         * @throws Exception the last lock conflict if every attempt failed, or any other exception
         *                   raised while writing.
         */
        @Override
        public Integer call() throws Exception {
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            Exception exception = null;
            // NOTE(review): if lockTimeoutRetryCount is <= 0 this loop never runs and nothing is written,
            // yet the method still returns normally - confirm callers always pass a positive value.
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    txn = environment.beginTransaction(null, null);
                    try {
                        database.put(txn, key, data);
                        txn.commit();
                        txn = null;
                        if (dbCount.incrementAndGet() >= batchSize) {
                            gate.open();
                        }
                        exception = null;
                        break;
                    } catch (final LockConflictException lce) {
                        exception = lce;
                        // Fall through and retry.
                    } catch (final Exception ex) {
                        if (txn != null) {
                            txn.abort();
                        }
                        throw ex;
                    } finally {
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                    }
                } catch (LockConflictException lce) {
                    exception = lce;
                    if (txn != null) {
                        try {
                            txn.abort();
                            txn = null;
                        } catch (Exception ex) {
                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
                        }
                    }

                }
                try {
                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                } catch (InterruptedException ie) {
                    // Ignore the error
                }
            }
            if (exception != null) {
                throw exception;
            }
            return eventData.length;
        }
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delay;
        private final int lockTimeoutRetryCount;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectionTimeout The amount of time to wait to establish a connection.
         * @param requestTimeout The amount of time to wait for a response to a request.
         * @param delay Amount of time to delay before delivering a batch.
         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
         * @param dataDir The directory for data.
         * @param properties Properties to pass to the Manager.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delay,
                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delay = delay;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
            this.properties = properties;
        }
    }

    /**
     * Factory that creates FlumePersistentManager instances backed by a Berkeley DB environment.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumePersistentManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumePersistentManager, or null if the database could not be opened.
         */
        @Override
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            SecretKey secretKey = null;
            Database database = null;
            Environment environment = null;

            final Map<String, String> properties = new HashMap<String, String>();
            if (data.properties != null) {
                for (final Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

            try {
                final File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(true);
                dbEnvConfig.setAllowCreate(true);
                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
                environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                // For consistency, close database as well as environment even though it should never happen since the
                // database is that last thing in the block above, but this does guard against a future line being
                // inserted at the end that would bomb (like some debug logging).
                if (database != null) {
                    database.close();
                    database = null;
                }
                if (environment != null) {
                    environment.close();
                    environment = null;
                }
                return null;
            }

            // Encryption is optional: any failure below leaves secretKey null and events are stored as-is.
            try {
                String key = null;
                for (final Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                        break;
                    }
                }
                if (key != null) {
                    final PluginManager manager = new PluginManager("KeyProvider");
                    manager.collectPlugins();
                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
                            if (entry.getKey().equalsIgnoreCase(key)) {
                                found = true;
                                final Class<?> cl = entry.getValue().getPluginClass();
                                try {
                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                    secretKey = provider.getSecretKey();
                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                                } catch (final Exception ex) {
                                    // Pass the exception as the trailing argument so the cause is logged too.
                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName(), ex);
                                }
                                break;
                            }
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
                data.lockTimeoutRetryCount);
        }
    }

    /**
     * Thread that sends data to Flume and pulls it from Berkeley DB.
     */
    private static class WriterThread extends Thread {
        private volatile boolean shutdown = false;
        private final Database database;
        private final Environment environment;
        private final FlumePersistentManager manager;
        private final Gate gate;
        private final SecretKey secretKey;
        private final int batchSize;
        private final AtomicLong dbCounter;
        private final int lockTimeoutRetryCount;

        public WriterThread(final Database database, final Environment environment,
                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.gate = gate;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
            this.dbCounter = dbCount;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Requests that the thread stop and opens the gate so a waiting thread wakes up.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }

        public boolean isShutdown() {
            return shutdown;
        }

        /**
         * Main loop: whenever a full batch is stored, or the delay has elapsed with at least one stored
         * event, sends the stored events and deletes them; otherwise waits on the gate. After the loop
         * ends, one more batch is attempted if batching is enabled and events remain.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
            long nextBatch = System.currentTimeMillis() + manager.delay;
            while (!shutdown) {
                long now = System.currentTimeMillis();
                long dbCount = database.count();
                dbCounter.set(dbCount);
                if (dbCount >= batchSize || dbCount > 0 && nextBatch <= now) {
                    nextBatch = now + manager.delay;
                    try {
                        boolean errors = false;
                        DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                // sendBatch has already logged the failure and set the shutdown flag.
                                break;
                            }
                        } else {
                            // Unbatched mode: send and delete events one at a time under a single cursor.
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        txn = null;
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (LockConflictException lce) {
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (InterruptedException ie) {
                                    // Ignore the error
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update data base", exception);
                            }
                        }
                        if (errors) {
                            // Sending failed: back off for the configured delay before trying again.
                            Thread.sleep(manager.delay);
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    if (nextBatch <= now) {
                        nextBatch = now + manager.delay;
                    }
                    try {
                        final long interval = nextBatch - now;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            if (batchSize > 1 && database.count() > 0) {
                DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

        /**
         * Reads up to {@code batchSize} events from the database, sends them as one batch and, if the
         * send succeeded, deletes them in a transaction that is retried on lock conflicts.
         * @param key Entry used to receive keys while reading.
         * @param data Entry used to receive event data while reading.
         * @return true if sending the batch failed, false otherwise.
         * @throws Exception if reading the database fails; the shutdown flag is set before rethrowing.
         */
        private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
            boolean errors = false;
            OperationStatus status;
            Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
            try {
                status = cursor.getFirst(key, data, null);

                final BatchEvent batch = new BatchEvent();
                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                    final SimpleEvent event = createEvent(data);
                    if (event != null) {
                        batch.addEvent(event);
                    }
                    status = cursor.getNext(key, data, null);
                }
                try {
                    manager.send(batch);
                } catch (final Exception ioe) {
                    LOGGER.error("Error sending events", ioe);
                    errors = true;
                }
                if (!errors) {
                    cursor.close();
                    cursor = null;
                    Transaction txn = null;
                    Exception exception = null;
                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                        try {
                            txn = environment.beginTransaction(null, null);
                            try {
                                for (final Event event : batch.getEvents()) {
                                    try {
                                        final Map<String, String> headers = event.getHeaders();
                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                        database.delete(txn, key);
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error deleting key from database", ex);
                                    }
                                }
                                txn.commit();
                                // Clear the handle so the finally block does not abort a committed transaction.
                                txn = null;
                                long count = dbCounter.get();
                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
                                    count = dbCounter.get();
                                }
                                exception = null;
                                break;
                            } catch (final LockConflictException lce) {
                                exception = lce;
                                if (cursor != null) {
                                    try {
                                        cursor.close();
                                        cursor = null;
                                    } catch (Exception ex) {
                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                    }
                                }
                                if (txn != null) {
                                    try {
                                        txn.abort();
                                        txn = null;
                                    } catch (Exception ex) {
                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                    }
                                }
                            } catch (final Exception ex) {
                                LOGGER.error("Unable to commit transaction", ex);
                                if (txn != null) {
                                    txn.abort();
                                }
                            }
                        } catch (LockConflictException lce) {
                            exception = lce;
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                    cursor = null;
                                } catch (Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                }
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                    txn = null;
                                } catch (Exception ex) {
                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                }
                            }
                        } finally {
                            if (cursor != null) {
                                cursor.close();
                                cursor = null;
                            }
                            if (txn != null) {
                                txn.abort();
                                txn = null;
                            }
                        }
                        try {
                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                        } catch (InterruptedException ie) {
                            // Ignore the error
                        }
                    }
                    if (exception != null) {
                        LOGGER.error("Unable to delete events from data base", exception);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.error("Error reading database", ex);
                shutdown = true;
                throw ex;
            } finally {
                if (cursor != null) {
                    cursor.close();
                }
            }

            return errors;
        }

        /**
         * Rebuilds an event from a stored record, decrypting it first when a secret key is configured.
         * The record layout matches what {@code send(Event)} writes: body length, body bytes, header
         * count, then (key, value) UTF pairs.
         * @param data The database entry holding the stored record.
         * @return The event, or null if the record could not be decoded.
         */
        private SimpleEvent createEvent(final DatabaseEntry data) {
            final SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    final Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                final DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                final byte[] bytes = new byte[length];
                // readFully fails on a truncated record instead of silently leaving the body short.
                dais.readFully(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                final Map<String, String> map = new HashMap<String, String>(length);
                for (int i = 0; i < length; ++i) {
                    final String headerKey = dais.readUTF();
                    final String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (final Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }

    /**
     * Factory that creates Daemon threads that can be properly shut down.
     */
    private static class DaemonThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        public DaemonThreadFactory() {
            final SecurityManager securityManager = System.getSecurityManager();
            group = securityManager != null ? securityManager.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(final Runnable r) {
            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            thread.setDaemon(true);
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }

    /**
     * Simple monitor used to wake the WriterThread: writers call {@link #open()} and the WriterThread
     * waits in {@link #waitForOpen(long)}.
     */
    private static class Gate {

        private boolean isOpen = false;

        // Synchronized so the flag is read under the same monitor that open() and close() write it under.
        public synchronized boolean isOpen() {
            return isOpen;
        }

        public synchronized void open() {
            isOpen = true;
            notifyAll();
        }

        public synchronized void close() {
            isOpen = false;
        }

        /**
         * Waits until notified by {@link #open()} or until the timeout elapses. The open flag is not
         * checked before waiting.
         * @param timeout Maximum time to wait in milliseconds, passed directly to {@code Object.wait(long)}.
         * @throws InterruptedException if the waiting thread is interrupted.
         */
        public synchronized void waitForOpen(long timeout) throws InterruptedException {
            wait(timeout);
        }
    }
}