001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.File;
024import java.nio.charset.Charset;
025import java.nio.charset.StandardCharsets;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.concurrent.Callable;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicLong;
036
037import javax.crypto.Cipher;
038import javax.crypto.SecretKey;
039
040import org.apache.flume.Event;
041import org.apache.flume.event.SimpleEvent;
042import org.apache.logging.log4j.LoggingException;
043import org.apache.logging.log4j.core.appender.ManagerFactory;
044import org.apache.logging.log4j.core.config.Property;
045import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
046import org.apache.logging.log4j.core.config.plugins.util.PluginType;
047import org.apache.logging.log4j.core.util.FileUtils;
048import org.apache.logging.log4j.core.util.Log4jThread;
049import org.apache.logging.log4j.core.util.SecretKeyProvider;
050import org.apache.logging.log4j.util.Strings;
051
052import com.sleepycat.je.Cursor;
053import com.sleepycat.je.CursorConfig;
054import com.sleepycat.je.Database;
055import com.sleepycat.je.DatabaseConfig;
056import com.sleepycat.je.DatabaseEntry;
057import com.sleepycat.je.Environment;
058import com.sleepycat.je.EnvironmentConfig;
059import com.sleepycat.je.LockConflictException;
060import com.sleepycat.je.LockMode;
061import com.sleepycat.je.OperationStatus;
062import com.sleepycat.je.StatsConfig;
063import com.sleepycat.je.Transaction;
064
065/**
066 * Manager that persists data to Berkeley DB before passing it on to Flume.
067 */
068public class FlumePersistentManager extends FlumeAvroManager {
069
    /** Attribute name for the key provider. */
    public static final String KEY_PROVIDER = "keyProvider";

    /** Charset used to turn the event GUID header into the database key bytes. */
    private static final Charset UTF8 = StandardCharsets.UTF_8;

    /** Fallback data directory name chosen by getManager when no dataDir is given. */
    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    /** Time, in seconds, to wait for the writer thread and for the thread pool during shutdown. */
    private static final int SHUTDOWN_WAIT = 60;

    /** Conversion factor used to express SHUTDOWN_WAIT in milliseconds. */
    private static final int MILLIS_PER_SECOND = 1000;

    /** Pause, in milliseconds, between attempts after a database lock conflict. */
    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;

    /** Factory passed to getManager to create instances of this manager. */
    private static BDBManagerFactory factory = new BDBManagerFactory();

    /** Berkeley DB database that holds the events until they are delivered. */
    private final Database database;

    /** Berkeley DB environment that owns the database and its transactions. */
    private final Environment environment;

    /** Background thread that reads stored events and sends them on. */
    private final WriterThread worker;

    /** Gate the worker waits on; opened by writers once enough events are stored, and on shutdown. */
    private final Gate gate = new Gate();

    /** Key used to encrypt stored events; null means events are stored unencrypted. */
    private final SecretKey secretKey;

    /** Number of attempts made when a database operation hits a lock conflict. */
    private final int lockTimeoutRetryCount;

    /** Pool that runs the BDBWriter tasks submitted by send. */
    private final ExecutorService threadPool;

    /** Running count of stored events, shared between the writer tasks and the worker thread. */
    private final AtomicLong dbCount = new AtomicLong();
100
101    /**
102     * Constructor
103     * @param name The unique name of this manager.
104     * @param shortName Original name for the Manager.
105     * @param agents An array of Agents.
106     * @param batchSize The number of events to include in a batch.
107     * @param retries The number of times to retry connecting before giving up.
108     * @param connectionTimeout The amount of time to wait for a connection to be established.
109     * @param requestTimeout The amount of time to wair for a response to a request.
110     * @param delay The amount of time to wait between retries.
111     * @param database The database to write to.
112     * @param environment The database environment.
113     * @param secretKey The SecretKey to use for encryption.
114     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
115     */
116    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
117                                     final int batchSize, final int retries, final int connectionTimeout,
118                                     final int requestTimeout, final int delay, final Database database,
119                                     final Environment environment, final SecretKey secretKey,
120                                     final int lockTimeoutRetryCount) {
121        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
122        this.database = database;
123        this.environment = environment;
124        dbCount.set(database.count());
125        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
126            lockTimeoutRetryCount);
127        this.worker.start();
128        this.secretKey = secretKey;
129        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
130        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
131    }
132
133
134    /**
135     * Returns a FlumeAvroManager.
136     * @param name The name of the manager.
137     * @param agents The agents to use.
138     * @param properties Properties to pass to the Manager.
139     * @param batchSize The number of events to include in a batch.
140     * @param retries The number of times to retry connecting before giving up.
141     * @param connectionTimeout The amount of time to wait to establish a connection.
142     * @param requestTimeout The amount of time to wait for a response to a request.
143     * @param delayMillis Amount of time to delay before delivering a batch.
144     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
145     * @param dataDir The location of the Berkeley database.
146     * @return A FlumeAvroManager.
147     */
148    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
149                                                    final Property[] properties, int batchSize, final int retries,
150                                                    final int connectionTimeout, final int requestTimeout,
151                                                    final int delayMillis, final int lockTimeoutRetryCount,
152                                                    final String dataDir) {
153        if (agents == null || agents.length == 0) {
154            throw new IllegalArgumentException("At least one agent is required");
155        }
156
157        if (batchSize <= 0) {
158            batchSize = 1;
159        }
160        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
161
162        final StringBuilder sb = new StringBuilder("FlumePersistent[");
163        boolean first = true;
164        for (final Agent agent : agents) {
165            if (!first) {
166                sb.append(',');
167            }
168            sb.append(agent.getHost()).append(':').append(agent.getPort());
169            first = false;
170        }
171        sb.append(']');
172        sb.append(' ').append(dataDirectory);
173        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
174            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
175    }
176
177    @Override
178    public void send(final Event event)  {
179        if (worker.isShutdown()) {
180            throw new LoggingException("Unable to record event");
181        }
182
183        final Map<String, String> headers = event.getHeaders();
184        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
185        try {
186            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
187            final DataOutputStream daos = new DataOutputStream(baos);
188            daos.writeInt(event.getBody().length);
189            daos.write(event.getBody(), 0, event.getBody().length);
190            daos.writeInt(event.getHeaders().size());
191            for (final Map.Entry<String, String> entry : headers.entrySet()) {
192                daos.writeUTF(entry.getKey());
193                daos.writeUTF(entry.getValue());
194            }
195            byte[] eventData = baos.toByteArray();
196            if (secretKey != null) {
197                final Cipher cipher = Cipher.getInstance("AES");
198                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
199                eventData = cipher.doFinal(eventData);
200            }
201            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
202                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
203            boolean interrupted = false;
204            int ieCount = 0;
205            do {
206                try {
207                    future.get();
208                } catch (final InterruptedException ie) {
209                    interrupted = true;
210                    ++ieCount;
211                }
212            } while (interrupted && ieCount <= 1);
213
214        } catch (final Exception ex) {
215            throw new LoggingException("Exception occurred writing log event", ex);
216        }
217    }
218
    /**
     * Shuts the manager down: stops the writer thread, drains the thread pool, closes the database and the
     * environment, and finally releases the superclass resources. Failures while closing are logged and do
     * not stop the remaining steps.
     */
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        // Ask the worker to stop and give it up to SHUTDOWN_WAIT seconds to finish.
        worker.shutdown();
        try {
            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
        } catch (final InterruptedException ie) {
            // Ignore the exception and shutdown.
        }
        // Stop accepting new write tasks and wait up to SHUTDOWN_WAIT seconds for queued ones to complete.
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            logWarn("PersistentManager Thread pool failed to shut down", e);
        }
        // Wait again, this time without a timeout, for the worker to finish before the database is closed.
        try {
            worker.join();
        } catch (final InterruptedException ex) {
            logDebug("interrupted while waiting for worker to complete", ex);
        }
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            logWarn("failed to close database", ex);
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            logWarn("failed to close environment", ex);
        }
        super.releaseSub();
    }
253
    /**
     * Passes an event to the superclass send implementation. The writer thread uses this instead of
     * {@link #send(Event)}, which in this class stores the event in the database.
     * @param event The event to deliver.
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }
258
259    /**
260     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
261     */
262    private static class BDBWriter implements Callable<Integer> {
263        private final byte[] eventData;
264        private final byte[] keyData;
265        private final Environment environment;
266        private final Database database;
267        private final Gate gate;
268        private final AtomicLong dbCount;
269        private final long batchSize;
270        private final int lockTimeoutRetryCount;
271
272        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
273                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
274                         final int lockTimeoutRetryCount) {
275            this.keyData = keyData;
276            this.eventData = eventData;
277            this.environment = environment;
278            this.database = database;
279            this.gate = gate;
280            this.dbCount = dbCount;
281            this.batchSize = batchSize;
282            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
283        }
284
285        @Override
286        public Integer call() throws Exception {
287            final DatabaseEntry key = new DatabaseEntry(keyData);
288            final DatabaseEntry data = new DatabaseEntry(eventData);
289            Exception exception = null;
290            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
291                Transaction txn = null;
292                try {
293                    txn = environment.beginTransaction(null, null);
294                    try {
295                        database.put(txn, key, data);
296                        txn.commit();
297                        txn = null;
298                        if (dbCount.incrementAndGet() >= batchSize) {
299                            gate.open();
300                        }
301                        exception = null;
302                        break;
303                    } catch (final LockConflictException lce) {
304                        exception = lce;
305                        // Fall through and retry.
306                    } catch (final Exception ex) {
307                        if (txn != null) {
308                            txn.abort();
309                        }
310                        throw ex;
311                    } finally {
312                        if (txn != null) {
313                            txn.abort();
314                            txn = null;
315                        }
316                    }
317                } catch (final LockConflictException lce) {
318                    exception = lce;
319                    if (txn != null) {
320                        try {
321                            txn.abort();
322                            txn = null;
323                        } catch (final Exception ex) {
324                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
325                        }
326                    }
327
328                }
329                try {
330                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
331                } catch (final InterruptedException ie) {
332                    // Ignore the error
333                }
334            }
335            if (exception != null) {
336                throw exception;
337            }
338            return eventData.length;
339        }
340    }
341
342    /**
343     * Factory data.
344     */
345    private static class FactoryData {
346        private final String name;
347        private final Agent[] agents;
348        private final int batchSize;
349        private final String dataDir;
350        private final int retries;
351        private final int connectionTimeout;
352        private final int requestTimeout;
353        private final int delayMillis;
354        private final int lockTimeoutRetryCount;
355        private final Property[] properties;
356
357        /**
358         * Constructor.
359         * @param name The name of the Appender.
360         * @param agents The agents.
361         * @param batchSize The number of events to include in a batch.
362         * @param dataDir The directory for data.
363         */
364        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
365                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
366                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
367            this.name = name;
368            this.agents = agents;
369            this.batchSize = batchSize;
370            this.dataDir = dataDir;
371            this.retries = retries;
372            this.connectionTimeout = connectionTimeout;
373            this.requestTimeout = requestTimeout;
374            this.delayMillis = delayMillis;
375            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
376            this.properties = properties;
377        }
378    }
379
380    /**
381     * Avro Manager Factory.
382     */
383    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
384
385        /**
386         * Create the FlumeKratiManager.
387         * @param name The name of the entity to manage.
388         * @param data The data required to create the entity.
389         * @return The FlumeKratiManager.
390         */
391        @Override
392        public FlumePersistentManager createManager(final String name, final FactoryData data) {
393            SecretKey secretKey = null;
394            Database database = null;
395            Environment environment = null;
396
397            final Map<String, String> properties = new HashMap<>();
398            if (data.properties != null) {
399                for (final Property property : data.properties) {
400                    properties.put(property.getName(), property.getValue());
401                }
402            }
403
404            try {
405                final File dir = new File(data.dataDir);
406                FileUtils.mkdir(dir, true);
407                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
408                dbEnvConfig.setTransactional(true);
409                dbEnvConfig.setAllowCreate(true);
410                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
411                environment = new Environment(dir, dbEnvConfig);
412                final DatabaseConfig dbConfig = new DatabaseConfig();
413                dbConfig.setTransactional(true);
414                dbConfig.setAllowCreate(true);
415                database = environment.openDatabase(null, name, dbConfig);
416            } catch (final Exception ex) {
417                LOGGER.error("Could not create FlumePersistentManager", ex);
418                // For consistency, close database as well as environment even though it should never happen since the
419                // database is that last thing in the block above, but this does guard against a future line being
420                // inserted at the end that would bomb (like some debug logging).
421                if (database != null) {
422                    database.close();
423                    database = null;
424                }
425                if (environment != null) {
426                    environment.close();
427                    environment = null;
428                }
429                return null;
430            }
431
432            try {
433                String key = null;
434                for (final Map.Entry<String, String> entry : properties.entrySet()) {
435                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
436                        key = entry.getValue();
437                        break;
438                    }
439                }
440                if (key != null) {
441                    final PluginManager manager = new PluginManager("KeyProvider");
442                    manager.collectPlugins();
443                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
444                    if (plugins != null) {
445                        boolean found = false;
446                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
447                            if (entry.getKey().equalsIgnoreCase(key)) {
448                                found = true;
449                                final Class<?> cl = entry.getValue().getPluginClass();
450                                try {
451                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
452                                    secretKey = provider.getSecretKey();
453                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
454                                } catch (final Exception ex) {
455                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
456                                        cl.getName());
457                                }
458                                break;
459                            }
460                        }
461                        if (!found) {
462                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
463                        }
464                    } else {
465                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
466                    }
467                }
468            } catch (final Exception ex) {
469                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
470            }
471            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
472                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
473                data.lockTimeoutRetryCount);
474        }
475    }
476
477    /**
478     * Thread that sends data to Flume and pulls it from Berkeley DB.
479     */
480    private static class WriterThread extends Thread  {
481        private volatile boolean shutdown = false;
482        private final Database database;
483        private final Environment environment;
484        private final FlumePersistentManager manager;
485        private final Gate gate;
486        private final SecretKey secretKey;
487        private final int batchSize;
488        private final AtomicLong dbCounter;
489        private final int lockTimeoutRetryCount;
490
491        public WriterThread(final Database database, final Environment environment,
492                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
493                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
494            this.database = database;
495            this.environment = environment;
496            this.manager = manager;
497            this.gate = gate;
498            this.batchSize = batchsize;
499            this.secretKey = secretKey;
500            this.setDaemon(true);
501            this.dbCounter = dbCount;
502            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
503        }
504
        /**
         * Requests that the thread stop: sets the shutdown flag that the run loop checks and then opens the
         * gate, presumably so that a thread blocked waiting on the gate is released.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }
510
        /**
         * Reports whether the shutdown flag has been set, either by {@link #shutdown()} or by the run loop
         * after a database error.
         * @return true if the thread has been asked to stop.
         */
        public boolean isShutdown() {
            return shutdown;
        }
514
515        @Override
516        public void run() {
517            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
518            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
519            while (!shutdown) {
520                final long nowMillis = System.currentTimeMillis();
521                final long dbCount = database.count();
522                dbCounter.set(dbCount);
523                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
524                    nextBatchMillis = nowMillis + manager.getDelayMillis();
525                    try {
526                        boolean errors = false;
527                        final DatabaseEntry key = new DatabaseEntry();
528                        final DatabaseEntry data = new DatabaseEntry();
529
530                        gate.close();
531                        OperationStatus status;
532                        if (batchSize > 1) {
533                            try {
534                                errors = sendBatch(key, data);
535                            } catch (final Exception ex) {
536                                break;
537                            }
538                        } else {
539                            Exception exception = null;
540                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
541                                exception = null;
542                                Transaction txn = null;
543                                Cursor cursor = null;
544                                try {
545                                    txn = environment.beginTransaction(null, null);
546                                    cursor = database.openCursor(txn, null);
547                                    try {
548                                        status = cursor.getFirst(key, data, LockMode.RMW);
549                                        while (status == OperationStatus.SUCCESS) {
550                                            final SimpleEvent event = createEvent(data);
551                                            if (event != null) {
552                                                try {
553                                                    manager.doSend(event);
554                                                } catch (final Exception ioe) {
555                                                    errors = true;
556                                                    LOGGER.error("Error sending event", ioe);
557                                                    break;
558                                                }
559                                                try {
560                                                    cursor.delete();
561                                                } catch (final Exception ex) {
562                                                    LOGGER.error("Unable to delete event", ex);
563                                                }
564                                            }
565                                            status = cursor.getNext(key, data, LockMode.RMW);
566                                        }
567                                        if (cursor != null) {
568                                            cursor.close();
569                                            cursor = null;
570                                        }
571                                        txn.commit();
572                                        txn = null;
573                                        dbCounter.decrementAndGet();
574                                        exception = null;
575                                        break;
576                                    } catch (final LockConflictException lce) {
577                                        exception = lce;
578                                        // Fall through and retry.
579                                    } catch (final Exception ex) {
580                                        LOGGER.error("Error reading or writing to database", ex);
581                                        shutdown = true;
582                                        break;
583                                    } finally {
584                                        if (cursor != null) {
585                                            cursor.close();
586                                            cursor = null;
587                                        }
588                                        if (txn != null) {
589                                            txn.abort();
590                                            txn = null;
591                                        }
592                                    }
593                                } catch (final LockConflictException lce) {
594                                    exception = lce;
595                                    if (cursor != null) {
596                                        try {
597                                            cursor.close();
598                                            cursor = null;
599                                        } catch (final Exception ex) {
600                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
601                                        }
602                                    }
603                                    if (txn != null) {
604                                        try {
605                                            txn.abort();
606                                            txn = null;
607                                        } catch (final Exception ex) {
608                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
609                                        }
610                                    }
611                                }
612                                try {
613                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
614                                } catch (final InterruptedException ie) {
615                                    // Ignore the error
616                                }
617                            }
618                            if (exception != null) {
619                                LOGGER.error("Unable to read or update data base", exception);
620                            }
621                        }
622                        if (errors) {
623                            Thread.sleep(manager.getDelayMillis());
624                            continue;
625                        }
626                    } catch (final Exception ex) {
627                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
628                    }
629                } else {
630                    if (nextBatchMillis <= nowMillis) {
631                        nextBatchMillis = nowMillis + manager.getDelayMillis();
632                    }
633                    try {
634                        final long interval = nextBatchMillis - nowMillis;
635                        gate.waitForOpen(interval);
636                    } catch (final InterruptedException ie) {
637                        LOGGER.warn("WriterThread interrupted, continuing");
638                    } catch (final Exception ex) {
639                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
640                        break;
641                    }
642                }
643            }
644
645            if (batchSize > 1 && database.count() > 0) {
646                final DatabaseEntry key = new DatabaseEntry();
647                final DatabaseEntry data = new DatabaseEntry();
648                try {
649                    sendBatch(key, data);
650                } catch (final Exception ex) {
651                    LOGGER.warn("Unable to write final batch");
652                }
653            }
654            LOGGER.trace("WriterThread exiting");
655        }
656
657        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
658            boolean errors = false;
659            OperationStatus status;
660            Cursor cursor = null;
661            try {
662                final BatchEvent batch = new BatchEvent();
663                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
664                        try {
665                                cursor = database.openCursor(null, CursorConfig.DEFAULT);
666                                status = cursor.getFirst(key, data, null);
667
668                                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
669                                        final SimpleEvent event = createEvent(data);
670                                        if (event != null) {
671                                                batch.addEvent(event);
672                                        }
673                                        status = cursor.getNext(key, data, null);
674                                }
675                                break;
676                        } catch (final LockConflictException lce) {
677                                if (cursor != null) {
678                                        try {
679                                cursor.close();
680                                cursor = null;
681                            } catch (final Exception ex) {
682                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
683                            }
684                        }
685                    }
686                }
687
688                try {
689                    manager.send(batch);
690                } catch (final Exception ioe) {
691                    LOGGER.error("Error sending events", ioe);
692                    errors = true;
693                }
694                if (!errors) {
695                        if (cursor != null) {
696                            cursor.close();
697                            cursor = null;
698                        }
699                    Transaction txn = null;
700                    Exception exception = null;
701                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
702                        try {
703                            txn = environment.beginTransaction(null, null);
704                            try {
705                                for (final Event event : batch.getEvents()) {
706                                    try {
707                                        final Map<String, String> headers = event.getHeaders();
708                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
709                                        database.delete(txn, key);
710                                    } catch (final Exception ex) {
711                                        LOGGER.error("Error deleting key from database", ex);
712                                    }
713                                }
714                                txn.commit();
715                                long count = dbCounter.get();
716                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
717                                    count = dbCounter.get();
718                                }
719                                exception = null;
720                                break;
721                            } catch (final LockConflictException lce) {
722                                exception = lce;
723                                if (cursor != null) {
724                                    try {
725                                        cursor.close();
726                                        cursor = null;
727                                    } catch (final Exception ex) {
728                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
729                                    }
730                                }
731                                if (txn != null) {
732                                    try {
733                                        txn.abort();
734                                        txn = null;
735                                    } catch (final Exception ex) {
736                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
737                                    }
738                                }
739                            } catch (final Exception ex) {
740                                LOGGER.error("Unable to commit transaction", ex);
741                                if (txn != null) {
742                                    txn.abort();
743                                }
744                            }
745                        } catch (final LockConflictException lce) {
746                            exception = lce;
747                            if (cursor != null) {
748                                try {
749                                    cursor.close();
750                                    cursor = null;
751                                } catch (final Exception ex) {
752                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
753                                }
754                            }
755                            if (txn != null) {
756                                try {
757                                    txn.abort();
758                                    txn = null;
759                                } catch (final Exception ex) {
760                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
761                                }
762                            }
763                        } finally {
764                            if (cursor != null) {
765                                cursor.close();
766                                cursor = null;
767                            }
768                            if (txn != null) {
769                                txn.abort();
770                                txn = null;
771                            }
772                        }
773                        try {
774                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
775                        } catch (final InterruptedException ie) {
776                            // Ignore the error
777                        }
778                    }
779                    if (exception != null) {
780                        LOGGER.error("Unable to delete events from data base", exception);
781                    }
782                }
783            } catch (final Exception ex) {
784                LOGGER.error("Error reading database", ex);
785                shutdown = true;
786                throw ex;
787            } finally {
788                if (cursor != null) {
789                    cursor.close();
790                }
791            }
792
793            return errors;
794        }
795
796        private SimpleEvent createEvent(final DatabaseEntry data) {
797            final SimpleEvent event = new SimpleEvent();
798            try {
799                byte[] eventData = data.getData();
800                if (secretKey != null) {
801                    final Cipher cipher = Cipher.getInstance("AES");
802                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
803                    eventData = cipher.doFinal(eventData);
804                }
805                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
806                final DataInputStream dais = new DataInputStream(bais);
807                int length = dais.readInt();
808                final byte[] bytes = new byte[length];
809                dais.read(bytes, 0, length);
810                event.setBody(bytes);
811                length = dais.readInt();
812                final Map<String, String> map = new HashMap<>(length);
813                for (int i = 0; i < length; ++i) {
814                    final String headerKey = dais.readUTF();
815                    final String value = dais.readUTF();
816                    map.put(headerKey, value);
817                }
818                event.setHeaders(map);
819                return event;
820            } catch (final Exception ex) {
821                LOGGER.error("Error retrieving event", ex);
822                return null;
823            }
824        }
825
826    }
827
828    /**
829     * Factory that creates Daemon threads that can be properly shut down.
830     */
831    private static class DaemonThreadFactory implements ThreadFactory {
832        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
833        private final ThreadGroup group;
834        private final AtomicInteger threadNumber = new AtomicInteger(1);
835        private final String namePrefix;
836
837        public DaemonThreadFactory() {
838            final SecurityManager securityManager = System.getSecurityManager();
839            group = securityManager != null ? securityManager.getThreadGroup() :
840                Thread.currentThread().getThreadGroup();
841            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
842        }
843
844        @Override
845        public Thread newThread(final Runnable r) {
846            final Thread thread = new Log4jThread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
847            thread.setDaemon(true);
848            if (thread.getPriority() != Thread.NORM_PRIORITY) {
849                thread.setPriority(Thread.NORM_PRIORITY);
850            }
851            return thread;
852        }
853    }
854
855    /**
856     * An internal class.
857     */
858    private static class Gate {
859
860        private boolean isOpen = false;
861
862        public boolean isOpen() {
863            return isOpen;
864        }
865
866        public synchronized void open() {
867            isOpen = true;
868            notifyAll();
869        }
870
871        public synchronized void close() {
872            isOpen = false;
873        }
874
875        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
876            wait(timeout);
877        }
878    }
879}