1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.File;
24  import java.nio.charset.Charset;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import javax.crypto.Cipher;
37  import javax.crypto.SecretKey;
38  
39  import org.apache.flume.Event;
40  import org.apache.flume.event.SimpleEvent;
41  import org.apache.logging.log4j.LoggingException;
42  import org.apache.logging.log4j.core.appender.ManagerFactory;
43  import org.apache.logging.log4j.core.config.Property;
44  import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
45  import org.apache.logging.log4j.core.config.plugins.util.PluginType;
46  import org.apache.logging.log4j.core.util.FileUtils;
47  import org.apache.logging.log4j.core.util.SecretKeyProvider;
48  import org.apache.logging.log4j.util.Strings;
49  
50  import com.sleepycat.je.Cursor;
51  import com.sleepycat.je.CursorConfig;
52  import com.sleepycat.je.Database;
53  import com.sleepycat.je.DatabaseConfig;
54  import com.sleepycat.je.DatabaseEntry;
55  import com.sleepycat.je.Environment;
56  import com.sleepycat.je.EnvironmentConfig;
57  import com.sleepycat.je.LockConflictException;
58  import com.sleepycat.je.LockMode;
59  import com.sleepycat.je.OperationStatus;
60  import com.sleepycat.je.StatsConfig;
61  import com.sleepycat.je.Transaction;
62  
63  /**
64   * Manager that persists data to Berkeley DB before passing it on to Flume.
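     *
     * <p>Each event is serialized (body length, body bytes, then header key/value pairs), optionally encrypted with a
     * SecretKey obtained from a {@code keyProvider} plugin, and written to a Berkeley DB JE database. A background
     * WriterThread drains the database and forwards the stored events to Flume, one at a time or as a BatchEvent once
     * the configured batch size or delay is reached.</p>
     *
     * <p>A minimal configuration sketch that would route through this manager (attribute names follow the
     * FlumeAppender documentation; the host, port, data directory and provider name below are placeholders):</p>
     * <pre>
     * &lt;Flume name="flume" type="persistent" dataDir=".log4j/flumeData"&gt;
     *   &lt;Agent host="localhost" port="8800"/&gt;
     *   &lt;Property name="keyProvider"&gt;MySecretKeyProvider&lt;/Property&gt;
     * &lt;/Flume&gt;
     * </pre>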
65   */
66  public class FlumePersistentManager extends FlumeAvroManager {
67  
68      /** Attribute name for the key provider. */
69      public static final String KEY_PROVIDER = "keyProvider";
70  
71      private static final Charset UTF8 = Charset.forName("UTF-8");
72  
73      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74  
75      private static final int SHUTDOWN_WAIT = 60;
76  
77      private static final int MILLIS_PER_SECOND = 1000;
78  
79      private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
80  
81      private static BDBManagerFactory factory = new BDBManagerFactory();
82  
83      private final Database database;
84  
85      private final Environment environment;
86  
87      private final WriterThread worker;
88  
89      private final Gate gate = new Gate();
90  
91      private final SecretKey secretKey;
92  
93      private final int lockTimeoutRetryCount;
94  
95      private final ExecutorService threadPool;
96  
97      private final AtomicLong dbCount = new AtomicLong();
98  
99      /**
100      * Constructor.
101      * @param name The unique name of this manager.
102      * @param shortName Original name for the Manager.
103      * @param agents An array of Agents.
104      * @param batchSize The number of events to include in a batch.
105      * @param retries The number of times to retry connecting before giving up.
106      * @param connectionTimeout The amount of time to wait for a connection to be established.
107      * @param requestTimeout The amount of time to wait for a response to a request.
108      * @param delay The amount of time to wait between retries.
109      * @param database The database to write to.
110      * @param environment The database environment.
111      * @param secretKey The SecretKey to use for encryption.
112      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
113      */
114     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
115                                      final int batchSize, final int retries, final int connectionTimeout,
116                                      final int requestTimeout, final int delay, final Database database,
117                                      final Environment environment, final SecretKey secretKey,
118                                      final int lockTimeoutRetryCount) {
119         super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
120         this.database = database;
121         this.environment = environment;
122         dbCount.set(database.count());
123         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
124             lockTimeoutRetryCount);
125         this.worker.start();
126         this.secretKey = secretKey;
127         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
128         this.lockTimeoutRetryCount = lockTimeoutRetryCount;
129     }
130 
131 
132     /**
133      * Returns a FlumePersistentManager.
134      * @param name The name of the manager.
135      * @param agents The agents to use.
136      * @param properties Properties to pass to the Manager.
137      * @param batchSize The number of events to include in a batch.
138      * @param retries The number of times to retry connecting before giving up.
139      * @param connectionTimeout The amount of time to wait to establish a connection.
140      * @param requestTimeout The amount of time to wait for a response to a request.
141      * @param delayMillis Amount of time to delay before delivering a batch.
142      * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
143      * @param dataDir The location of the Berkeley database.
144      * @return A FlumePersistentManager.
145      */
146     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
147                                                     final Property[] properties, int batchSize, final int retries,
148                                                     final int connectionTimeout, final int requestTimeout,
149                                                     final int delayMillis, final int lockTimeoutRetryCount,
150                                                     final String dataDir) {
151         if (agents == null || agents.length == 0) {
152             throw new IllegalArgumentException("At least one agent is required");
153         }
154 
155         if (batchSize <= 0) {
156             batchSize = 1;
157         }
158         final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
159 
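            // The manager name built below encodes the agent host:port list and the data directory, so differently
            // configured appenders resolve to distinct persistent managers.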
160         final StringBuilder sb = new StringBuilder("FlumePersistent[");
161         boolean first = true;
162         for (final Agent agent : agents) {
163             if (!first) {
164                 sb.append(',');
165             }
166             sb.append(agent.getHost()).append(':').append(agent.getPort());
167             first = false;
168         }
169         sb.append(']');
170         sb.append(' ').append(dataDirectory);
171         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
172             connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
173     }
174 
175     @Override
176     public void send(final Event event)  {
177         if (worker.isShutdown()) {
178             throw new LoggingException("Unable to record event");
179         }
180 
181         final Map<String, String> headers = event.getHeaders();
182         final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
183         try {
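                // Serialize the event as: body length, body bytes, header count, then UTF-encoded key/value pairs.
                // createEvent() in the WriterThread reads this same layout back before forwarding to Flume.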
184             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185             final DataOutputStream daos = new DataOutputStream(baos);
186             daos.writeInt(event.getBody().length);
187             daos.write(event.getBody(), 0, event.getBody().length);
188             daos.writeInt(event.getHeaders().size());
189             for (final Map.Entry<String, String> entry : headers.entrySet()) {
190                 daos.writeUTF(entry.getKey());
191                 daos.writeUTF(entry.getValue());
192             }
193             byte[] eventData = baos.toByteArray();
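                // When a SecretKeyProvider was configured, encrypt the serialized record before persisting it.
                // "AES" resolves to the JCE provider's default mode and padding (typically ECB/PKCS5Padding).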
194             if (secretKey != null) {
195                 final Cipher cipher = Cipher.getInstance("AES");
196                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
197                 eventData = cipher.doFinal(eventData);
198             }
199             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
200                 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
201             boolean interrupted = false;
202             int ieCount = 0;
203             do {
204                 try {
205                     future.get();
206                 } catch (final InterruptedException ie) {
207                     interrupted = true;
208                     ++ieCount;
209                 }
210             } while (interrupted && ieCount <= 1);
211 
212         } catch (final Exception ex) {
213             throw new LoggingException("Exception occurred writing log event", ex);
214         }
215     }
216 
217     @Override
218     protected void releaseSub() {
219         LOGGER.debug("Shutting down FlumePersistentManager");
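            // Shut down in order: stop the writer thread, drain the thread pool that performs the BDB puts,
            // then close the database and finally the environment.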
220         worker.shutdown();
221         try {
222             worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
223         } catch (final InterruptedException ie) {
224             // Ignore the exception and shutdown.
225         }
226         threadPool.shutdown();
227         try {
228             threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
229         } catch (final InterruptedException ie) {
230             LOGGER.warn("PersistentManager Thread pool failed to shut down");
231         }
232         try {
233             worker.join();
234         } catch (final InterruptedException ex) {
235             LOGGER.debug("Interrupted while waiting for worker to complete");
236         }
237         try {
238             LOGGER.debug("FlumePersistentManager dataset status: {}", database.getStats(new StatsConfig()));
239             database.close();
240         } catch (final Exception ex) {
241             LOGGER.warn("Failed to close database", ex);
242         }
243         try {
244             environment.cleanLog();
245             environment.close();
246         } catch (final Exception ex) {
247             LOGGER.warn("Failed to close environment", ex);
248         }
249         super.releaseSub();
250     }
251 
252     private void doSend(final SimpleEvent event) {
253         LOGGER.debug("Sending event to Flume");
254         super.send(event);
255     }
256 
257     /**
258      * Thread for writing to Berkeley DB to avoid having interrupts close the database.
259      */
260     private static class BDBWriter implements Callable<Integer> {
261         private final byte[] eventData;
262         private final byte[] keyData;
263         private final Environment environment;
264         private final Database database;
265         private final Gate gate;
266         private final AtomicLong dbCount;
267         private final long batchSize;
268         private final int lockTimeoutRetryCount;
269 
270         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
271                          final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
272                          final int lockTimeoutRetryCount) {
273             this.keyData = keyData;
274             this.eventData = eventData;
275             this.environment = environment;
276             this.database = database;
277             this.gate = gate;
278             this.dbCount = dbCount;
279             this.batchSize = batchSize;
280             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
281         }
282 
283         @Override
284         public Integer call() throws Exception {
285             final DatabaseEntry key = new DatabaseEntry(keyData);
286             final DatabaseEntry data = new DatabaseEntry(eventData);
287             Exception exception = null;
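                // Retry the put on lock conflicts, up to lockTimeoutRetryCount attempts, sleeping briefly between tries.
                // Once the stored count reaches the batch size, the gate is opened to wake the WriterThread.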
288             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
289                 Transaction txn = null;
290                 try {
291                     txn = environment.beginTransaction(null, null);
292                     try {
293                         database.put(txn, key, data);
294                         txn.commit();
295                         txn = null;
296                         if (dbCount.incrementAndGet() >= batchSize) {
297                             gate.open();
298                         }
299                         exception = null;
300                         break;
301                     } catch (final LockConflictException lce) {
302                         exception = lce;
303                         // Fall through and retry.
304                     } catch (final Exception ex) {
305                         if (txn != null) {
306                             txn.abort();
307                         }
308                         throw ex;
309                     } finally {
310                         if (txn != null) {
311                             txn.abort();
312                             txn = null;
313                         }
314                     }
315                 } catch (final LockConflictException lce) {
316                     exception = lce;
317                     if (txn != null) {
318                         try {
319                             txn.abort();
320                             txn = null;
321                         } catch (final Exception ex) {
322                             LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
323                         }
324                     }
325 
326                 }
327                 try {
328                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
329                 } catch (final InterruptedException ie) {
330                     // Ignore the error
331                 }
332             }
333             if (exception != null) {
334                 throw exception;
335             }
336             return eventData.length;
337         }
338     }
339 
340     /**
341      * Factory data.
342      */
343     private static class FactoryData {
344         private final String name;
345         private final Agent[] agents;
346         private final int batchSize;
347         private final String dataDir;
348         private final int retries;
349         private final int connectionTimeout;
350         private final int requestTimeout;
351         private final int delayMillis;
352         private final int lockTimeoutRetryCount;
353         private final Property[] properties;
354 
355         /**
356          * Constructor.
357          * @param name The name of the Appender.
358          * @param agents The agents.
359          * @param batchSize The number of events to include in a batch.
360          * @param dataDir The directory for data.
361          */
362         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
363                            final int connectionTimeout, final int requestTimeout, final int delayMillis,
364                            final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
365             this.name = name;
366             this.agents = agents;
367             this.batchSize = batchSize;
368             this.dataDir = dataDir;
369             this.retries = retries;
370             this.connectionTimeout = connectionTimeout;
371             this.requestTimeout = requestTimeout;
372             this.delayMillis = delayMillis;
373             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
374             this.properties = properties;
375         }
376     }
377 
378     /**
379      * Avro Manager Factory.
380      */
381     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
382 
383         /**
384          * Create the FlumePersistentManager.
385          * @param name The name of the entity to manage.
386          * @param data The data required to create the entity.
387          * @return The FlumePersistentManager.
388          */
389         @Override
390         public FlumePersistentManager createManager(final String name, final FactoryData data) {
391             SecretKey secretKey = null;
392             Database database = null;
393             Environment environment = null;
394 
395             final Map<String, String> properties = new HashMap<>();
396             if (data.properties != null) {
397                 for (final Property property : data.properties) {
398                     properties.put(property.getName(), property.getValue());
399                 }
400             }
401 
402             try {
403                 final File dir = new File(data.dataDir);
404                 FileUtils.mkdir(dir, true);
405                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
406                 dbEnvConfig.setTransactional(true);
407                 dbEnvConfig.setAllowCreate(true);
408                 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
409                 environment = new Environment(dir, dbEnvConfig);
410                 final DatabaseConfig dbConfig = new DatabaseConfig();
411                 dbConfig.setTransactional(true);
412                 dbConfig.setAllowCreate(true);
413                 database = environment.openDatabase(null, name, dbConfig);
414             } catch (final Exception ex) {
415                 LOGGER.error("Could not create FlumePersistentManager", ex);
416                 // For consistency, close database as well as environment even though it should never happen since the
417                 // database is that last thing in the block above, but this does guard against a future line being
418                 // inserted at the end that would bomb (like some debug logging).
419                 if (database != null) {
420                     database.close();
421                     database = null;
422                 }
423                 if (environment != null) {
424                     environment.close();
425                     environment = null;
426                 }
427                 return null;
428             }
429 
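                // If a "keyProvider" property is present, look for a matching plugin in the "KeyProvider" category
                // and use it to obtain the SecretKey; any failure simply disables encryption.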
430             try {
431                 String key = null;
432                 for (final Map.Entry<String, String> entry : properties.entrySet()) {
433                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
434                         key = entry.getValue();
435                         break;
436                     }
437                 }
438                 if (key != null) {
439                     final PluginManager manager = new PluginManager("KeyProvider");
440                     manager.collectPlugins();
441                     final Map<String, PluginType<?>> plugins = manager.getPlugins();
442                     if (plugins != null) {
443                         boolean found = false;
444                         for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
445                             if (entry.getKey().equalsIgnoreCase(key)) {
446                                 found = true;
447                                 final Class<?> cl = entry.getValue().getPluginClass();
448                                 try {
449                                     final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
450                                     secretKey = provider.getSecretKey();
451                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
452                                 } catch (final Exception ex) {
453                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
454                                         cl.getName());
455                                 }
456                                 break;
457                             }
458                         }
459                         if (!found) {
460                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
461                         }
462                     } else {
463                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464                     }
465                 }
466             } catch (final Exception ex) {
467                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
468             }
469             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
470                 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
471                 data.lockTimeoutRetryCount);
472         }
473     }
474 
475     /**
476      * Thread that sends data to Flume and pulls it from Berkeley DB.
477      */
478     private static class WriterThread extends Thread  {
479         private volatile boolean shutdown = false;
480         private final Database database;
481         private final Environment environment;
482         private final FlumePersistentManager manager;
483         private final Gate gate;
484         private final SecretKey secretKey;
485         private final int batchSize;
486         private final AtomicLong dbCounter;
487         private final int lockTimeoutRetryCount;
488 
489         public WriterThread(final Database database, final Environment environment,
490                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
491                             final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
492             this.database = database;
493             this.environment = environment;
494             this.manager = manager;
495             this.gate = gate;
496             this.batchSize = batchsize;
497             this.secretKey = secretKey;
498             this.setDaemon(true);
499             this.dbCounter = dbCount;
500             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
501         }
502 
503         public void shutdown() {
504             LOGGER.debug("Writer thread shutting down");
505             this.shutdown = true;
506             gate.open();
507         }
508 
509         public boolean isShutdown() {
510             return shutdown;
511         }
512 
513         @Override
514         public void run() {
515             LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
516             long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
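                // Main loop: send as soon as batchSize events are queued, or once the delay has elapsed and at
                // least one event is waiting; otherwise block on the gate until it is opened or the delay expires.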
517             while (!shutdown) {
518                 final long nowMillis = System.currentTimeMillis();
519                 final long dbCount = database.count();
520                 dbCounter.set(dbCount);
521                 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
522                     nextBatchMillis = nowMillis + manager.getDelayMillis();
523                     try {
524                         boolean errors = false;
525                         final DatabaseEntry key = new DatabaseEntry();
526                         final DatabaseEntry data = new DatabaseEntry();
527 
528                         gate.close();
529                         OperationStatus status;
530                         if (batchSize > 1) {
531                             try {
532                                 errors = sendBatch(key, data);
533                             } catch (final Exception ex) {
534                                 break;
535                             }
536                         } else {
537                             Exception exception = null;
538                             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
539                                 exception = null;
540                                 Transaction txn = null;
541                                 Cursor cursor = null;
542                                 try {
543                                     txn = environment.beginTransaction(null, null);
544                                     cursor = database.openCursor(txn, null);
545                                     try {
546                                         status = cursor.getFirst(key, data, LockMode.RMW);
547                                         while (status == OperationStatus.SUCCESS) {
548                                             final SimpleEvent event = createEvent(data);
549                                             if (event != null) {
550                                                 try {
551                                                     manager.doSend(event);
552                                                 } catch (final Exception ioe) {
553                                                     errors = true;
554                                                     LOGGER.error("Error sending event", ioe);
555                                                     break;
556                                                 }
557                                                 try {
558                                                     cursor.delete();
559                                                 } catch (final Exception ex) {
560                                                     LOGGER.error("Unable to delete event", ex);
561                                                 }
562                                             }
563                                             status = cursor.getNext(key, data, LockMode.RMW);
564                                         }
565                                         if (cursor != null) {
566                                             cursor.close();
567                                             cursor = null;
568                                         }
569                                         txn.commit();
570                                         txn = null;
571                                         dbCounter.decrementAndGet();
572                                         exception = null;
573                                         break;
574                                     } catch (final LockConflictException lce) {
575                                         exception = lce;
576                                         // Fall through and retry.
577                                     } catch (final Exception ex) {
578                                         LOGGER.error("Error reading or writing to database", ex);
579                                         shutdown = true;
580                                         break;
581                                     } finally {
582                                         if (cursor != null) {
583                                             cursor.close();
584                                             cursor = null;
585                                         }
586                                         if (txn != null) {
587                                             txn.abort();
588                                             txn = null;
589                                         }
590                                     }
591                                 } catch (final LockConflictException lce) {
592                                     exception = lce;
593                                     if (cursor != null) {
594                                         try {
595                                             cursor.close();
596                                             cursor = null;
597                                         } catch (final Exception ex) {
598                                             LOGGER.trace("Ignored exception closing cursor during lock conflict.");
599                                         }
600                                     }
601                                     if (txn != null) {
602                                         try {
603                                             txn.abort();
604                                             txn = null;
605                                         } catch (final Exception ex) {
606                                             LOGGER.trace("Ignored exception aborting tx during lock conflict.");
607                                         }
608                                     }
609                                 }
610                                 try {
611                                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
612                                 } catch (final InterruptedException ie) {
613                                     // Ignore the error
614                                 }
615                             }
616                             if (exception != null) {
617                                 LOGGER.error("Unable to read or update database", exception);
618                             }
619                         }
620                         if (errors) {
621                             Thread.sleep(manager.getDelayMillis());
622                             continue;
623                         }
624                     } catch (final Exception ex) {
625                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
626                     }
627                 } else {
628                     if (nextBatchMillis <= nowMillis) {
629                         nextBatchMillis = nowMillis + manager.getDelayMillis();
630                     }
631                     try {
632                         final long interval = nextBatchMillis - nowMillis;
633                         gate.waitForOpen(interval);
634                     } catch (final InterruptedException ie) {
635                         LOGGER.warn("WriterThread interrupted, continuing");
636                     } catch (final Exception ex) {
637                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
638                         break;
639                     }
640                 }
641             }
642 
643             if (batchSize > 1 && database.count() > 0) {
644                 final DatabaseEntry key = new DatabaseEntry();
645                 final DatabaseEntry data = new DatabaseEntry();
646                 try {
647                     sendBatch(key, data);
648                 } catch (final Exception ex) {
649                     LOGGER.warn("Unable to write final batch", ex);
650                 }
651             }
652             LOGGER.trace("WriterThread exiting");
653         }
654 
655         private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
656             boolean errors = false;
657             OperationStatus status;
658             Cursor cursor = null;
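                // Two phases: read up to batchSize events with a cursor and send them as a single BatchEvent, then
                // delete the delivered records in a transaction, keyed by each event's GUID header.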
659             try {
660                 final BatchEvent batch = new BatchEvent();
661                 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
662                     try {
663                         cursor = database.openCursor(null, CursorConfig.DEFAULT);
664                         status = cursor.getFirst(key, data, null);
665 
666                         for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
667                             final SimpleEvent event = createEvent(data);
668                             if (event != null) {
669                                 batch.addEvent(event);
670                             }
671                             status = cursor.getNext(key, data, null);
672                         }
673                         break;
674                     } catch (final LockConflictException lce) {
675                         if (cursor != null) {
676                             try {
677                                 cursor.close();
678                                 cursor = null;
679                             } catch (final Exception ex) {
680                                 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
681                             }
682                         }
683                     }
684                 }
685 
686                 try {
687                     manager.send(batch);
688                 } catch (final Exception ioe) {
689                     LOGGER.error("Error sending events", ioe);
690                     errors = true;
691                 }
692                 if (!errors) {
693                     if (cursor != null) {
694                         cursor.close();
695                         cursor = null;
696                     }
697                     Transaction txn = null;
698                     Exception exception = null;
699                     for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
700                         try {
701                             txn = environment.beginTransaction(null, null);
702                             try {
703                                 for (final Event event : batch.getEvents()) {
704                                     try {
705                                         final Map<String, String> headers = event.getHeaders();
706                                         key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
707                                         database.delete(txn, key);
708                                     } catch (final Exception ex) {
709                                         LOGGER.error("Error deleting key from database", ex);
710                                     }
711                                 }
712                                 txn.commit();
713                                 long count = dbCounter.get();
714                                 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
715                                     count = dbCounter.get();
716                                 }
717                                 exception = null;
718                                 break;
719                             } catch (final LockConflictException lce) {
720                                 exception = lce;
721                                 if (cursor != null) {
722                                     try {
723                                         cursor.close();
724                                         cursor = null;
725                                     } catch (final Exception ex) {
726                                         LOGGER.trace("Ignored exception closing cursor during lock conflict.");
727                                     }
728                                 }
729                                 if (txn != null) {
730                                     try {
731                                         txn.abort();
732                                         txn = null;
733                                     } catch (final Exception ex) {
734                                         LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
735                                     }
736                                 }
737                             } catch (final Exception ex) {
738                                 LOGGER.error("Unable to commit transaction", ex);
739                                 if (txn != null) {
740                                     txn.abort();
741                                 }
742                             }
743                         } catch (final LockConflictException lce) {
744                             exception = lce;
745                             if (cursor != null) {
746                                 try {
747                                     cursor.close();
748                                     cursor = null;
749                                 } catch (final Exception ex) {
750                                     LOGGER.trace("Ignored exception closing cursor during lock conflict.");
751                                 }
752                             }
753                             if (txn != null) {
754                                 try {
755                                     txn.abort();
756                                     txn = null;
757                                 } catch (final Exception ex) {
758                                     LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
759                                 }
760                             }
761                         } finally {
762                             if (cursor != null) {
763                                 cursor.close();
764                                 cursor = null;
765                             }
766                             if (txn != null) {
767                                 txn.abort();
768                                 txn = null;
769                             }
770                         }
771                         try {
772                             Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
773                         } catch (final InterruptedException ie) {
774                             // Ignore the error
775                         }
776                     }
777                     if (exception != null) {
778                         LOGGER.error("Unable to delete events from database", exception);
779                     }
780                 }
781             } catch (final Exception ex) {
782                 LOGGER.error("Error reading database", ex);
783                 shutdown = true;
784                 throw ex;
785             } finally {
786                 if (cursor != null) {
787                     cursor.close();
788                 }
789             }
790 
791             return errors;
792         }
793 
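            /**
             * Rebuilds a SimpleEvent from a stored record: decrypts it when a secret key is configured, then reads
             * the body length, body, header count and header pairs written by send(). Returns null on error.
             */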
794         private SimpleEvent createEvent(final DatabaseEntry data) {
795             final SimpleEvent event = new SimpleEvent();
796             try {
797                 byte[] eventData = data.getData();
798                 if (secretKey != null) {
799                     final Cipher cipher = Cipher.getInstance("AES");
800                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
801                     eventData = cipher.doFinal(eventData);
802                 }
803                 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
804                 final DataInputStream dais = new DataInputStream(bais);
805                 int length = dais.readInt();
806                 final byte[] bytes = new byte[length];
807                 dais.readFully(bytes, 0, length);
808                 event.setBody(bytes);
809                 length = dais.readInt();
810                 final Map<String, String> map = new HashMap<>(length);
811                 for (int i = 0; i < length; ++i) {
812                     final String headerKey = dais.readUTF();
813                     final String value = dais.readUTF();
814                     map.put(headerKey, value);
815                 }
816                 event.setHeaders(map);
817                 return event;
818             } catch (final Exception ex) {
819                 LOGGER.error("Error retrieving event", ex);
820                 return null;
821             }
822         }
823 
824     }
825 
826     /**
827      * Factory that creates Daemon threads that can be properly shut down.
828      */
829     private static class DaemonThreadFactory implements ThreadFactory {
830         private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
831         private final ThreadGroup group;
832         private final AtomicInteger threadNumber = new AtomicInteger(1);
833         private final String namePrefix;
834 
835         public DaemonThreadFactory() {
836             final SecurityManager securityManager = System.getSecurityManager();
837             group = securityManager != null ? securityManager.getThreadGroup() :
838                 Thread.currentThread().getThreadGroup();
839             namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
840         }
841 
842         @Override
843         public Thread newThread(final Runnable r) {
844             final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
845             thread.setDaemon(true);
846             if (thread.getPriority() != Thread.NORM_PRIORITY) {
847                 thread.setPriority(Thread.NORM_PRIORITY);
848             }
849             return thread;
850         }
851     }
852 
853     /**
854      * A simple gate the WriterThread waits on; it is opened when enough events have been persisted or when the manager shuts down.
855      */
856     private static class Gate {
857 
858         private boolean isOpen = false;
859 
860         public boolean isOpen() {
861             return isOpen;
862         }
863 
864         public synchronized void open() {
865             isOpen = true;
866             notifyAll();
867         }
868 
869         public synchronized void close() {
870             isOpen = false;
871         }
872 
873         public synchronized void waitForOpen(final long timeout) throws InterruptedException {
874             wait(timeout);
875         }
876     }
877 }