001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.File;
024import java.nio.charset.Charset;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.Future;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import javax.crypto.Cipher;
036import javax.crypto.SecretKey;
037
038import org.apache.flume.Event;
039import org.apache.flume.event.SimpleEvent;
040import org.apache.logging.log4j.LoggingException;
041import org.apache.logging.log4j.core.appender.ManagerFactory;
042import org.apache.logging.log4j.core.config.Property;
043import org.apache.logging.log4j.core.config.plugins.PluginManager;
044import org.apache.logging.log4j.core.config.plugins.PluginType;
045import org.apache.logging.log4j.core.helpers.FileUtils;
046import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
047import org.apache.logging.log4j.core.helpers.Strings;
048
049import com.sleepycat.je.Cursor;
050import com.sleepycat.je.CursorConfig;
051import com.sleepycat.je.Database;
052import com.sleepycat.je.DatabaseConfig;
053import com.sleepycat.je.DatabaseEntry;
054import com.sleepycat.je.Environment;
055import com.sleepycat.je.EnvironmentConfig;
056import com.sleepycat.je.LockConflictException;
057import com.sleepycat.je.LockMode;
058import com.sleepycat.je.OperationStatus;
059import com.sleepycat.je.StatsConfig;
060import com.sleepycat.je.Transaction;
061
062/**
063 * Manager that persists data to Berkeley DB before passing it on to Flume.
064 */
065public class FlumePersistentManager extends FlumeAvroManager {
066
    /** Attribute name for the key provider. */
    public static final String KEY_PROVIDER = "keyProvider";

    /** Character set used to turn the event GUID header into the database key bytes. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Data directory used when the caller does not supply one. */
    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    /** Time, in seconds, to wait for the worker thread and the thread pool during shutdown. */
    private static final int SHUTDOWN_WAIT = 60;

    /** Conversion factor applied to SHUTDOWN_WAIT when a millisecond value is required. */
    private static final int MILLIS_PER_SECOND = 1000;

    /** Time, in milliseconds, to sleep before retrying a database operation after a lock conflict. */
    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;

    /** Factory handed to getManager to create new instances of this manager. */
    private static BDBManagerFactory factory = new BDBManagerFactory();

    /** The Berkeley DB database in which events are stored until they are sent. */
    private final Database database;

    /** The Berkeley DB environment that owns the database and its transactions. */
    private final Environment environment;

    /** Background thread that reads stored events and passes them on to Flume. */
    private final WriterThread worker;

    /** Opened by writers once a full batch is stored (and on shutdown); the worker waits on it. */
    private final Gate gate = new Gate();

    /** Key used to encrypt event data before it is stored; null when encryption is disabled. */
    private final SecretKey secretKey;

    /** Delay, in milliseconds, the worker applies between batches. */
    private final int delay;

    /** Number of attempts made when a database operation hits a lock conflict. */
    private final int lockTimeoutRetryCount;

    /** Pool whose threads perform the database writes on behalf of the logging threads. */
    private final ExecutorService threadPool;

    /** Running count of stored events; incremented by writers and refreshed by the worker. */
    private AtomicLong dbCount = new AtomicLong();
099
100    /**
101     * Constructor
102     * @param name The unique name of this manager.
103     * @param shortName Original name for the Manager.
104     * @param agents An array of Agents.
105     * @param batchSize The number of events to include in a batch.
106     * @param retries The number of times to retry connecting before giving up.
107     * @param connectionTimeout The amount of time to wait for a connection to be established.
108     * @param requestTimeout The amount of time to wair for a response to a request.
109     * @param delay The amount of time to wait between retries.
110     * @param database The database to write to.
111     * @param environment The database environment.
112     * @param secretKey The SecretKey to use for encryption.
113     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
114     */
115    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
116                                     final int batchSize, final int retries, final int connectionTimeout,
117                                     final int requestTimeout, final int delay, final Database database,
118                                     final Environment environment, final SecretKey secretKey,
119                                     final int lockTimeoutRetryCount) {
120        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
121        this.delay = delay;
122        this.database = database;
123        this.environment = environment;
124        dbCount.set(database.count());
125        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
126            lockTimeoutRetryCount);
127        this.worker.start();
128        this.secretKey = secretKey;
129        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
130        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
131    }
132
133
134    /**
135     * Returns a FlumeAvroManager.
136     * @param name The name of the manager.
137     * @param agents The agents to use.
138     * @param properties Properties to pass to the Manager.
139     * @param batchSize The number of events to include in a batch.
140     * @param retries The number of times to retry connecting before giving up.
141     * @param connectionTimeout The amount of time to wait to establish a connection.
142     * @param requestTimeout The amount of time to wait for a response to a request.
143     * @param delay Amount of time to delay before delivering a batch.
144     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
145     * @param dataDir The location of the Berkeley database.
146     * @return A FlumeAvroManager.
147     */
148    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
149                                                    final Property[] properties, int batchSize, final int retries,
150                                                    final int connectionTimeout, final int requestTimeout,
151                                                    final int delay, final int lockTimeoutRetryCount,
152                                                    final String dataDir) {
153        if (agents == null || agents.length == 0) {
154            throw new IllegalArgumentException("At least one agent is required");
155        }
156
157        if (batchSize <= 0) {
158            batchSize = 1;
159        }
160        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
161
162        final StringBuilder sb = new StringBuilder("FlumePersistent[");
163        boolean first = true;
164        for (final Agent agent : agents) {
165            if (!first) {
166                sb.append(",");
167            }
168            sb.append(agent.getHost()).append(":").append(agent.getPort());
169            first = false;
170        }
171        sb.append("]");
172        sb.append(" ").append(dataDirectory);
173        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
174            connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
175    }
176
177    @Override
178    public void send(final Event event)  {
179        if (worker.isShutdown()) {
180            throw new LoggingException("Unable to record event");
181        }
182
183        final Map<String, String> headers = event.getHeaders();
184        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
185        try {
186            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
187            final DataOutputStream daos = new DataOutputStream(baos);
188            daos.writeInt(event.getBody().length);
189            daos.write(event.getBody(), 0, event.getBody().length);
190            daos.writeInt(event.getHeaders().size());
191            for (final Map.Entry<String, String> entry : headers.entrySet()) {
192                daos.writeUTF(entry.getKey());
193                daos.writeUTF(entry.getValue());
194            }
195            byte[] eventData = baos.toByteArray();
196            if (secretKey != null) {
197                final Cipher cipher = Cipher.getInstance("AES");
198                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
199                eventData = cipher.doFinal(eventData);
200            }
201            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
202                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
203            boolean interrupted = false;
204            int count = 0;
205            do {
206                try {
207                    future.get();
208                } catch (final InterruptedException ie) {
209                    interrupted = true;
210                    ++count;
211                }
212            } while (interrupted && count <= 1);
213
214        } catch (final Exception ex) {
215            throw new LoggingException("Exception occurred writing log event", ex);
216        }
217    }
218
    /**
     * Shuts the manager down: stops the worker thread, drains the writer thread pool, closes the database
     * and its environment, and finally lets the superclass release its own resources.
     */
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        // Ask the worker to stop and give it up to SHUTDOWN_WAIT seconds to finish.
        worker.shutdown();
        try {
            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
        } catch (InterruptedException ie) {
            // Ignore the exception and shutdown.
        }
        // Stop accepting new database writes and wait for those already submitted.
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
        } catch (final InterruptedException ie) {
            LOGGER.warn("PersistentManager Thread pool failed to shut down");
        }
        // NOTE(review): this second join has no timeout, so it blocks for as long as the worker keeps
        // running - confirm that is intended.
        try {
            worker.join();
        } catch (final InterruptedException ex) {
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        // Close the database before the environment that owns it.
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close environment", ex);
        }
        super.releaseSub();
    }
253
    /**
     * Passes an event straight to the superclass send, bypassing the persistence performed by
     * {@link #send(Event)} in this class. Called by the worker thread for events read back from the database.
     * @param event The event to deliver.
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }
258
259    /**
260     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
261     */
262    private static class BDBWriter implements Callable<Integer> {
263        private final byte[] eventData;
264        private final byte[] keyData;
265        private final Environment environment;
266        private final Database database;
267        private final Gate gate;
268        private final AtomicLong dbCount;
269        private final long batchSize;
270        private final int lockTimeoutRetryCount;
271
272        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
273                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
274                         final int lockTimeoutRetryCount) {
275            this.keyData = keyData;
276            this.eventData = eventData;
277            this.environment = environment;
278            this.database = database;
279            this.gate = gate;
280            this.dbCount = dbCount;
281            this.batchSize = batchSize;
282            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
283        }
284
285        @Override
286        public Integer call() throws Exception {
287            final DatabaseEntry key = new DatabaseEntry(keyData);
288            final DatabaseEntry data = new DatabaseEntry(eventData);
289            Exception exception = null;
290            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
291                Transaction txn = null;
292                try {
293                    txn = environment.beginTransaction(null, null);
294                    try {
295                        database.put(txn, key, data);
296                        txn.commit();
297                        txn = null;
298                        if (dbCount.incrementAndGet() >= batchSize) {
299                            gate.open();
300                        }
301                        exception = null;
302                        break;
303                    } catch (final LockConflictException lce) {
304                        exception = lce;
305                        // Fall through and retry.
306                    } catch (final Exception ex) {
307                        if (txn != null) {
308                            txn.abort();
309                        }
310                        throw ex;
311                    } finally {
312                        if (txn != null) {
313                            txn.abort();
314                            txn = null;
315                        }
316                    }
317                } catch (LockConflictException lce) {
318                    exception = lce;
319                    if (txn != null) {
320                        try {
321                            txn.abort();
322                            txn = null;
323                        } catch (Exception ex) {
324                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
325                        }
326                    }
327
328                }
329                try {
330                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
331                } catch (InterruptedException ie) {
332                    // Ignore the error
333                }
334            }
335            if (exception != null) {
336                throw exception;
337            }
338            return eventData.length;
339        }
340    }
341
342    /**
343     * Factory data.
344     */
345    private static class FactoryData {
346        private final String name;
347        private final Agent[] agents;
348        private final int batchSize;
349        private final String dataDir;
350        private final int retries;
351        private final int connectionTimeout;
352        private final int requestTimeout;
353        private final int delay;
354        private final int lockTimeoutRetryCount;
355        private final Property[] properties;
356
357        /**
358         * Constructor.
359         * @param name The name of the Appender.
360         * @param agents The agents.
361         * @param batchSize The number of events to include in a batch.
362         * @param dataDir The directory for data.
363         */
364        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
365                           final int connectionTimeout, final int requestTimeout, final int delay,
366                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
367            this.name = name;
368            this.agents = agents;
369            this.batchSize = batchSize;
370            this.dataDir = dataDir;
371            this.retries = retries;
372            this.connectionTimeout = connectionTimeout;
373            this.requestTimeout = requestTimeout;
374            this.delay = delay;
375            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
376            this.properties = properties;
377        }
378    }
379
380    /**
381     * Avro Manager Factory.
382     */
383    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
384
385        /**
386         * Create the FlumeKratiManager.
387         * @param name The name of the entity to manage.
388         * @param data The data required to create the entity.
389         * @return The FlumeKratiManager.
390         */
391        @Override
392        public FlumePersistentManager createManager(final String name, final FactoryData data) {
393            SecretKey secretKey = null;
394            Database database = null;
395            Environment environment = null;
396
397            final Map<String, String> properties = new HashMap<String, String>();
398            if (data.properties != null) {
399                for (final Property property : data.properties) {
400                    properties.put(property.getName(), property.getValue());
401                }
402            }
403
404            try {
405                final File dir = new File(data.dataDir);
406                FileUtils.mkdir(dir, true);
407                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
408                dbEnvConfig.setTransactional(true);
409                dbEnvConfig.setAllowCreate(true);
410                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
411                environment = new Environment(dir, dbEnvConfig);
412                final DatabaseConfig dbConfig = new DatabaseConfig();
413                dbConfig.setTransactional(true);
414                dbConfig.setAllowCreate(true);
415                database = environment.openDatabase(null, name, dbConfig);
416            } catch (final Exception ex) {
417                LOGGER.error("Could not create FlumePersistentManager", ex);
418                // For consistency, close database as well as environment even though it should never happen since the
419                // database is that last thing in the block above, but this does guard against a future line being
420                // inserted at the end that would bomb (like some debug logging).
421                if (database != null) {
422                    database.close();
423                    database = null;
424                }
425                if (environment != null) {
426                    environment.close();
427                    environment = null;
428                }
429                return null;
430            }
431
432            try {
433                String key = null;
434                for (final Map.Entry<String, String> entry : properties.entrySet()) {
435                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
436                        key = entry.getValue();
437                        break;
438                    }
439                }
440                if (key != null) {
441                    final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
442                    manager.collectPlugins();
443                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
444                    if (plugins != null) {
445                        boolean found = false;
446                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
447                            if (entry.getKey().equalsIgnoreCase(key)) {
448                                found = true;
449                                final Class<?> cl = entry.getValue().getPluginClass();
450                                try {
451                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
452                                    secretKey = provider.getSecretKey();
453                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
454                                } catch (final Exception ex) {
455                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
456                                        cl.getName());
457                                }
458                                break;
459                            }
460                        }
461                        if (!found) {
462                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
463                        }
464                    } else {
465                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
466                    }
467                }
468            } catch (final Exception ex) {
469                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
470            }
471            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
472                data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
473                data.lockTimeoutRetryCount);
474        }
475    }
476
477    /**
478     * Thread that sends data to Flume and pulls it from Berkeley DB.
479     */
480    private static class WriterThread extends Thread  {
481        private volatile boolean shutdown = false;
482        private final Database database;
483        private final Environment environment;
484        private final FlumePersistentManager manager;
485        private final Gate gate;
486        private final SecretKey secretKey;
487        private final int batchSize;
488        private final AtomicLong dbCounter;
489        private final int lockTimeoutRetryCount;
490
491        public WriterThread(final Database database, final Environment environment,
492                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
493                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
494            this.database = database;
495            this.environment = environment;
496            this.manager = manager;
497            this.gate = gate;
498            this.batchSize = batchsize;
499            this.secretKey = secretKey;
500            this.setDaemon(true);
501            this.dbCounter = dbCount;
502            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
503        }
504
        /**
         * Requests that the thread stop: sets the shutdown flag checked by the run loop and opens the gate
         * so that a thread waiting for work is released.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }
510
        /**
         * Reports whether the shutdown flag has been set, either by {@link #shutdown()} or by the run loop
         * after a database error.
         * @return true if the thread has been told to stop.
         */
        public boolean isShutdown() {
            return shutdown;
        }
514
        /**
         * Main loop of the worker. Until the shutdown flag is set it either delivers the stored events to
         * Flume (when at least a batch is stored, or when something is stored and the delay has elapsed) or
         * waits on the gate for more work. After the loop ends, one last batch is attempted when batching is
         * in use and events remain in the database.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
            long nextBatch = System.currentTimeMillis() + manager.delay;
            while (!shutdown) {
                long now = System.currentTimeMillis();
                // Resynchronize the shared counter with the actual number of stored records.
                long dbCount = database.count();
                dbCounter.set(dbCount);
                // Deliver when a full batch is available, or when anything is stored and the delay is up.
                if (dbCount >= batchSize || dbCount > 0 && nextBatch <= now) {
                    nextBatch = now + manager.delay;
                    try {
                        boolean errors = false;
                        DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        // Presumably resets the gate so a later waitForOpen blocks again - confirm in Gate.
                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                // NOTE(review): this leaves the main loop without logging and without
                                // setting the shutdown flag, so isShutdown() keeps returning false.
                                break;
                            }
                        } else {
                            // Unbatched delivery: walk every record with a cursor inside one transaction,
                            // sending and then deleting each; retry the whole pass on a lock conflict.
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    // Stop walking the cursor; the record that failed is not
                                                    // deleted, and the deletes made so far are committed below.
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        // Close the cursor before committing the transaction it belongs to.
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        txn = null;
                                        // NOTE(review): decrements by one even if several records were
                                        // removed; the counter is reset from database.count() on each pass.
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        // Any other database failure stops the worker.
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        // Anything still open at this point was not completed normally.
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (LockConflictException lce) {
                                    // Lock conflict raised outside the inner try (for example while beginning
                                    // the transaction, opening the cursor, or during the cleanup above).
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                // Reached only when the attempt did not complete; pause before retrying.
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (InterruptedException ie) {
                                    // Ignore the error
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update data base", exception);
                            }
                        }
                        if (errors) {
                            // Delivery failed; wait before trying again. An interrupt during this sleep is
                            // handled by the catch block below.
                            Thread.sleep(manager.delay);
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    // Nothing to deliver yet: wait on the gate until the next batch time.
                    if (nextBatch <= now) {
                        nextBatch = now + manager.delay;
                    }
                    try {
                        final long interval = nextBatch - now;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            // Make one final attempt to deliver whatever is still stored when batching is in use.
            if (batchSize > 1 && database.count() > 0) {
                DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }
656
657        private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
658            boolean errors = false;
659            OperationStatus status;
660            Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
661            try {
662                status = cursor.getFirst(key, data, null);
663
664                final BatchEvent batch = new BatchEvent();
665                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
666                    final SimpleEvent event = createEvent(data);
667                    if (event != null) {
668                        batch.addEvent(event);
669                    }
670                    status = cursor.getNext(key, data, null);
671                }
672                try {
673                    manager.send(batch);
674                } catch (final Exception ioe) {
675                    LOGGER.error("Error sending events", ioe);
676                    errors = true;
677                }
678                if (!errors) {
679                    cursor.close();
680                    cursor = null;
681                    Transaction txn = null;
682                    Exception exception = null;
683                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
684                        try {
685                            txn = environment.beginTransaction(null, null);
686                            try {
687                                for (final Event event : batch.getEvents()) {
688                                    try {
689                                        final Map<String, String> headers = event.getHeaders();
690                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
691                                        database.delete(txn, key);
692                                    } catch (final Exception ex) {
693                                        LOGGER.error("Error deleting key from database", ex);
694                                    }
695                                }
696                                txn.commit();
697                                long count = dbCounter.get();
698                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
699                                    count = dbCounter.get();
700                                }
701                                exception = null;
702                                break;
703                            } catch (final LockConflictException lce) {
704                                exception = lce;
705                                if (cursor != null) {
706                                    try {
707                                        cursor.close();
708                                        cursor = null;
709                                    } catch (Exception ex) {
710                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
711                                    }
712                                }
713                                if (txn != null) {
714                                    try {
715                                        txn.abort();
716                                        txn = null;
717                                    } catch (Exception ex) {
718                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
719                                    }
720                                }
721                            } catch (final Exception ex) {
722                                LOGGER.error("Unable to commit transaction", ex);
723                                if (txn != null) {
724                                    txn.abort();
725                                }
726                            }
727                        } catch (LockConflictException lce) {
728                            exception = lce;
729                            if (cursor != null) {
730                                try {
731                                    cursor.close();
732                                    cursor = null;
733                                } catch (Exception ex) {
734                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
735                                }
736                            }
737                            if (txn != null) {
738                                try {
739                                    txn.abort();
740                                    txn = null;
741                                } catch (Exception ex) {
742                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
743                                }
744                            }
745                        } finally {
746                            if (cursor != null) {
747                                cursor.close();
748                                cursor = null;
749                            }
750                            if (txn != null) {
751                                txn.abort();
752                                txn = null;
753                            }
754                        }
755                        try {
756                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
757                        } catch (InterruptedException ie) {
758                            // Ignore the error
759                        }
760                    }
761                    if (exception != null) {
762                        LOGGER.error("Unable to delete events from data base", exception);
763                    }
764                }
765            } catch (final Exception ex) {
766                LOGGER.error("Error reading database", ex);
767                shutdown = true;
768                throw ex;
769            } finally {
770                if (cursor != null) {
771                    cursor.close();
772                }
773            }
774
775            return errors;
776        }
777
778        private SimpleEvent createEvent(final DatabaseEntry data) {
779            final SimpleEvent event = new SimpleEvent();
780            try {
781                byte[] eventData = data.getData();
782                if (secretKey != null) {
783                    final Cipher cipher = Cipher.getInstance("AES");
784                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
785                    eventData = cipher.doFinal(eventData);
786                }
787                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
788                final DataInputStream dais = new DataInputStream(bais);
789                int length = dais.readInt();
790                final byte[] bytes = new byte[length];
791                dais.read(bytes, 0, length);
792                event.setBody(bytes);
793                length = dais.readInt();
794                final Map<String, String> map = new HashMap<String, String>(length);
795                for (int i = 0; i < length; ++i) {
796                    final String headerKey = dais.readUTF();
797                    final String value = dais.readUTF();
798                    map.put(headerKey, value);
799                }
800                event.setHeaders(map);
801                return event;
802            } catch (final Exception ex) {
803                LOGGER.error("Error retrieving event", ex);
804                return null;
805            }
806        }
807
808    }
809
810    /**
811     * Factory that creates Daemon threads that can be properly shut down.
812     */
813    private static class DaemonThreadFactory implements ThreadFactory {
814        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
815        private final ThreadGroup group;
816        private final AtomicInteger threadNumber = new AtomicInteger(1);
817        private final String namePrefix;
818
819        public DaemonThreadFactory() {
820            final SecurityManager securityManager = System.getSecurityManager();
821            group = securityManager != null ? securityManager.getThreadGroup() :
822                Thread.currentThread().getThreadGroup();
823            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
824        }
825
826        @Override
827        public Thread newThread(final Runnable r) {
828            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
829            thread.setDaemon(true);
830            if (thread.getPriority() != Thread.NORM_PRIORITY) {
831                thread.setPriority(Thread.NORM_PRIORITY);
832            }
833            return thread;
834        }
835    }
836
837    /**
838     * An internal class.
839     */
840    private static class Gate {
841
842        private boolean isOpen = false;
843
844        public boolean isOpen() {
845            return isOpen;
846        }
847
848        public synchronized void open() {
849            isOpen = true;
850            notifyAll();
851        }
852
853        public synchronized void close() {
854            isOpen = false;
855        }
856
857        public synchronized void waitForOpen(long timeout) throws InterruptedException {
858            wait(timeout);
859        }
860    }
861}