/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.logging.log4j.flume.appender;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.apache.logging.log4j.core.config.plugins.util.PluginType;
import org.apache.logging.log4j.core.util.FileUtils;
import org.apache.logging.log4j.core.util.SecretKeyProvider;
import org.apache.logging.log4j.util.Strings;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;

/**
 * Manager that persists data to Berkeley DB before passing it on to Flume.
 * <p>
 * {@link #send(Event)} serializes (and optionally encrypts) each event and writes it to the database on a pool
 * thread. A single {@link WriterThread} reads the stored events back, forwards them through the superclass and
 * deletes them from the database once they have been sent.
 * </p>
 */
public class FlumePersistentManager extends FlumeAvroManager {

    /** Attribute name for the key provider. */
    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    /** Directory used when the caller does not supply a data directory. */
    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    /** Seconds to wait for the worker thread and the thread pool during shutdown. */
    private static final int SHUTDOWN_WAIT = 60;

    private static final int MILLIS_PER_SECOND = 1000;

    /** Pause between attempts after a Berkeley DB lock conflict. */
    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;

    private static BDBManagerFactory factory = new BDBManagerFactory();

    private final Database database;

    private final Environment environment;

    private final WriterThread worker;

    private final Gate gate = new Gate();

    private final SecretKey secretKey;

    private final int lockTimeoutRetryCount;

    private final ExecutorService threadPool;

    /** Approximate number of records in the database; shared with the writer tasks and the worker thread. */
    private final AtomicLong dbCount = new AtomicLong();

    /**
     * Constructor
     * @param name The unique name of this manager.
     * @param shortName Original name for the Manager.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait for a connection to be established.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delay The amount of time to wait between retries.
     * @param database The database to write to.
     * @param environment The database environment.
     * @param secretKey The SecretKey to use for encryption.
     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     final Environment environment, final SecretKey secretKey,
                                     final int lockTimeoutRetryCount) {
        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
        this.database = database;
        this.environment = environment;
        this.secretKey = secretKey;
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
        dbCount.set(database.count());
        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
            lockTimeoutRetryCount);
        // Start the worker last so that it never observes a partially initialized manager.
        this.worker.start();
    }


    /**
     * Returns a FlumePersistentManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties to pass to the Manager.
     * @param batchSize The number of events to include in a batch. Values less than 1 are treated as 1.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait to establish a connection.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delayMillis Amount of time to delay before delivering a batch.
     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
     * @param dataDir The location of the Berkeley database. When null or empty, {@code .log4j/flumeData} is used.
     * @return A FlumePersistentManager.
     * @throws IllegalArgumentException if no agents are supplied.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
                                                    final Property[] properties, int batchSize, final int retries,
                                                    final int connectionTimeout, final int requestTimeout,
                                                    final int delayMillis, final int lockTimeoutRetryCount,
                                                    final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;

        // The manager name is built from every agent's host:port plus the data directory.
        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(',');
            }
            sb.append(agent.getHost()).append(':').append(agent.getPort());
            first = false;
        }
        sb.append(']');
        sb.append(' ').append(dataDirectory);
        // Pass the resolved directory (not the raw argument) so the default applies when dataDir is null or empty.
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
    }

    /**
     * Serializes the event, encrypts it when a secret key is configured, and stores it in the database under the
     * value of its {@code FlumeEvent.GUID} header. The database write runs on a pool thread and this method waits
     * for it to complete.
     *
     * @param event The event to persist.
     * @throws LoggingException if the worker has been shut down or the event cannot be written.
     */
    @Override
    public void send(final Event event) {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

        final Map<String, String> headers = event.getHeaders();
        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
        try {
            // Record layout: body length, body bytes, header count, then each header key and value as UTF.
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final DataOutputStream daos = new DataOutputStream(baos);
            final byte[] body = event.getBody();
            daos.writeInt(body.length);
            daos.write(body, 0, body.length);
            daos.writeInt(headers.size());
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                final Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));

            // Wait for the write. An interrupt is tolerated once (the wait is retried); a second interrupt ends
            // the wait. The retry flag is reset on every pass so a successful get() always terminates the loop.
            boolean interrupted = false;
            try {
                int ieCount = 0;
                boolean retry;
                do {
                    retry = false;
                    try {
                        future.get();
                    } catch (final InterruptedException ie) {
                        interrupted = true;
                        ++ieCount;
                        retry = ieCount <= 1;
                    }
                } while (retry);
            } finally {
                if (interrupted) {
                    // Preserve the caller's interrupt status instead of silently consuming it.
                    Thread.currentThread().interrupt();
                }
            }

        } catch (final Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

    /**
     * Stops the worker thread and the thread pool, then closes the database and its environment before
     * delegating to the superclass.
     */
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        worker.shutdown();
        try {
            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
        } catch (final InterruptedException ie) {
            // Ignore the exception and shutdown.
        }
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
        } catch (final InterruptedException ie) {
            LOGGER.warn("PersistentManager Thread pool failed to shut down");
        }
        try {
            // Unbounded wait: the database must not be closed while the worker may still be using it.
            worker.join();
        } catch (final InterruptedException ex) {
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close environment", ex);
        }
        super.releaseSub();
    }

    /**
     * Forwards an event through the superclass's {@code send}, bypassing the persistence done by
     * {@link #send(Event)}. Used by the worker thread for events read back from the database.
     *
     * @param event The event to forward.
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
     */
    private static class BDBWriter implements Callable<Integer> {
        private final byte[] eventData;
        private final byte[] keyData;
        private final Environment environment;
        private final Database database;
        private final Gate gate;
        private final AtomicLong dbCount;
        private final long batchSize;
        private final int lockTimeoutRetryCount;

        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
                         final int lockTimeoutRetryCount) {
            this.keyData = keyData;
            this.eventData = eventData;
            this.environment = environment;
            this.database = database;
            this.gate = gate;
            this.dbCount = dbCount;
            this.batchSize = batchSize;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Stores the record in its own transaction, retrying up to {@code lockTimeoutRetryCount} times when a
         * lock conflict occurs, and opens the gate once the record counter reaches the batch size.
         *
         * @return The number of bytes of event data.
         * @throws Exception the last lock conflict if every attempt failed, or any other failure from the write.
         */
        @Override
        public Integer call() throws Exception {
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            Exception exception = null;
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    txn = environment.beginTransaction(null, null);
                    try {
                        database.put(txn, key, data);
                        txn.commit();
                        txn = null;
                        if (dbCount.incrementAndGet() >= batchSize) {
                            // Enough records are queued; wake the worker thread.
                            gate.open();
                        }
                        exception = null;
                        break;
                    } catch (final LockConflictException lce) {
                        exception = lce;
                        // Fall through and retry.
                    } catch (final Exception ex) {
                        if (txn != null) {
                            txn.abort();
                        }
                        throw ex;
                    } finally {
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                    }
                } catch (final LockConflictException lce) {
                    // Reached when beginTransaction itself, or the cleanup above, reports a lock conflict.
                    exception = lce;
                    if (txn != null) {
                        try {
                            txn.abort();
                            txn = null;
                        } catch (final Exception ex) {
                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
                        }
                    }

                }
                try {
                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                } catch (final InterruptedException ie) {
                    // Ignore the error
                }
            }
            if (exception != null) {
                throw exception;
            }
            return eventData.length;
        }
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delayMillis;
        private final int lockTimeoutRetryCount;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectionTimeout The amount of time to wait to establish a connection.
         * @param requestTimeout The amount of time to wait for a response to a request.
         * @param delayMillis Amount of time to delay before delivering a batch.
         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
         * @param dataDir The directory for data.
         * @param properties Properties to pass to the Manager; may be null.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delayMillis = delayMillis;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
            this.properties = properties;
        }
    }

    /**
     * Factory that opens the Berkeley DB environment and database and creates the FlumePersistentManager.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumePersistentManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumePersistentManager, or null if the database could not be opened.
         */
        @Override
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            SecretKey secretKey = null;
            Database database = null;
            Environment environment = null;

            final Map<String, String> properties = new HashMap<>();
            if (data.properties != null) {
                for (final Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

            try {
                final File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(true);
                dbEnvConfig.setAllowCreate(true);
                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
                environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                // For consistency, close database as well as environment even though it should never happen since the
                // database is that last thing in the block above, but this does guard against a future line being
                // inserted at the end that would bomb (like some debug logging).
                if (database != null) {
                    database.close();
                    database = null;
                }
                if (environment != null) {
                    environment.close();
                    environment = null;
                }
                return null;
            }

            // Encryption is optional: any failure below is logged and the manager is created without a key.
            try {
                String key = null;
                for (final Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                        break;
                    }
                }
                if (key != null) {
                    final PluginManager manager = new PluginManager("KeyProvider");
                    manager.collectPlugins();
                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
                            if (entry.getKey().equalsIgnoreCase(key)) {
                                found = true;
                                final Class<?> cl = entry.getValue().getPluginClass();
                                try {
                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                    secretKey = provider.getSecretKey();
                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                                } catch (final Exception ex) {
                                    // Pass the throwable so the cause of the failure is not lost.
                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName(), ex);
                                }
                                break;
                            }
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
                data.lockTimeoutRetryCount);
        }
    }

    /**
     * Thread that sends data to Flume and pulls it from Berkeley DB.
     */
    private static class WriterThread extends Thread {
        private volatile boolean shutdown = false;
        private final Database database;
        private final Environment environment;
        private final FlumePersistentManager manager;
        private final Gate gate;
        private final SecretKey secretKey;
        private final int batchSize;
        private final AtomicLong dbCounter;
        private final int lockTimeoutRetryCount;

        public WriterThread(final Database database, final Environment environment,
                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.gate = gate;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
            this.dbCounter = dbCount;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Requests termination of the run loop and opens the gate so a waiting thread wakes up.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }

        public boolean isShutdown() {
            return shutdown;
        }

        /**
         * Main loop. Events are forwarded when the database holds at least a full batch, or when it holds
         * anything at all and the batch delay has elapsed; otherwise the thread waits on the gate until the next
         * batch time. After the loop ends, one final batch is attempted when batching is enabled and records
         * remain.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = "
                + manager.getDelayMillis());
            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
            while (!shutdown) {
                final long nowMillis = System.currentTimeMillis();
                final long dbCount = database.count();
                // Resynchronize the shared counter with the actual record count.
                dbCounter.set(dbCount);
                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                    try {
                        boolean errors = false;
                        final DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                // sendBatch has already logged the failure and set the shutdown flag.
                                break;
                            }
                        } else {
                            // Unbatched mode: send and delete records one at a time inside one transaction.
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    // Leave this record in place; it is retried on a later pass.
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        txn = null;
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (final LockConflictException lce) {
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (final InterruptedException ie) {
                                    // Ignore the error
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update data base", exception);
                            }
                        }
                        if (errors) {
                            // Back off before trying again when the send failed.
                            Thread.sleep(manager.getDelayMillis());
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    if (nextBatchMillis <= nowMillis) {
                        nextBatchMillis = nowMillis + manager.getDelayMillis();
                    }
                    try {
                        final long interval = nextBatchMillis - nowMillis;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            if (batchSize > 1 && database.count() > 0) {
                final DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

        /**
         * Reads up to {@code batchSize} events from the start of the database, sends them as one batch and, when
         * the send succeeds, deletes them in a single transaction.
         *
         * @param key Entry used to receive keys while reading; replaced by per-event keys while deleting.
         * @param data Entry used to receive record data while reading.
         * @return true if sending the batch failed, false otherwise.
         * @throws Exception if an unexpected failure occurs; the shutdown flag is set before rethrowing.
         */
        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
            boolean errors = false;
            OperationStatus status;
            Cursor cursor = null;
            try {
                final BatchEvent batch = new BatchEvent();
                // NOTE(review): if a lock conflict occurs after some events were added, the retry appears to
                // add them again from the first record - confirm whether duplicates are possible here.
                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                    try {
                        cursor = database.openCursor(null, CursorConfig.DEFAULT);
                        status = cursor.getFirst(key, data, null);

                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                            final SimpleEvent event = createEvent(data);
                            if (event != null) {
                                batch.addEvent(event);
                            }
                            status = cursor.getNext(key, data, null);
                        }
                        break;
                    } catch (final LockConflictException lce) {
                        if (cursor != null) {
                            try {
                                cursor.close();
                                cursor = null;
                            } catch (final Exception ex) {
                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                            }
                        }
                    }
                }

                try {
                    manager.send(batch);
                } catch (final Exception ioe) {
                    LOGGER.error("Error sending events", ioe);
                    errors = true;
                }
                if (!errors) {
                    if (cursor != null) {
                        cursor.close();
                        cursor = null;
                    }
                    Transaction txn = null;
                    Exception exception = null;
                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                        try {
                            txn = environment.beginTransaction(null, null);
                            try {
                                for (final Event event : batch.getEvents()) {
                                    try {
                                        final Map<String, String> headers = event.getHeaders();
                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                        database.delete(txn, key);
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error deleting key from database", ex);
                                    }
                                }
                                txn.commit();
                                // The transaction is complete; clear it so the cleanup below does not abort it.
                                txn = null;
                                long count = dbCounter.get();
                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
                                    count = dbCounter.get();
                                }
                                exception = null;
                                break;
                            } catch (final LockConflictException lce) {
                                exception = lce;
                                if (cursor != null) {
                                    try {
                                        cursor.close();
                                        cursor = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                    }
                                }
                                if (txn != null) {
                                    try {
                                        txn.abort();
                                        txn = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                    }
                                }
                            } catch (final Exception ex) {
                                LOGGER.error("Unable to commit transaction", ex);
                                if (txn != null) {
                                    txn.abort();
                                }
                            }
                        } catch (final LockConflictException lce) {
                            exception = lce;
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                    cursor = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                }
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                    txn = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                }
                            }
                        } finally {
                            if (cursor != null) {
                                cursor.close();
                                cursor = null;
                            }
                            if (txn != null) {
                                txn.abort();
                                txn = null;
                            }
                        }
                        try {
                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                        } catch (final InterruptedException ie) {
                            // Ignore the error
                        }
                    }
                    if (exception != null) {
                        LOGGER.error("Unable to delete events from data base", exception);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.error("Error reading database", ex);
                shutdown = true;
                throw ex;
            } finally {
                if (cursor != null) {
                    cursor.close();
                }
            }

            return errors;
        }

        /**
         * Rebuilds an event from a stored record, decrypting it first when a secret key is configured. The
         * record layout is the one written by {@code FlumePersistentManager.send}.
         *
         * @param data The database entry holding the serialized event.
         * @return The event, or null if the record could not be decoded.
         */
        private SimpleEvent createEvent(final DatabaseEntry data) {
            final SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    final Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                final DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                final byte[] bytes = new byte[length];
                // readFully fails on a truncated record instead of silently leaving the body partly filled.
                dais.readFully(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                final Map<String, String> map = new HashMap<>(length);
                for (int i = 0; i < length; ++i) {
                    final String headerKey = dais.readUTF();
                    final String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (final Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }

    /**
     * Factory that creates Daemon threads that can be properly shut down.
     */
    private static class DaemonThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        public DaemonThreadFactory() {
            final SecurityManager securityManager = System.getSecurityManager();
            group = securityManager != null ? securityManager.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        /**
         * Creates a daemon thread at normal priority named {@code DaemonPool-<pool>-thread-<n>}.
         *
         * @param r The task the thread will run.
         * @return The new, unstarted thread.
         */
        @Override
        public Thread newThread(final Runnable r) {
            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            thread.setDaemon(true);
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }

    /**
     * Simple monitor the worker thread waits on; writer tasks and shutdown open it to wake the worker.
     */
    private static class Gate {

        private boolean isOpen = false;

        /**
         * Returns the current state of the gate. Synchronized so the value written by {@link #open()} and
         * {@link #close()} is visible to the caller.
         *
         * @return true if the gate is open.
         */
        public synchronized boolean isOpen() {
            return isOpen;
        }

        /** Marks the gate open and wakes every thread waiting on it. */
        public synchronized void open() {
            isOpen = true;
            notifyAll();
        }

        /** Marks the gate closed. */
        public synchronized void close() {
            isOpen = false;
        }

        /**
         * Waits until notified or until the timeout elapses. The open flag is not consulted, so the call waits
         * even if the gate is already open.
         *
         * @param timeout The maximum time to wait in milliseconds.
         * @throws InterruptedException if the waiting thread is interrupted.
         */
        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
            wait(timeout);
        }
    }
}