/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flume.appender;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.apache.logging.log4j.core.config.plugins.util.PluginType;
import org.apache.logging.log4j.core.util.FileUtils;
import org.apache.logging.log4j.core.util.Log4jThread;
import org.apache.logging.log4j.core.util.SecretKeyProvider;
import org.apache.logging.log4j.util.Strings;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;

/**
 * Manager that persists data to Berkeley DB before passing it on to Flume.
 * <p>
 * {@link #send(Event)} serializes (and optionally encrypts) each event and stores it in the database
 * through a task submitted to a thread pool. A background {@code WriterThread} reads the stored events,
 * forwards them to Flume through the parent manager and deletes them once they have been sent.
 * </p>
 */
public class FlumePersistentManager extends FlumeAvroManager {

    /** Attribute name for the key provider. */
    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    /** Directory used for the database when no data directory is configured. */
    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    /** Number of seconds to wait for the worker thread and the thread pool during shutdown. */
    private static final int SHUTDOWN_WAIT = 60;

    private static final int MILLIS_PER_SECOND = 1000;

    /** Number of milliseconds to sleep before retrying after a lock conflict. */
    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;

    private static BDBManagerFactory factory = new BDBManagerFactory();

    private final Database database;

    private final Environment environment;

    private final WriterThread worker;

    /** Opened by the database writers to wake the worker thread when a batch is available. */
    private final Gate gate = new Gate();

    /** Key used to encrypt stored events; {@code null} disables encryption. */
    private final SecretKey secretKey;

    private final int lockTimeoutRetryCount;

    /** Pool that runs the {@code BDBWriter} tasks so that database writes happen off the caller's thread. */
    private final ExecutorService threadPool;

    /** Running count of the events believed to be in the database. */
    private final AtomicLong dbCount = new AtomicLong();

    /**
     * Constructor
     * @param name The unique name of this manager.
     * @param shortName Original name for the Manager.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait for a connection to be established.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delay The amount of time to wait between retries.
     * @param database The database to write to.
     * @param environment The database environment.
     * @param secretKey The SecretKey to use for encryption.
     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     final Environment environment, final SecretKey secretKey,
                                     final int lockTimeoutRetryCount) {
        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
        this.database = database;
        this.environment = environment;
        dbCount.set(database.count());
        this.secretKey = secretKey;
        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
            lockTimeoutRetryCount);
        // Start the worker only after every field has been assigned, because it holds a reference to
        // this manager.
        this.worker.start();
    }


    /**
     * Returns a FlumePersistentManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties to pass to the Manager.
     * @param batchSize The number of events to include in a batch. Values less than 1 are treated as 1.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait to establish a connection.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delayMillis Amount of time to delay before delivering a batch.
     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
     * @param dataDir The location of the Berkeley database. When null or empty the default directory
     *                ({@code .log4j/flumeData}) is used.
     * @return A FlumePersistentManager.
     * @throws IllegalArgumentException if no agents are supplied.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
                                                    final Property[] properties, int batchSize, final int retries,
                                                    final int connectionTimeout, final int requestTimeout,
                                                    final int delayMillis, final int lockTimeoutRetryCount,
                                                    final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;

        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(',');
            }
            sb.append(agent.getHost()).append(':').append(agent.getPort());
            first = false;
        }
        sb.append(']');
        sb.append(' ').append(dataDirectory);
        // Pass the resolved directory, not the raw argument, so that the default is honoured when no
        // data directory was configured.
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
    }

    /**
     * Serializes the event, encrypts it when a secret key is configured, and stores it in the database
     * keyed by the event's GUID header. The write is performed by a pool thread; this method waits for
     * it to finish.
     *
     * @param event The event to persist.
     * @throws LoggingException if the worker thread has shut down or the event cannot be written.
     */
    @Override
    public void send(final Event event) {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

        final Map<String, String> headers = event.getHeaders();
        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
        try {
            // Record layout: body length, body bytes, header count, then each header as two UTF strings.
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final DataOutputStream daos = new DataOutputStream(baos);
            final byte[] body = event.getBody();
            daos.writeInt(body.length);
            daos.write(body, 0, body.length);
            daos.writeInt(headers.size());
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                final Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
            // Wait for the write to complete, retrying the wait once if this thread is interrupted.
            // The flag is reset on every pass so that a successful wait ends the loop.
            boolean interrupted;
            int ieCount = 0;
            do {
                interrupted = false;
                try {
                    future.get();
                } catch (final InterruptedException ie) {
                    interrupted = true;
                    ++ieCount;
                }
            } while (interrupted && ieCount <= 1);
            if (ieCount > 0) {
                // Preserve the interrupt for the caller instead of silently consuming it.
                Thread.currentThread().interrupt();
            }

        } catch (final Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

    /**
     * Shuts down the worker thread and the thread pool, then closes the database and its environment
     * before releasing the resources of the parent manager.
     */
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        worker.shutdown();
        try {
            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
        } catch (final InterruptedException ie) {
            // Ignore the exception and shutdown.
        }
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            logWarn("PersistentManager Thread pool failed to shut down", e);
        }
        try {
            // Wait without a timeout so the database is not closed while the worker is still using it.
            worker.join();
        } catch (final InterruptedException ex) {
            logDebug("interrupted while waiting for worker to complete", ex);
        }
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            logWarn("failed to close database", ex);
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            logWarn("failed to close environment", ex);
        }
        super.releaseSub();
    }

    /**
     * Sends a single event to Flume through the parent manager, bypassing the persistence performed by
     * {@link #send(Event)}.
     *
     * @param event The event to send.
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
     */
    private static class BDBWriter implements Callable<Integer> {
        private final byte[] eventData;
        private final byte[] keyData;
        private final Environment environment;
        private final Database database;
        private final Gate gate;
        private final AtomicLong dbCount;
        private final long batchSize;
        private final int lockTimeoutRetryCount;

        /**
         * Constructor.
         * @param keyData The bytes of the record key.
         * @param eventData The serialized (and possibly encrypted) event.
         * @param environment The database environment used to begin transactions.
         * @param database The database to write to.
         * @param gate The gate to open once a full batch is available.
         * @param dbCount The shared count of stored events.
         * @param batchSize The number of events in a batch.
         * @param lockTimeoutRetryCount The number of attempts to make when lock conflicts occur.
         */
        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
                         final int lockTimeoutRetryCount) {
            this.keyData = keyData;
            this.eventData = eventData;
            this.environment = environment;
            this.database = database;
            this.gate = gate;
            this.dbCount = dbCount;
            this.batchSize = batchSize;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Writes the record inside a transaction, retrying after a pause when a lock conflict occurs.
         *
         * @return The number of bytes of event data.
         * @throws Exception the last lock conflict when every attempt failed, or any other failure
         *         raised while writing.
         */
        @Override
        public Integer call() throws Exception {
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            Exception exception = null;
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    txn = environment.beginTransaction(null, null);
                    try {
                        database.put(txn, key, data);
                        txn.commit();
                        txn = null;
                        // Wake the worker thread as soon as a full batch is available.
                        if (dbCount.incrementAndGet() >= batchSize) {
                            gate.open();
                        }
                        exception = null;
                        break;
                    } catch (final LockConflictException lce) {
                        exception = lce;
                        // Fall through and retry.
                    } catch (final Exception ex) {
                        if (txn != null) {
                            txn.abort();
                        }
                        throw ex;
                    } finally {
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                    }
                } catch (final LockConflictException lce) {
                    exception = lce;
                    if (txn != null) {
                        try {
                            txn.abort();
                            txn = null;
                        } catch (final Exception ex) {
                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
                        }
                    }

                }
                // Only reached after a lock conflict; pause before the next attempt.
                try {
                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                } catch (final InterruptedException ie) {
                    // Ignore the error
                }
            }
            if (exception != null) {
                throw exception;
            }
            return eventData.length;
        }
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delayMillis;
        private final int lockTimeoutRetryCount;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectionTimeout The amount of time to wait to establish a connection.
         * @param requestTimeout The amount of time to wait for a response to a request.
         * @param delayMillis Amount of time to delay before delivering a batch.
         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
         * @param dataDir The directory for data.
         * @param properties Properties to pass to the Manager.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delayMillis = delayMillis;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
            this.properties = properties;
        }
    }

    /**
     * Factory that opens the Berkeley DB environment and database and creates the manager.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumePersistentManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumePersistentManager, or null if the database could not be opened.
         */
        @Override
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            SecretKey secretKey = null;
            Database database = null;
            Environment environment = null;

            final Map<String, String> properties = new HashMap<>();
            if (data.properties != null) {
                for (final Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

            try {
                final File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(true);
                dbEnvConfig.setAllowCreate(true);
                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
                environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                // For consistency, close database as well as environment even though it should never happen since the
                // database is the last thing in the block above, but this does guard against a future line being
                // inserted at the end that would bomb (like some debug logging).
                if (database != null) {
                    database.close();
                    database = null;
                }
                if (environment != null) {
                    environment.close();
                    environment = null;
                }
                return null;
            }

            // Encryption is optional: any failure below is logged and events are stored unencrypted.
            try {
                String key = null;
                for (final Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                        break;
                    }
                }
                if (key != null) {
                    final PluginManager manager = new PluginManager("KeyProvider");
                    manager.collectPlugins();
                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
                            if (entry.getKey().equalsIgnoreCase(key)) {
                                found = true;
                                final Class<?> cl = entry.getValue().getPluginClass();
                                try {
                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                    secretKey = provider.getSecretKey();
                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                                } catch (final Exception ex) {
                                    // Include the cause so that the reason encryption was disabled is visible.
                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName(), ex);
                                }
                                break;
                            }
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
                data.lockTimeoutRetryCount);
        }
    }

    /**
     * Thread that sends data to Flume and pulls it from Berkeley DB.
     */
    private static class WriterThread extends Thread {
        private volatile boolean shutdown = false;
        private final Database database;
        private final Environment environment;
        private final FlumePersistentManager manager;
        private final Gate gate;
        private final SecretKey secretKey;
        private final int batchSize;
        private final AtomicLong dbCounter;
        private final int lockTimeoutRetryCount;

        /**
         * Constructor. The thread is created as a daemon thread and is not started here.
         * @param database The database to read from.
         * @param environment The database environment used to begin transactions.
         * @param manager The manager used to send events to Flume.
         * @param gate The gate this thread waits on for work.
         * @param batchsize The number of events to include in a batch.
         * @param secretKey The key used to decrypt stored events, or null if they are not encrypted.
         * @param dbCount The shared count of stored events.
         * @param lockTimeoutRetryCount The number of attempts to make when lock conflicts occur.
         */
        public WriterThread(final Database database, final Environment environment,
                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.gate = gate;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
            this.dbCounter = dbCount;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Requests that the thread stop and opens the gate so that a waiting thread wakes up.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }

        /**
         * Indicates whether the thread has been asked to stop or has stopped itself after an error.
         * @return true if the thread is shut down.
         */
        public boolean isShutdown() {
            return shutdown;
        }

        /**
         * Main loop: sends the stored events once a full batch is available or the batch delay has
         * elapsed, and otherwise waits on the gate. When the loop ends, one final batch is sent if
         * batching is enabled and events remain.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = {}, delayMillis = {}", batchSize,
                manager.getDelayMillis());
            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
            while (!shutdown) {
                final long nowMillis = System.currentTimeMillis();
                // Resynchronize the shared counter with the actual database contents on every pass.
                final long dbCount = database.count();
                dbCounter.set(dbCount);
                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                    try {
                        boolean errors = false;
                        final DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                // sendBatch has already logged the failure and flagged the shutdown.
                                break;
                            }
                        } else {
                            // Unbatched mode: send and delete the events one at a time in one transaction.
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        txn = null;
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (final LockConflictException lce) {
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                // Only reached after a lock conflict; pause before the next attempt.
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (final InterruptedException ie) {
                                    // Ignore the error
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update data base", exception);
                            }
                        }
                        if (errors) {
                            // Sending failed; wait for the configured delay before trying again.
                            Thread.sleep(manager.getDelayMillis());
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    if (nextBatchMillis <= nowMillis) {
                        nextBatchMillis = nowMillis + manager.getDelayMillis();
                    }
                    try {
                        final long interval = nextBatchMillis - nowMillis;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            // Make one last attempt to deliver what is left when batching is enabled.
            if (batchSize > 1 && database.count() > 0) {
                final DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

        /**
         * Reads up to {@code batchSize} events from the database, sends them to Flume as one batch and,
         * if the send succeeded, deletes them from the database in a single transaction.
         *
         * @param key The entry used to receive record keys while reading.
         * @param data The entry used to receive record data while reading.
         * @return true if sending the batch failed, false otherwise.
         * @throws Exception if the database cannot be read; the thread is flagged for shutdown first.
         */
        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
            boolean errors = false;
            OperationStatus status;
            Cursor cursor = null;
            try {
                BatchEvent batch = new BatchEvent();
                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                    // Start every attempt with an empty batch so that events read before a lock conflict
                    // are not added a second time by the retry.
                    batch = new BatchEvent();
                    try {
                        cursor = database.openCursor(null, CursorConfig.DEFAULT);
                        status = cursor.getFirst(key, data, null);

                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                            final SimpleEvent event = createEvent(data);
                            if (event != null) {
                                batch.addEvent(event);
                            }
                            status = cursor.getNext(key, data, null);
                        }
                        break;
                    } catch (final LockConflictException lce) {
                        if (cursor != null) {
                            try {
                                cursor.close();
                                cursor = null;
                            } catch (final Exception ex) {
                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                            }
                        }
                    }
                }

                try {
                    manager.send(batch);
                } catch (final Exception ioe) {
                    LOGGER.error("Error sending events", ioe);
                    errors = true;
                }
                if (!errors) {
                    if (cursor != null) {
                        cursor.close();
                        cursor = null;
                    }
                    Transaction txn = null;
                    Exception exception = null;
                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                        try {
                            txn = environment.beginTransaction(null, null);
                            try {
                                for (final Event event : batch.getEvents()) {
                                    try {
                                        final Map<String, String> headers = event.getHeaders();
                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                        database.delete(txn, key);
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error deleting key from database", ex);
                                    }
                                }
                                txn.commit();
                                // Subtract the number of events just sent from the shared counter.
                                long count = dbCounter.get();
                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
                                    count = dbCounter.get();
                                }
                                exception = null;
                                break;
                            } catch (final LockConflictException lce) {
                                exception = lce;
                                if (cursor != null) {
                                    try {
                                        cursor.close();
                                        cursor = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                    }
                                }
                                if (txn != null) {
                                    try {
                                        txn.abort();
                                        txn = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                    }
                                }
                            } catch (final Exception ex) {
                                LOGGER.error("Unable to commit transaction", ex);
                                if (txn != null) {
                                    txn.abort();
                                }
                            }
                        } catch (final LockConflictException lce) {
                            exception = lce;
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                    cursor = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                }
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                    txn = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                }
                            }
                        } finally {
                            if (cursor != null) {
                                cursor.close();
                                cursor = null;
                            }
                            if (txn != null) {
                                txn.abort();
                                txn = null;
                            }
                        }
                        try {
                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                        } catch (final InterruptedException ie) {
                            // Ignore the error
                        }
                    }
                    if (exception != null) {
                        LOGGER.error("Unable to delete events from data base", exception);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.error("Error reading database", ex);
                shutdown = true;
                throw ex;
            } finally {
                if (cursor != null) {
                    cursor.close();
                }
            }

            return errors;
        }

        /**
         * Rebuilds an event from a stored record, decrypting it first when a secret key is configured.
         * The record layout is the one written by {@link FlumePersistentManager#send(Event)}.
         *
         * @param data The database entry holding the serialized event.
         * @return The event, or null if the record could not be decoded.
         */
        private SimpleEvent createEvent(final DatabaseEntry data) {
            final SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    final Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                final DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                final byte[] bytes = new byte[length];
                // readFully fails on a truncated record instead of silently leaving the body short.
                dais.readFully(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                final Map<String, String> map = new HashMap<>(length);
                for (int i = 0; i < length; ++i) {
                    final String headerKey = dais.readUTF();
                    final String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (final Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }

    /**
     * Factory that creates Daemon threads that can be properly shut down.
     */
    private static class DaemonThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * Constructor. Uses the thread group of the security manager when one is installed, otherwise
         * the thread group of the current thread.
         */
        public DaemonThreadFactory() {
            final SecurityManager securityManager = System.getSecurityManager();
            group = securityManager != null ? securityManager.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        /**
         * Creates a daemon thread with normal priority and a name unique within this factory.
         * @param r The task the thread will run.
         * @return The new thread.
         */
        @Override
        public Thread newThread(final Runnable r) {
            final Thread thread = new Log4jThread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            thread.setDaemon(true);
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }

    /**
     * Simple signal used by the database writers to wake up the worker thread.
     */
    private static class Gate {

        private boolean isOpen = false;

        /**
         * Returns the open flag. This read is not synchronized.
         * @return true if the gate is open.
         */
        public boolean isOpen() {
            return isOpen;
        }

        /**
         * Marks the gate as open and wakes every thread waiting in {@link #waitForOpen(long)}.
         */
        public synchronized void open() {
            isOpen = true;
            notifyAll();
        }

        /**
         * Marks the gate as closed.
         */
        public synchronized void close() {
            isOpen = false;
        }

        /**
         * Waits until notified by {@link #open()} or until the timeout elapses. The open flag is not
         * consulted, so a gate that is already open does not shorten the wait.
         *
         * @param timeout The maximum time to wait in milliseconds, passed straight to
         *                {@link Object#wait(long)}.
         * @throws InterruptedException if the waiting thread is interrupted.
         */
        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
            wait(timeout);
        }
    }
}