/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flume.appender;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.apache.logging.log4j.core.config.plugins.util.PluginType;
import org.apache.logging.log4j.core.util.FileUtils;
import org.apache.logging.log4j.core.util.SecretKeyProvider;
import org.apache.logging.log4j.util.Strings;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;

/**
 * Manager that persists data to Berkeley DB before passing it on to Flume.
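 * <p>
 * Events passed to {@code send(Event)} are serialized (and optionally AES-encrypted when a {@code keyProvider}
 * property names a {@code SecretKeyProvider} plugin) and written to a local Berkeley DB JE database, keyed by the
 * event's GUID header. A background writer thread drains the database, forwarding events to the Flume agents via
 * the Avro client and deleting them once delivery succeeds, so events can survive a crash that occurs before they
 * reach an agent.
 * </p>
 * <p>
 * This manager backs the Flume Appender's persistent mode; a configuration roughly along these lines (see the
 * Flume Appender manual for the exact attribute names) would use it:
 * </p>
 * <pre>{@code
 * <Flume name="eventLogger" type="persistent" dataDir="./flumeData">
 *   <Agent host="192.168.10.101" port="8800"/>
 * </Flume>
 * }</pre>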
 */
public class FlumePersistentManager extends FlumeAvroManager {

    /** Attribute name for the key provider. */
    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    private static final int SHUTDOWN_WAIT = 60;

    private static final int MILLIS_PER_SECOND = 1000;

    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;

    private static BDBManagerFactory factory = new BDBManagerFactory();

    private final Database database;

    private final Environment environment;

    private final WriterThread worker;

    private final Gate gate = new Gate();

    private final SecretKey secretKey;

    private final int lockTimeoutRetryCount;

    private final ExecutorService threadPool;

    private final AtomicLong dbCount = new AtomicLong();

    /**
     * Constructor.
     * @param name The unique name of this manager.
     * @param shortName Original name for the Manager.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait for a connection to be established.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delay The amount of time to wait between retries.
     * @param database The database to write to.
     * @param environment The database environment.
     * @param secretKey The SecretKey to use for encryption.
     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     final Environment environment, final SecretKey secretKey,
                                     final int lockTimeoutRetryCount) {
        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
        this.database = database;
        this.environment = environment;
        dbCount.set(database.count());
        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
            lockTimeoutRetryCount);
        this.worker.start();
        this.secretKey = secretKey;
        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
    }

    /**
     * Returns a FlumePersistentManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties to pass to the Manager.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry connecting before giving up.
     * @param connectionTimeout The amount of time to wait to establish a connection.
     * @param requestTimeout The amount of time to wait for a response to a request.
     * @param delayMillis Amount of time to delay before delivering a batch.
     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
     * @param dataDir The location of the Berkeley database. Defaults to {@code .log4j/flumeData} when empty.
     * @return A FlumePersistentManager.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
                                                    final Property[] properties, int batchSize, final int retries,
                                                    final int connectionTimeout, final int requestTimeout,
                                                    final int delayMillis, final int lockTimeoutRetryCount,
                                                    final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;

        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(',');
            }
            sb.append(agent.getHost()).append(':').append(agent.getPort());
            first = false;
        }
        sb.append(']');
        sb.append(' ').append(dataDirectory);
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
    }

    @Override
    public void send(final Event event) {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

        final Map<String, String> headers = event.getHeaders();
        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
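        // Persist the event before returning: the record is the body length, the body bytes, the header count and
        // each header written with writeUTF(key)/writeUTF(value), optionally AES-encrypted, stored under the
        // event's GUID header so the writer thread can delete it after a successful delivery to Flume.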
        try {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final DataOutputStream daos = new DataOutputStream(baos);
            daos.writeInt(event.getBody().length);
            daos.write(event.getBody(), 0, event.getBody().length);
            daos.writeInt(event.getHeaders().size());
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                final Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
            boolean interrupted = false;
            int ieCount = 0;
            do {
                try {
                    future.get();
                    interrupted = false;
                } catch (final InterruptedException ie) {
                    interrupted = true;
                    ++ieCount;
                }
            } while (interrupted && ieCount <= 1);

        } catch (final Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
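        // Shutdown order: signal the writer thread and wait for it, drain the thread pool used for puts, then
        // close the Berkeley DB database and finally its environment.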
        worker.shutdown();
        try {
            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
        } catch (final InterruptedException ie) {
            // Ignore the exception and shutdown.
        }
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
        } catch (final InterruptedException ie) {
            LOGGER.warn("PersistentManager Thread pool failed to shut down");
        }
        try {
            worker.join();
        } catch (final InterruptedException ex) {
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close environment", ex);
        }
        super.releaseSub();
    }

    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * {@code Callable}, run on the manager's thread pool, that writes to Berkeley DB so that thread interrupts
     * cannot close the database.
     */
    private static class BDBWriter implements Callable<Integer> {
        private final byte[] eventData;
        private final byte[] keyData;
        private final Environment environment;
        private final Database database;
        private final Gate gate;
        private final AtomicLong dbCount;
        private final long batchSize;
        private final int lockTimeoutRetryCount;

        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
                         final int lockTimeoutRetryCount) {
            this.keyData = keyData;
            this.eventData = eventData;
            this.environment = environment;
            this.database = database;
            this.gate = gate;
            this.dbCount = dbCount;
            this.batchSize = batchSize;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        @Override
        public Integer call() throws Exception {
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            Exception exception = null;
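            // Attempt the transactional put up to lockTimeoutRetryCount times. A LockConflictException aborts the
            // transaction and retries after a short sleep; any other exception is rethrown. Once the put commits,
            // the gate is opened if the stored-event count has reached the batch size so the writer thread wakes up.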
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    txn = environment.beginTransaction(null, null);
                    try {
                        database.put(txn, key, data);
                        txn.commit();
                        txn = null;
                        if (dbCount.incrementAndGet() >= batchSize) {
                            gate.open();
                        }
                        exception = null;
                        break;
                    } catch (final LockConflictException lce) {
                        exception = lce;
                        // Fall through and retry.
                    } catch (final Exception ex) {
                        if (txn != null) {
                            txn.abort();
                        }
                        throw ex;
                    } finally {
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                    }
                } catch (final LockConflictException lce) {
                    exception = lce;
                    if (txn != null) {
                        try {
                            txn.abort();
                            txn = null;
                        } catch (final Exception ex) {
                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
                        }
                    }
                }
                try {
                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                } catch (final InterruptedException ie) {
                    // Ignore the error
                }
            }
            if (exception != null) {
                throw exception;
            }
            return eventData.length;
        }
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delayMillis;
        private final int lockTimeoutRetryCount;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectionTimeout The amount of time to wait to establish a connection.
         * @param requestTimeout The amount of time to wait for a response to a request.
         * @param delayMillis Amount of time to delay before delivering a batch.
         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
         * @param dataDir The directory for data.
         * @param properties Properties to pass to the Manager.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delayMillis = delayMillis;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
            this.properties = properties;
        }
    }

    /**
     * Berkeley DB Manager Factory.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumePersistentManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumePersistentManager.
         */
        @Override
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            SecretKey secretKey = null;
            Database database = null;
            Environment environment = null;

            final Map<String, String> properties = new HashMap<>();
            if (data.properties != null) {
                for (final Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

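            // Create the data directory if necessary and open a transactional Berkeley DB JE environment and a
            // database named after the manager. Failure here is fatal for the manager, so both handles are closed
            // and null is returned.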
            try {
                final File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(true);
                dbEnvConfig.setAllowCreate(true);
                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
                environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                // For consistency, close the database as well as the environment, even though the database should
                // never be open here since opening it is the last thing done in the block above. This guards against
                // a future line (such as some debug logging) being inserted after it that could fail.
                if (database != null) {
                    database.close();
                    database = null;
                }
                if (environment != null) {
                    environment.close();
                    environment = null;
                }
                return null;
            }

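            // If a keyProvider property was supplied, look up the named KeyProvider plugin and use the SecretKey it
            // supplies to encrypt persisted events. Any problem locating or instantiating the provider merely
            // disables encryption.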
            try {
                String key = null;
                for (final Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                        break;
                    }
                }
                if (key != null) {
                    final PluginManager manager = new PluginManager("KeyProvider");
                    manager.collectPlugins();
                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
                            if (entry.getKey().equalsIgnoreCase(key)) {
                                found = true;
                                final Class<?> cl = entry.getValue().getPluginClass();
                                try {
                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                    secretKey = provider.getSecretKey();
                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                                } catch (final Exception ex) {
                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName());
                                }
                                break;
                            }
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
                data.lockTimeoutRetryCount);
        }
    }

    /**
     * Thread that pulls events from Berkeley DB and sends them to Flume.
     */
    private static class WriterThread extends Thread {
        private volatile boolean shutdown = false;
        private final Database database;
        private final Environment environment;
        private final FlumePersistentManager manager;
        private final Gate gate;
        private final SecretKey secretKey;
        private final int batchSize;
        private final AtomicLong dbCounter;
        private final int lockTimeoutRetryCount;

        public WriterThread(final Database database, final Environment environment,
                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.gate = gate;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
            this.dbCounter = dbCount;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }

        public boolean isShutdown() {
            return shutdown;
        }

        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
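            // Forward events when at least batchSize records are queued, or when any are queued and the delay
            // interval has elapsed; otherwise wait on the gate until a writer signals a full batch, shutdown is
            // requested, or the interval expires.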
            while (!shutdown) {
                final long nowMillis = System.currentTimeMillis();
                final long dbCount = database.count();
                dbCounter.set(dbCount);
                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                    try {
                        boolean errors = false;
                        final DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                break;
                            }
                        } else {
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        txn = null;
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (final LockConflictException lce) {
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (final InterruptedException ie) {
                                    // Ignore the error
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update database", exception);
                            }
                        }
                        if (errors) {
                            Thread.sleep(manager.getDelayMillis());
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    if (nextBatchMillis <= nowMillis) {
                        nextBatchMillis = nowMillis + manager.getDelayMillis();
                    }
                    try {
                        final long interval = nextBatchMillis - nowMillis;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            if (batchSize > 1 && database.count() > 0) {
                final DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
            boolean errors = false;
            OperationStatus status;
            Cursor cursor = null;
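            // Read up to batchSize events into a BatchEvent with a non-transactional cursor (retrying on lock
            // conflicts), send the batch to Flume, and only then delete the sent records inside a transaction so
            // nothing is lost if delivery fails.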
            try {
                final BatchEvent batch = new BatchEvent();
                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                    try {
                        cursor = database.openCursor(null, CursorConfig.DEFAULT);
                        status = cursor.getFirst(key, data, null);

                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                            final SimpleEvent event = createEvent(data);
                            if (event != null) {
                                batch.addEvent(event);
                            }
                            status = cursor.getNext(key, data, null);
                        }
                        break;
                    } catch (final LockConflictException lce) {
                        if (cursor != null) {
                            try {
                                cursor.close();
                                cursor = null;
                            } catch (final Exception ex) {
                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                            }
                        }
                    }
                }

                try {
                    manager.send(batch);
                } catch (final Exception ioe) {
                    LOGGER.error("Error sending events", ioe);
                    errors = true;
                }
                if (!errors) {
                    if (cursor != null) {
                        cursor.close();
                        cursor = null;
                    }
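                    // Delivery succeeded: delete each sent event by its GUID key inside a transaction, retrying on
                    // lock conflicts, then lower the in-memory record count by the number of events in the batch.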
                    Transaction txn = null;
                    Exception exception = null;
                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                        try {
                            txn = environment.beginTransaction(null, null);
                            try {
                                for (final Event event : batch.getEvents()) {
                                    try {
                                        final Map<String, String> headers = event.getHeaders();
                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                        database.delete(txn, key);
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error deleting key from database", ex);
                                    }
                                }
                                txn.commit();
                                long count = dbCounter.get();
                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
                                    count = dbCounter.get();
                                }
                                exception = null;
                                break;
                            } catch (final LockConflictException lce) {
                                exception = lce;
                                if (cursor != null) {
                                    try {
                                        cursor.close();
                                        cursor = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                    }
                                }
                                if (txn != null) {
                                    try {
                                        txn.abort();
                                        txn = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                    }
                                }
                            } catch (final Exception ex) {
                                LOGGER.error("Unable to commit transaction", ex);
                                if (txn != null) {
                                    txn.abort();
                                }
                            }
                        } catch (final LockConflictException lce) {
                            exception = lce;
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                    cursor = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                }
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                    txn = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                }
                            }
                        } finally {
                            if (cursor != null) {
                                cursor.close();
                                cursor = null;
                            }
                            if (txn != null) {
                                txn.abort();
                                txn = null;
                            }
                        }
                        try {
                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                        } catch (final InterruptedException ie) {
                            // Ignore the error
                        }
                    }
                    if (exception != null) {
                        LOGGER.error("Unable to delete events from database", exception);
                    }
                }
            } catch (final Exception ex) {
                LOGGER.error("Error reading database", ex);
                shutdown = true;
                throw ex;
            } finally {
                if (cursor != null) {
                    cursor.close();
                }
            }

            return errors;
        }

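        /**
         * Reconstructs a Flume event from a stored record: decrypts the data if a secret key is configured, then
         * reads back the body length, body bytes, header count and header key/value pairs written by send().
         * Returns null if the record cannot be decoded.
         */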
        private SimpleEvent createEvent(final DatabaseEntry data) {
            final SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    final Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                final DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                final byte[] bytes = new byte[length];
                dais.read(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                final Map<String, String> map = new HashMap<>(length);
                for (int i = 0; i < length; ++i) {
                    final String headerKey = dais.readUTF();
                    final String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (final Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }

    /**
     * Factory that creates Daemon threads that can be properly shut down.
     */
    private static class DaemonThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        public DaemonThreadFactory() {
            final SecurityManager securityManager = System.getSecurityManager();
            group = securityManager != null ? securityManager.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(final Runnable r) {
            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            thread.setDaemon(true);
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }

    /**
     * Simple gate the writer thread waits on. It is opened when a full batch has been persisted or the manager is
     * shutting down, and closed again before each batch is processed.
     */
    private static class Gate {

        private boolean isOpen = false;

        public boolean isOpen() {
            return isOpen;
        }

        public synchronized void open() {
            isOpen = true;
            notifyAll();
        }

        public synchronized void close() {
            isOpen = false;
        }

        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
            wait(timeout);
        }
    }
}