001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.File;
024import java.nio.charset.Charset;
025import java.nio.charset.StandardCharsets;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.concurrent.Callable;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicLong;
036
037import javax.crypto.Cipher;
038import javax.crypto.SecretKey;
039
040import org.apache.flume.Event;
041import org.apache.flume.event.SimpleEvent;
042import org.apache.logging.log4j.LoggingException;
043import org.apache.logging.log4j.core.appender.ManagerFactory;
044import org.apache.logging.log4j.core.config.Property;
045import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
046import org.apache.logging.log4j.core.config.plugins.util.PluginType;
047import org.apache.logging.log4j.core.util.FileUtils;
048import org.apache.logging.log4j.core.util.SecretKeyProvider;
049import org.apache.logging.log4j.util.Strings;
050
051import com.sleepycat.je.Cursor;
052import com.sleepycat.je.CursorConfig;
053import com.sleepycat.je.Database;
054import com.sleepycat.je.DatabaseConfig;
055import com.sleepycat.je.DatabaseEntry;
056import com.sleepycat.je.Environment;
057import com.sleepycat.je.EnvironmentConfig;
058import com.sleepycat.je.LockConflictException;
059import com.sleepycat.je.LockMode;
060import com.sleepycat.je.OperationStatus;
061import com.sleepycat.je.StatsConfig;
062import com.sleepycat.je.Transaction;
063
064/**
065 * Manager that persists data to Berkeley DB before passing it on to Flume.
066 */
067public class FlumePersistentManager extends FlumeAvroManager {
068
069    /** Attribute name for the key provider. */
070    public static final String KEY_PROVIDER = "keyProvider";
071
072    private static final Charset UTF8 = StandardCharsets.UTF_8;
073
074    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
075
076    private static final int SHUTDOWN_WAIT = 60;
077
078    private static final int MILLIS_PER_SECOND = 1000;
079
080    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
081
082    private static BDBManagerFactory factory = new BDBManagerFactory();
083
084    private final Database database;
085
086    private final Environment environment;
087
088    private final WriterThread worker;
089
090    private final Gate gate = new Gate();
091
092    private final SecretKey secretKey;
093
094    private final int lockTimeoutRetryCount;
095
096    private final ExecutorService threadPool;
097
098    private final AtomicLong dbCount = new AtomicLong();
099
100    /**
101     * Constructor
102     * @param name The unique name of this manager.
103     * @param shortName Original name for the Manager.
104     * @param agents An array of Agents.
105     * @param batchSize The number of events to include in a batch.
106     * @param retries The number of times to retry connecting before giving up.
107     * @param connectionTimeout The amount of time to wait for a connection to be established.
108     * @param requestTimeout The amount of time to wair for a response to a request.
109     * @param delay The amount of time to wait between retries.
110     * @param database The database to write to.
111     * @param environment The database environment.
112     * @param secretKey The SecretKey to use for encryption.
113     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
114     */
115    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
116                                     final int batchSize, final int retries, final int connectionTimeout,
117                                     final int requestTimeout, final int delay, final Database database,
118                                     final Environment environment, final SecretKey secretKey,
119                                     final int lockTimeoutRetryCount) {
120        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
121        this.database = database;
122        this.environment = environment;
123        dbCount.set(database.count());
124        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
125            lockTimeoutRetryCount);
126        this.worker.start();
127        this.secretKey = secretKey;
128        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
129        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
130    }
131
132
133    /**
134     * Returns a FlumeAvroManager.
135     * @param name The name of the manager.
136     * @param agents The agents to use.
137     * @param properties Properties to pass to the Manager.
138     * @param batchSize The number of events to include in a batch.
139     * @param retries The number of times to retry connecting before giving up.
140     * @param connectionTimeout The amount of time to wait to establish a connection.
141     * @param requestTimeout The amount of time to wait for a response to a request.
142     * @param delayMillis Amount of time to delay before delivering a batch.
143     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
144     * @param dataDir The location of the Berkeley database.
145     * @return A FlumeAvroManager.
146     */
147    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
148                                                    final Property[] properties, int batchSize, final int retries,
149                                                    final int connectionTimeout, final int requestTimeout,
150                                                    final int delayMillis, final int lockTimeoutRetryCount,
151                                                    final String dataDir) {
152        if (agents == null || agents.length == 0) {
153            throw new IllegalArgumentException("At least one agent is required");
154        }
155
156        if (batchSize <= 0) {
157            batchSize = 1;
158        }
159        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
160
161        final StringBuilder sb = new StringBuilder("FlumePersistent[");
162        boolean first = true;
163        for (final Agent agent : agents) {
164            if (!first) {
165                sb.append(',');
166            }
167            sb.append(agent.getHost()).append(':').append(agent.getPort());
168            first = false;
169        }
170        sb.append(']');
171        sb.append(' ').append(dataDirectory);
172        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
173            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
174    }
175
176    @Override
177    public void send(final Event event)  {
178        if (worker.isShutdown()) {
179            throw new LoggingException("Unable to record event");
180        }
181
182        final Map<String, String> headers = event.getHeaders();
183        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
184        try {
185            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
186            final DataOutputStream daos = new DataOutputStream(baos);
187            daos.writeInt(event.getBody().length);
188            daos.write(event.getBody(), 0, event.getBody().length);
189            daos.writeInt(event.getHeaders().size());
190            for (final Map.Entry<String, String> entry : headers.entrySet()) {
191                daos.writeUTF(entry.getKey());
192                daos.writeUTF(entry.getValue());
193            }
194            byte[] eventData = baos.toByteArray();
195            if (secretKey != null) {
196                final Cipher cipher = Cipher.getInstance("AES");
197                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
198                eventData = cipher.doFinal(eventData);
199            }
200            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
201                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
202            boolean interrupted = false;
203            int ieCount = 0;
204            do {
205                try {
206                    future.get();
207                } catch (final InterruptedException ie) {
208                    interrupted = true;
209                    ++ieCount;
210                }
211            } while (interrupted && ieCount <= 1);
212
213        } catch (final Exception ex) {
214            throw new LoggingException("Exception occurred writing log event", ex);
215        }
216    }
217
218    @Override
219    protected void releaseSub() {
220        LOGGER.debug("Shutting down FlumePersistentManager");
221        worker.shutdown();
222        try {
223            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
224        } catch (final InterruptedException ie) {
225            // Ignore the exception and shutdown.
226        }
227        threadPool.shutdown();
228        try {
229            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
230        } catch (final InterruptedException ie) {
231            LOGGER.warn("PersistentManager Thread pool failed to shut down");
232        }
233        try {
234            worker.join();
235        } catch (final InterruptedException ex) {
236            LOGGER.debug("Interrupted while waiting for worker to complete");
237        }
238        try {
239            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
240            database.close();
241        } catch (final Exception ex) {
242            LOGGER.warn("Failed to close database", ex);
243        }
244        try {
245            environment.cleanLog();
246            environment.close();
247        } catch (final Exception ex) {
248            LOGGER.warn("Failed to close environment", ex);
249        }
250        super.releaseSub();
251    }
252
253    private void doSend(final SimpleEvent event) {
254        LOGGER.debug("Sending event to Flume");
255        super.send(event);
256    }
257
258    /**
259     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
260     */
261    private static class BDBWriter implements Callable<Integer> {
262        private final byte[] eventData;
263        private final byte[] keyData;
264        private final Environment environment;
265        private final Database database;
266        private final Gate gate;
267        private final AtomicLong dbCount;
268        private final long batchSize;
269        private final int lockTimeoutRetryCount;
270
271        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
272                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
273                         final int lockTimeoutRetryCount) {
274            this.keyData = keyData;
275            this.eventData = eventData;
276            this.environment = environment;
277            this.database = database;
278            this.gate = gate;
279            this.dbCount = dbCount;
280            this.batchSize = batchSize;
281            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
282        }
283
284        @Override
285        public Integer call() throws Exception {
286            final DatabaseEntry key = new DatabaseEntry(keyData);
287            final DatabaseEntry data = new DatabaseEntry(eventData);
288            Exception exception = null;
289            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
290                Transaction txn = null;
291                try {
292                    txn = environment.beginTransaction(null, null);
293                    try {
294                        database.put(txn, key, data);
295                        txn.commit();
296                        txn = null;
297                        if (dbCount.incrementAndGet() >= batchSize) {
298                            gate.open();
299                        }
300                        exception = null;
301                        break;
302                    } catch (final LockConflictException lce) {
303                        exception = lce;
304                        // Fall through and retry.
305                    } catch (final Exception ex) {
306                        if (txn != null) {
307                            txn.abort();
308                        }
309                        throw ex;
310                    } finally {
311                        if (txn != null) {
312                            txn.abort();
313                            txn = null;
314                        }
315                    }
316                } catch (final LockConflictException lce) {
317                    exception = lce;
318                    if (txn != null) {
319                        try {
320                            txn.abort();
321                            txn = null;
322                        } catch (final Exception ex) {
323                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
324                        }
325                    }
326
327                }
328                try {
329                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
330                } catch (final InterruptedException ie) {
331                    // Ignore the error
332                }
333            }
334            if (exception != null) {
335                throw exception;
336            }
337            return eventData.length;
338        }
339    }
340
341    /**
342     * Factory data.
343     */
344    private static class FactoryData {
345        private final String name;
346        private final Agent[] agents;
347        private final int batchSize;
348        private final String dataDir;
349        private final int retries;
350        private final int connectionTimeout;
351        private final int requestTimeout;
352        private final int delayMillis;
353        private final int lockTimeoutRetryCount;
354        private final Property[] properties;
355
356        /**
357         * Constructor.
358         * @param name The name of the Appender.
359         * @param agents The agents.
360         * @param batchSize The number of events to include in a batch.
361         * @param dataDir The directory for data.
362         */
363        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
364                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
365                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
366            this.name = name;
367            this.agents = agents;
368            this.batchSize = batchSize;
369            this.dataDir = dataDir;
370            this.retries = retries;
371            this.connectionTimeout = connectionTimeout;
372            this.requestTimeout = requestTimeout;
373            this.delayMillis = delayMillis;
374            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
375            this.properties = properties;
376        }
377    }
378
379    /**
380     * Avro Manager Factory.
381     */
382    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
383
384        /**
385         * Create the FlumeKratiManager.
386         * @param name The name of the entity to manage.
387         * @param data The data required to create the entity.
388         * @return The FlumeKratiManager.
389         */
390        @Override
391        public FlumePersistentManager createManager(final String name, final FactoryData data) {
392            SecretKey secretKey = null;
393            Database database = null;
394            Environment environment = null;
395
396            final Map<String, String> properties = new HashMap<>();
397            if (data.properties != null) {
398                for (final Property property : data.properties) {
399                    properties.put(property.getName(), property.getValue());
400                }
401            }
402
403            try {
404                final File dir = new File(data.dataDir);
405                FileUtils.mkdir(dir, true);
406                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
407                dbEnvConfig.setTransactional(true);
408                dbEnvConfig.setAllowCreate(true);
409                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
410                environment = new Environment(dir, dbEnvConfig);
411                final DatabaseConfig dbConfig = new DatabaseConfig();
412                dbConfig.setTransactional(true);
413                dbConfig.setAllowCreate(true);
414                database = environment.openDatabase(null, name, dbConfig);
415            } catch (final Exception ex) {
416                LOGGER.error("Could not create FlumePersistentManager", ex);
417                // For consistency, close database as well as environment even though it should never happen since the
418                // database is that last thing in the block above, but this does guard against a future line being
419                // inserted at the end that would bomb (like some debug logging).
420                if (database != null) {
421                    database.close();
422                    database = null;
423                }
424                if (environment != null) {
425                    environment.close();
426                    environment = null;
427                }
428                return null;
429            }
430
431            try {
432                String key = null;
433                for (final Map.Entry<String, String> entry : properties.entrySet()) {
434                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
435                        key = entry.getValue();
436                        break;
437                    }
438                }
439                if (key != null) {
440                    final PluginManager manager = new PluginManager("KeyProvider");
441                    manager.collectPlugins();
442                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
443                    if (plugins != null) {
444                        boolean found = false;
445                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
446                            if (entry.getKey().equalsIgnoreCase(key)) {
447                                found = true;
448                                final Class<?> cl = entry.getValue().getPluginClass();
449                                try {
450                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
451                                    secretKey = provider.getSecretKey();
452                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
453                                } catch (final Exception ex) {
454                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
455                                        cl.getName());
456                                }
457                                break;
458                            }
459                        }
460                        if (!found) {
461                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
462                        }
463                    } else {
464                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
465                    }
466                }
467            } catch (final Exception ex) {
468                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
469            }
470            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
471                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
472                data.lockTimeoutRetryCount);
473        }
474    }
475
476    /**
477     * Thread that sends data to Flume and pulls it from Berkeley DB.
478     */
479    private static class WriterThread extends Thread  {
480        private volatile boolean shutdown = false;
481        private final Database database;
482        private final Environment environment;
483        private final FlumePersistentManager manager;
484        private final Gate gate;
485        private final SecretKey secretKey;
486        private final int batchSize;
487        private final AtomicLong dbCounter;
488        private final int lockTimeoutRetryCount;
489
490        public WriterThread(final Database database, final Environment environment,
491                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
492                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
493            this.database = database;
494            this.environment = environment;
495            this.manager = manager;
496            this.gate = gate;
497            this.batchSize = batchsize;
498            this.secretKey = secretKey;
499            this.setDaemon(true);
500            this.dbCounter = dbCount;
501            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
502        }
503
504        public void shutdown() {
505            LOGGER.debug("Writer thread shutting down");
506            this.shutdown = true;
507            gate.open();
508        }
509
510        public boolean isShutdown() {
511            return shutdown;
512        }
513
514        @Override
515        public void run() {
516            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
517            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
518            while (!shutdown) {
519                final long nowMillis = System.currentTimeMillis();
520                final long dbCount = database.count();
521                dbCounter.set(dbCount);
522                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
523                    nextBatchMillis = nowMillis + manager.getDelayMillis();
524                    try {
525                        boolean errors = false;
526                        final DatabaseEntry key = new DatabaseEntry();
527                        final DatabaseEntry data = new DatabaseEntry();
528
529                        gate.close();
530                        OperationStatus status;
531                        if (batchSize > 1) {
532                            try {
533                                errors = sendBatch(key, data);
534                            } catch (final Exception ex) {
535                                break;
536                            }
537                        } else {
538                            Exception exception = null;
539                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
540                                exception = null;
541                                Transaction txn = null;
542                                Cursor cursor = null;
543                                try {
544                                    txn = environment.beginTransaction(null, null);
545                                    cursor = database.openCursor(txn, null);
546                                    try {
547                                        status = cursor.getFirst(key, data, LockMode.RMW);
548                                        while (status == OperationStatus.SUCCESS) {
549                                            final SimpleEvent event = createEvent(data);
550                                            if (event != null) {
551                                                try {
552                                                    manager.doSend(event);
553                                                } catch (final Exception ioe) {
554                                                    errors = true;
555                                                    LOGGER.error("Error sending event", ioe);
556                                                    break;
557                                                }
558                                                try {
559                                                    cursor.delete();
560                                                } catch (final Exception ex) {
561                                                    LOGGER.error("Unable to delete event", ex);
562                                                }
563                                            }
564                                            status = cursor.getNext(key, data, LockMode.RMW);
565                                        }
566                                        if (cursor != null) {
567                                            cursor.close();
568                                            cursor = null;
569                                        }
570                                        txn.commit();
571                                        txn = null;
572                                        dbCounter.decrementAndGet();
573                                        exception = null;
574                                        break;
575                                    } catch (final LockConflictException lce) {
576                                        exception = lce;
577                                        // Fall through and retry.
578                                    } catch (final Exception ex) {
579                                        LOGGER.error("Error reading or writing to database", ex);
580                                        shutdown = true;
581                                        break;
582                                    } finally {
583                                        if (cursor != null) {
584                                            cursor.close();
585                                            cursor = null;
586                                        }
587                                        if (txn != null) {
588                                            txn.abort();
589                                            txn = null;
590                                        }
591                                    }
592                                } catch (final LockConflictException lce) {
593                                    exception = lce;
594                                    if (cursor != null) {
595                                        try {
596                                            cursor.close();
597                                            cursor = null;
598                                        } catch (final Exception ex) {
599                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
600                                        }
601                                    }
602                                    if (txn != null) {
603                                        try {
604                                            txn.abort();
605                                            txn = null;
606                                        } catch (final Exception ex) {
607                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
608                                        }
609                                    }
610                                }
611                                try {
612                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
613                                } catch (final InterruptedException ie) {
614                                    // Ignore the error
615                                }
616                            }
617                            if (exception != null) {
618                                LOGGER.error("Unable to read or update data base", exception);
619                            }
620                        }
621                        if (errors) {
622                            Thread.sleep(manager.getDelayMillis());
623                            continue;
624                        }
625                    } catch (final Exception ex) {
626                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
627                    }
628                } else {
629                    if (nextBatchMillis <= nowMillis) {
630                        nextBatchMillis = nowMillis + manager.getDelayMillis();
631                    }
632                    try {
633                        final long interval = nextBatchMillis - nowMillis;
634                        gate.waitForOpen(interval);
635                    } catch (final InterruptedException ie) {
636                        LOGGER.warn("WriterThread interrupted, continuing");
637                    } catch (final Exception ex) {
638                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
639                        break;
640                    }
641                }
642            }
643
644            if (batchSize > 1 && database.count() > 0) {
645                final DatabaseEntry key = new DatabaseEntry();
646                final DatabaseEntry data = new DatabaseEntry();
647                try {
648                    sendBatch(key, data);
649                } catch (final Exception ex) {
650                    LOGGER.warn("Unable to write final batch");
651                }
652            }
653            LOGGER.trace("WriterThread exiting");
654        }
655
656        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
657            boolean errors = false;
658            OperationStatus status;
659            Cursor cursor = null;
660            try {
661                final BatchEvent batch = new BatchEvent();
662                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
663                        try {
664                                cursor = database.openCursor(null, CursorConfig.DEFAULT);
665                                status = cursor.getFirst(key, data, null);
666
667                                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
668                                        final SimpleEvent event = createEvent(data);
669                                        if (event != null) {
670                                                batch.addEvent(event);
671                                        }
672                                        status = cursor.getNext(key, data, null);
673                                }
674                                break;
675                        } catch (final LockConflictException lce) {
676                                if (cursor != null) {
677                                        try {
678                                cursor.close();
679                                cursor = null;
680                            } catch (final Exception ex) {
681                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
682                            }
683                        }
684                    }
685                }
686
687                try {
688                    manager.send(batch);
689                } catch (final Exception ioe) {
690                    LOGGER.error("Error sending events", ioe);
691                    errors = true;
692                }
693                if (!errors) {
694                        if (cursor != null) {
695                            cursor.close();
696                            cursor = null;
697                        }
698                    Transaction txn = null;
699                    Exception exception = null;
700                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
701                        try {
702                            txn = environment.beginTransaction(null, null);
703                            try {
704                                for (final Event event : batch.getEvents()) {
705                                    try {
706                                        final Map<String, String> headers = event.getHeaders();
707                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
708                                        database.delete(txn, key);
709                                    } catch (final Exception ex) {
710                                        LOGGER.error("Error deleting key from database", ex);
711                                    }
712                                }
713                                txn.commit();
714                                long count = dbCounter.get();
715                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
716                                    count = dbCounter.get();
717                                }
718                                exception = null;
719                                break;
720                            } catch (final LockConflictException lce) {
721                                exception = lce;
722                                if (cursor != null) {
723                                    try {
724                                        cursor.close();
725                                        cursor = null;
726                                    } catch (final Exception ex) {
727                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
728                                    }
729                                }
730                                if (txn != null) {
731                                    try {
732                                        txn.abort();
733                                        txn = null;
734                                    } catch (final Exception ex) {
735                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
736                                    }
737                                }
738                            } catch (final Exception ex) {
739                                LOGGER.error("Unable to commit transaction", ex);
740                                if (txn != null) {
741                                    txn.abort();
742                                }
743                            }
744                        } catch (final LockConflictException lce) {
745                            exception = lce;
746                            if (cursor != null) {
747                                try {
748                                    cursor.close();
749                                    cursor = null;
750                                } catch (final Exception ex) {
751                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
752                                }
753                            }
754                            if (txn != null) {
755                                try {
756                                    txn.abort();
757                                    txn = null;
758                                } catch (final Exception ex) {
759                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
760                                }
761                            }
762                        } finally {
763                            if (cursor != null) {
764                                cursor.close();
765                                cursor = null;
766                            }
767                            if (txn != null) {
768                                txn.abort();
769                                txn = null;
770                            }
771                        }
772                        try {
773                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
774                        } catch (final InterruptedException ie) {
775                            // Ignore the error
776                        }
777                    }
778                    if (exception != null) {
779                        LOGGER.error("Unable to delete events from data base", exception);
780                    }
781                }
782            } catch (final Exception ex) {
783                LOGGER.error("Error reading database", ex);
784                shutdown = true;
785                throw ex;
786            } finally {
787                if (cursor != null) {
788                    cursor.close();
789                }
790            }
791
792            return errors;
793        }
794
795        private SimpleEvent createEvent(final DatabaseEntry data) {
796            final SimpleEvent event = new SimpleEvent();
797            try {
798                byte[] eventData = data.getData();
799                if (secretKey != null) {
800                    final Cipher cipher = Cipher.getInstance("AES");
801                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
802                    eventData = cipher.doFinal(eventData);
803                }
804                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
805                final DataInputStream dais = new DataInputStream(bais);
806                int length = dais.readInt();
807                final byte[] bytes = new byte[length];
808                dais.read(bytes, 0, length);
809                event.setBody(bytes);
810                length = dais.readInt();
811                final Map<String, String> map = new HashMap<>(length);
812                for (int i = 0; i < length; ++i) {
813                    final String headerKey = dais.readUTF();
814                    final String value = dais.readUTF();
815                    map.put(headerKey, value);
816                }
817                event.setHeaders(map);
818                return event;
819            } catch (final Exception ex) {
820                LOGGER.error("Error retrieving event", ex);
821                return null;
822            }
823        }
824
825    }
826
827    /**
828     * Factory that creates Daemon threads that can be properly shut down.
829     */
830    private static class DaemonThreadFactory implements ThreadFactory {
831        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
832        private final ThreadGroup group;
833        private final AtomicInteger threadNumber = new AtomicInteger(1);
834        private final String namePrefix;
835
836        public DaemonThreadFactory() {
837            final SecurityManager securityManager = System.getSecurityManager();
838            group = securityManager != null ? securityManager.getThreadGroup() :
839                Thread.currentThread().getThreadGroup();
840            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
841        }
842
843        @Override
844        public Thread newThread(final Runnable r) {
845            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
846            thread.setDaemon(true);
847            if (thread.getPriority() != Thread.NORM_PRIORITY) {
848                thread.setPriority(Thread.NORM_PRIORITY);
849            }
850            return thread;
851        }
852    }
853
854    /**
855     * An internal class.
856     */
857    private static class Gate {
858
859        private boolean isOpen = false;
860
861        public boolean isOpen() {
862            return isOpen;
863        }
864
865        public synchronized void open() {
866            isOpen = true;
867            notifyAll();
868        }
869
870        public synchronized void close() {
871            isOpen = false;
872        }
873
874        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
875            wait(timeout);
876        }
877    }
878}