001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.File;
024    import java.nio.charset.Charset;
025    import java.util.HashMap;
026    import java.util.Map;
027    import java.util.concurrent.Callable;
028    import java.util.concurrent.ExecutorService;
029    import java.util.concurrent.Executors;
030    import java.util.concurrent.Future;
031    import java.util.concurrent.ThreadFactory;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicInteger;
034    import java.util.concurrent.atomic.AtomicLong;
035    
036    import javax.crypto.Cipher;
037    import javax.crypto.SecretKey;
038    
039    import org.apache.flume.Event;
040    import org.apache.flume.event.SimpleEvent;
041    import org.apache.logging.log4j.LoggingException;
042    import org.apache.logging.log4j.core.appender.ManagerFactory;
043    import org.apache.logging.log4j.core.config.Property;
044    import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
045    import org.apache.logging.log4j.core.config.plugins.util.PluginType;
046    import org.apache.logging.log4j.core.util.FileUtils;
047    import org.apache.logging.log4j.core.util.SecretKeyProvider;
048    import org.apache.logging.log4j.util.Strings;
049    
050    import com.sleepycat.je.Cursor;
051    import com.sleepycat.je.CursorConfig;
052    import com.sleepycat.je.Database;
053    import com.sleepycat.je.DatabaseConfig;
054    import com.sleepycat.je.DatabaseEntry;
055    import com.sleepycat.je.Environment;
056    import com.sleepycat.je.EnvironmentConfig;
057    import com.sleepycat.je.LockConflictException;
058    import com.sleepycat.je.LockMode;
059    import com.sleepycat.je.OperationStatus;
060    import com.sleepycat.je.StatsConfig;
061    import com.sleepycat.je.Transaction;
062    
063    /**
064     * Manager that persists data to Berkeley DB before passing it on to Flume.
065     */
066    public class FlumePersistentManager extends FlumeAvroManager {
067    
068        /** Attribute name for the key provider. */
069        public static final String KEY_PROVIDER = "keyProvider";
070    
071        private static final Charset UTF8 = Charset.forName("UTF-8");
072    
073        private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
074    
075        private static final int SHUTDOWN_WAIT = 60;
076    
077        private static final int MILLIS_PER_SECOND = 1000;
078    
079        private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
080    
081        private static BDBManagerFactory factory = new BDBManagerFactory();
082    
083        private final Database database;
084    
085        private final Environment environment;
086    
087        private final WriterThread worker;
088    
089        private final Gate gate = new Gate();
090    
091        private final SecretKey secretKey;
092    
093        private final int delay;
094    
095        private final int lockTimeoutRetryCount;
096    
097        private final ExecutorService threadPool;
098    
099        private final AtomicLong dbCount = new AtomicLong();
100    
101        /**
102         * Constructor
103         * @param name The unique name of this manager.
104         * @param shortName Original name for the Manager.
105         * @param agents An array of Agents.
106         * @param batchSize The number of events to include in a batch.
107         * @param retries The number of times to retry connecting before giving up.
108         * @param connectionTimeout The amount of time to wait for a connection to be established.
109         * @param requestTimeout The amount of time to wair for a response to a request.
110         * @param delay The amount of time to wait between retries.
111         * @param database The database to write to.
112         * @param environment The database environment.
113         * @param secretKey The SecretKey to use for encryption.
114         * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
115         */
116        protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
117                                         final int batchSize, final int retries, final int connectionTimeout,
118                                         final int requestTimeout, final int delay, final Database database,
119                                         final Environment environment, final SecretKey secretKey,
120                                         final int lockTimeoutRetryCount) {
121            super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
122            this.delay = delay;
123            this.database = database;
124            this.environment = environment;
125            dbCount.set(database.count());
126            this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
127                lockTimeoutRetryCount);
128            this.worker.start();
129            this.secretKey = secretKey;
130            this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
131            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
132        }
133    
134    
135        /**
136         * Returns a FlumeAvroManager.
137         * @param name The name of the manager.
138         * @param agents The agents to use.
139         * @param properties Properties to pass to the Manager.
140         * @param batchSize The number of events to include in a batch.
141         * @param retries The number of times to retry connecting before giving up.
142         * @param connectionTimeout The amount of time to wait to establish a connection.
143         * @param requestTimeout The amount of time to wait for a response to a request.
144         * @param delay Amount of time to delay before delivering a batch.
145         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
146         * @param dataDir The location of the Berkeley database.
147         * @return A FlumeAvroManager.
148         */
149        public static FlumePersistentManager getManager(final String name, final Agent[] agents,
150                                                        final Property[] properties, int batchSize, final int retries,
151                                                        final int connectionTimeout, final int requestTimeout,
152                                                        final int delay, final int lockTimeoutRetryCount,
153                                                        final String dataDir) {
154            if (agents == null || agents.length == 0) {
155                throw new IllegalArgumentException("At least one agent is required");
156            }
157    
158            if (batchSize <= 0) {
159                batchSize = 1;
160            }
161            final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
162    
163            final StringBuilder sb = new StringBuilder("FlumePersistent[");
164            boolean first = true;
165            for (final Agent agent : agents) {
166                if (!first) {
167                    sb.append(',');
168                }
169                sb.append(agent.getHost()).append(':').append(agent.getPort());
170                first = false;
171            }
172            sb.append(']');
173            sb.append(' ').append(dataDirectory);
174            return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
175                connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
176        }
177    
178        @Override
179        public void send(final Event event)  {
180            if (worker.isShutdown()) {
181                throw new LoggingException("Unable to record event");
182            }
183    
184            final Map<String, String> headers = event.getHeaders();
185            final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
186            try {
187                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
188                final DataOutputStream daos = new DataOutputStream(baos);
189                daos.writeInt(event.getBody().length);
190                daos.write(event.getBody(), 0, event.getBody().length);
191                daos.writeInt(event.getHeaders().size());
192                for (final Map.Entry<String, String> entry : headers.entrySet()) {
193                    daos.writeUTF(entry.getKey());
194                    daos.writeUTF(entry.getValue());
195                }
196                byte[] eventData = baos.toByteArray();
197                if (secretKey != null) {
198                    final Cipher cipher = Cipher.getInstance("AES");
199                    cipher.init(Cipher.ENCRYPT_MODE, secretKey);
200                    eventData = cipher.doFinal(eventData);
201                }
202                final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
203                    gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
204                boolean interrupted = false;
205                int count = 0;
206                do {
207                    try {
208                        future.get();
209                    } catch (final InterruptedException ie) {
210                        interrupted = true;
211                        ++count;
212                    }
213                } while (interrupted && count <= 1);
214    
215            } catch (final Exception ex) {
216                throw new LoggingException("Exception occurred writing log event", ex);
217            }
218        }
219    
220        @Override
221        protected void releaseSub() {
222            LOGGER.debug("Shutting down FlumePersistentManager");
223            worker.shutdown();
224            try {
225                worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
226            } catch (InterruptedException ie) {
227                // Ignore the exception and shutdown.
228            }
229            threadPool.shutdown();
230            try {
231                threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
232            } catch (final InterruptedException ie) {
233                LOGGER.warn("PersistentManager Thread pool failed to shut down");
234            }
235            try {
236                worker.join();
237            } catch (final InterruptedException ex) {
238                LOGGER.debug("Interrupted while waiting for worker to complete");
239            }
240            try {
241                LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
242                database.close();
243            } catch (final Exception ex) {
244                LOGGER.warn("Failed to close database", ex);
245            }
246            try {
247                environment.cleanLog();
248                environment.close();
249            } catch (final Exception ex) {
250                LOGGER.warn("Failed to close environment", ex);
251            }
252            super.releaseSub();
253        }
254    
255        private void doSend(final SimpleEvent event) {
256            LOGGER.debug("Sending event to Flume");
257            super.send(event);
258        }
259    
260        /**
261         * Thread for writing to Berkeley DB to avoid having interrupts close the database.
262         */
263        private static class BDBWriter implements Callable<Integer> {
264            private final byte[] eventData;
265            private final byte[] keyData;
266            private final Environment environment;
267            private final Database database;
268            private final Gate gate;
269            private final AtomicLong dbCount;
270            private final long batchSize;
271            private final int lockTimeoutRetryCount;
272    
273            public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
274                             final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
275                             final int lockTimeoutRetryCount) {
276                this.keyData = keyData;
277                this.eventData = eventData;
278                this.environment = environment;
279                this.database = database;
280                this.gate = gate;
281                this.dbCount = dbCount;
282                this.batchSize = batchSize;
283                this.lockTimeoutRetryCount = lockTimeoutRetryCount;
284            }
285    
286            @Override
287            public Integer call() throws Exception {
288                final DatabaseEntry key = new DatabaseEntry(keyData);
289                final DatabaseEntry data = new DatabaseEntry(eventData);
290                Exception exception = null;
291                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
292                    Transaction txn = null;
293                    try {
294                        txn = environment.beginTransaction(null, null);
295                        try {
296                            database.put(txn, key, data);
297                            txn.commit();
298                            txn = null;
299                            if (dbCount.incrementAndGet() >= batchSize) {
300                                gate.open();
301                            }
302                            exception = null;
303                            break;
304                        } catch (final LockConflictException lce) {
305                            exception = lce;
306                            // Fall through and retry.
307                        } catch (final Exception ex) {
308                            if (txn != null) {
309                                txn.abort();
310                            }
311                            throw ex;
312                        } finally {
313                            if (txn != null) {
314                                txn.abort();
315                                txn = null;
316                            }
317                        }
318                    } catch (LockConflictException lce) {
319                        exception = lce;
320                        if (txn != null) {
321                            try {
322                                txn.abort();
323                                txn = null;
324                            } catch (Exception ex) {
325                                LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
326                            }
327                        }
328    
329                    }
330                    try {
331                        Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
332                    } catch (InterruptedException ie) {
333                        // Ignore the error
334                    }
335                }
336                if (exception != null) {
337                    throw exception;
338                }
339                return eventData.length;
340            }
341        }
342    
343        /**
344         * Factory data.
345         */
346        private static class FactoryData {
347            private final String name;
348            private final Agent[] agents;
349            private final int batchSize;
350            private final String dataDir;
351            private final int retries;
352            private final int connectionTimeout;
353            private final int requestTimeout;
354            private final int delay;
355            private final int lockTimeoutRetryCount;
356            private final Property[] properties;
357    
358            /**
359             * Constructor.
360             * @param name The name of the Appender.
361             * @param agents The agents.
362             * @param batchSize The number of events to include in a batch.
363             * @param dataDir The directory for data.
364             */
365            public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
366                               final int connectionTimeout, final int requestTimeout, final int delay,
367                               final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
368                this.name = name;
369                this.agents = agents;
370                this.batchSize = batchSize;
371                this.dataDir = dataDir;
372                this.retries = retries;
373                this.connectionTimeout = connectionTimeout;
374                this.requestTimeout = requestTimeout;
375                this.delay = delay;
376                this.lockTimeoutRetryCount = lockTimeoutRetryCount;
377                this.properties = properties;
378            }
379        }
380    
381        /**
382         * Avro Manager Factory.
383         */
384        private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
385    
386            /**
387             * Create the FlumeKratiManager.
388             * @param name The name of the entity to manage.
389             * @param data The data required to create the entity.
390             * @return The FlumeKratiManager.
391             */
392            @Override
393            public FlumePersistentManager createManager(final String name, final FactoryData data) {
394                SecretKey secretKey = null;
395                Database database = null;
396                Environment environment = null;
397    
398                final Map<String, String> properties = new HashMap<String, String>();
399                if (data.properties != null) {
400                    for (final Property property : data.properties) {
401                        properties.put(property.getName(), property.getValue());
402                    }
403                }
404    
405                try {
406                    final File dir = new File(data.dataDir);
407                    FileUtils.mkdir(dir, true);
408                    final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
409                    dbEnvConfig.setTransactional(true);
410                    dbEnvConfig.setAllowCreate(true);
411                    dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
412                    environment = new Environment(dir, dbEnvConfig);
413                    final DatabaseConfig dbConfig = new DatabaseConfig();
414                    dbConfig.setTransactional(true);
415                    dbConfig.setAllowCreate(true);
416                    database = environment.openDatabase(null, name, dbConfig);
417                } catch (final Exception ex) {
418                    LOGGER.error("Could not create FlumePersistentManager", ex);
419                    // For consistency, close database as well as environment even though it should never happen since the
420                    // database is that last thing in the block above, but this does guard against a future line being
421                    // inserted at the end that would bomb (like some debug logging).
422                    if (database != null) {
423                        database.close();
424                        database = null;
425                    }
426                    if (environment != null) {
427                        environment.close();
428                        environment = null;
429                    }
430                    return null;
431                }
432    
433                try {
434                    String key = null;
435                    for (final Map.Entry<String, String> entry : properties.entrySet()) {
436                        if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
437                            key = entry.getValue();
438                            break;
439                        }
440                    }
441                    if (key != null) {
442                        final PluginManager manager = new PluginManager("KeyProvider");
443                        manager.collectPlugins();
444                        final Map<String, PluginType<?>> plugins = manager.getPlugins();
445                        if (plugins != null) {
446                            boolean found = false;
447                            for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
448                                if (entry.getKey().equalsIgnoreCase(key)) {
449                                    found = true;
450                                    final Class<?> cl = entry.getValue().getPluginClass();
451                                    try {
452                                        final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
453                                        secretKey = provider.getSecretKey();
454                                        LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
455                                    } catch (final Exception ex) {
456                                        LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
457                                            cl.getName());
458                                    }
459                                    break;
460                                }
461                            }
462                            if (!found) {
463                                LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464                            }
465                        } else {
466                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
467                        }
468                    }
469                } catch (final Exception ex) {
470                    LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
471                }
472                return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
473                    data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
474                    data.lockTimeoutRetryCount);
475            }
476        }
477    
478        /**
479         * Thread that sends data to Flume and pulls it from Berkeley DB.
480         */
481        private static class WriterThread extends Thread  {
482            private volatile boolean shutdown = false;
483            private final Database database;
484            private final Environment environment;
485            private final FlumePersistentManager manager;
486            private final Gate gate;
487            private final SecretKey secretKey;
488            private final int batchSize;
489            private final AtomicLong dbCounter;
490            private final int lockTimeoutRetryCount;
491    
492            public WriterThread(final Database database, final Environment environment,
493                                final FlumePersistentManager manager, final Gate gate, final int batchsize,
494                                final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
495                this.database = database;
496                this.environment = environment;
497                this.manager = manager;
498                this.gate = gate;
499                this.batchSize = batchsize;
500                this.secretKey = secretKey;
501                this.setDaemon(true);
502                this.dbCounter = dbCount;
503                this.lockTimeoutRetryCount = lockTimeoutRetryCount;
504            }
505    
506            public void shutdown() {
507                LOGGER.debug("Writer thread shutting down");
508                this.shutdown = true;
509                gate.open();
510            }
511    
512            public boolean isShutdown() {
513                return shutdown;
514            }
515    
516            @Override
517            public void run() {
518                LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
519                long nextBatch = System.currentTimeMillis() + manager.delay;
520                while (!shutdown) {
521                    long now = System.currentTimeMillis();
522                    long dbCount = database.count();
523                    dbCounter.set(dbCount);
524                    if (dbCount >= batchSize || dbCount > 0 && nextBatch <= now) {
525                        nextBatch = now + manager.delay;
526                        try {
527                            boolean errors = false;
528                            DatabaseEntry key = new DatabaseEntry();
529                            final DatabaseEntry data = new DatabaseEntry();
530    
531                            gate.close();
532                            OperationStatus status;
533                            if (batchSize > 1) {
534                                try {
535                                    errors = sendBatch(key, data);
536                                } catch (final Exception ex) {
537                                    break;
538                                }
539                            } else {
540                                Exception exception = null;
541                                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
542                                    exception = null;
543                                    Transaction txn = null;
544                                    Cursor cursor = null;
545                                    try {
546                                        txn = environment.beginTransaction(null, null);
547                                        cursor = database.openCursor(txn, null);
548                                        try {
549                                            status = cursor.getFirst(key, data, LockMode.RMW);
550                                            while (status == OperationStatus.SUCCESS) {
551                                                final SimpleEvent event = createEvent(data);
552                                                if (event != null) {
553                                                    try {
554                                                        manager.doSend(event);
555                                                    } catch (final Exception ioe) {
556                                                        errors = true;
557                                                        LOGGER.error("Error sending event", ioe);
558                                                        break;
559                                                    }
560                                                    try {
561                                                        cursor.delete();
562                                                    } catch (final Exception ex) {
563                                                        LOGGER.error("Unable to delete event", ex);
564                                                    }
565                                                }
566                                                status = cursor.getNext(key, data, LockMode.RMW);
567                                            }
568                                            if (cursor != null) {
569                                                cursor.close();
570                                                cursor = null;
571                                            }
572                                            txn.commit();
573                                            txn = null;
574                                            dbCounter.decrementAndGet();
575                                            exception = null;
576                                            break;
577                                        } catch (final LockConflictException lce) {
578                                            exception = lce;
579                                            // Fall through and retry.
580                                        } catch (final Exception ex) {
581                                            LOGGER.error("Error reading or writing to database", ex);
582                                            shutdown = true;
583                                            break;
584                                        } finally {
585                                            if (cursor != null) {
586                                                cursor.close();
587                                                cursor = null;
588                                            }
589                                            if (txn != null) {
590                                                txn.abort();
591                                                txn = null;
592                                            }
593                                        }
594                                    } catch (LockConflictException lce) {
595                                        exception = lce;
596                                        if (cursor != null) {
597                                            try {
598                                                cursor.close();
599                                                cursor = null;
600                                            } catch (Exception ex) {
601                                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
602                                            }
603                                        }
604                                        if (txn != null) {
605                                            try {
606                                                txn.abort();
607                                                txn = null;
608                                            } catch (Exception ex) {
609                                                LOGGER.trace("Ignored exception aborting tx during lock conflict.");
610                                            }
611                                        }
612                                    }
613                                    try {
614                                        Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
615                                    } catch (InterruptedException ie) {
616                                        // Ignore the error
617                                    }
618                                }
619                                if (exception != null) {
620                                    LOGGER.error("Unable to read or update data base", exception);
621                                }
622                            }
623                            if (errors) {
624                                Thread.sleep(manager.delay);
625                                continue;
626                            }
627                        } catch (final Exception ex) {
628                            LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
629                        }
630                    } else {
631                        if (nextBatch <= now) {
632                            nextBatch = now + manager.delay;
633                        }
634                        try {
635                            final long interval = nextBatch - now;
636                            gate.waitForOpen(interval);
637                        } catch (final InterruptedException ie) {
638                            LOGGER.warn("WriterThread interrupted, continuing");
639                        } catch (final Exception ex) {
640                            LOGGER.error("WriterThread encountered an exception waiting for work", ex);
641                            break;
642                        }
643                    }
644                }
645    
646                if (batchSize > 1 && database.count() > 0) {
647                    DatabaseEntry key = new DatabaseEntry();
648                    final DatabaseEntry data = new DatabaseEntry();
649                    try {
650                        sendBatch(key, data);
651                    } catch (final Exception ex) {
652                        LOGGER.warn("Unable to write final batch");
653                    }
654                }
655                LOGGER.trace("WriterThread exiting");
656            }
657    
658            private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
659                boolean errors = false;
660                OperationStatus status;
661                Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
662                try {
663                    status = cursor.getFirst(key, data, null);
664    
665                    final BatchEvent batch = new BatchEvent();
666                    for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
667                        final SimpleEvent event = createEvent(data);
668                        if (event != null) {
669                            batch.addEvent(event);
670                        }
671                        status = cursor.getNext(key, data, null);
672                    }
673                    try {
674                        manager.send(batch);
675                    } catch (final Exception ioe) {
676                        LOGGER.error("Error sending events", ioe);
677                        errors = true;
678                    }
679                    if (!errors) {
680                        cursor.close();
681                        cursor = null;
682                        Transaction txn = null;
683                        Exception exception = null;
684                        for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
685                            try {
686                                txn = environment.beginTransaction(null, null);
687                                try {
688                                    for (final Event event : batch.getEvents()) {
689                                        try {
690                                            final Map<String, String> headers = event.getHeaders();
691                                            key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
692                                            database.delete(txn, key);
693                                        } catch (final Exception ex) {
694                                            LOGGER.error("Error deleting key from database", ex);
695                                        }
696                                    }
697                                    txn.commit();
698                                    long count = dbCounter.get();
699                                    while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
700                                        count = dbCounter.get();
701                                    }
702                                    exception = null;
703                                    break;
704                                } catch (final LockConflictException lce) {
705                                    exception = lce;
706                                    if (cursor != null) {
707                                        try {
708                                            cursor.close();
709                                            cursor = null;
710                                        } catch (Exception ex) {
711                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
712                                        }
713                                    }
714                                    if (txn != null) {
715                                        try {
716                                            txn.abort();
717                                            txn = null;
718                                        } catch (Exception ex) {
719                                            LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
720                                        }
721                                    }
722                                } catch (final Exception ex) {
723                                    LOGGER.error("Unable to commit transaction", ex);
724                                    if (txn != null) {
725                                        txn.abort();
726                                    }
727                                }
728                            } catch (LockConflictException lce) {
729                                exception = lce;
730                                if (cursor != null) {
731                                    try {
732                                        cursor.close();
733                                        cursor = null;
734                                    } catch (Exception ex) {
735                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
736                                    }
737                                }
738                                if (txn != null) {
739                                    try {
740                                        txn.abort();
741                                        txn = null;
742                                    } catch (Exception ex) {
743                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
744                                    }
745                                }
746                            } finally {
747                                if (cursor != null) {
748                                    cursor.close();
749                                    cursor = null;
750                                }
751                                if (txn != null) {
752                                    txn.abort();
753                                    txn = null;
754                                }
755                            }
756                            try {
757                                Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
758                            } catch (InterruptedException ie) {
759                                // Ignore the error
760                            }
761                        }
762                        if (exception != null) {
763                            LOGGER.error("Unable to delete events from data base", exception);
764                        }
765                    }
766                } catch (final Exception ex) {
767                    LOGGER.error("Error reading database", ex);
768                    shutdown = true;
769                    throw ex;
770                } finally {
771                    if (cursor != null) {
772                        cursor.close();
773                    }
774                }
775    
776                return errors;
777            }
778    
779            private SimpleEvent createEvent(final DatabaseEntry data) {
780                final SimpleEvent event = new SimpleEvent();
781                try {
782                    byte[] eventData = data.getData();
783                    if (secretKey != null) {
784                        final Cipher cipher = Cipher.getInstance("AES");
785                        cipher.init(Cipher.DECRYPT_MODE, secretKey);
786                        eventData = cipher.doFinal(eventData);
787                    }
788                    final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
789                    final DataInputStream dais = new DataInputStream(bais);
790                    int length = dais.readInt();
791                    final byte[] bytes = new byte[length];
792                    dais.read(bytes, 0, length);
793                    event.setBody(bytes);
794                    length = dais.readInt();
795                    final Map<String, String> map = new HashMap<String, String>(length);
796                    for (int i = 0; i < length; ++i) {
797                        final String headerKey = dais.readUTF();
798                        final String value = dais.readUTF();
799                        map.put(headerKey, value);
800                    }
801                    event.setHeaders(map);
802                    return event;
803                } catch (final Exception ex) {
804                    LOGGER.error("Error retrieving event", ex);
805                    return null;
806                }
807            }
808    
809        }
810    
811        /**
812         * Factory that creates Daemon threads that can be properly shut down.
813         */
814        private static class DaemonThreadFactory implements ThreadFactory {
815            private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
816            private final ThreadGroup group;
817            private final AtomicInteger threadNumber = new AtomicInteger(1);
818            private final String namePrefix;
819    
820            public DaemonThreadFactory() {
821                final SecurityManager securityManager = System.getSecurityManager();
822                group = securityManager != null ? securityManager.getThreadGroup() :
823                    Thread.currentThread().getThreadGroup();
824                namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
825            }
826    
827            @Override
828            public Thread newThread(final Runnable r) {
829                final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
830                thread.setDaemon(true);
831                if (thread.getPriority() != Thread.NORM_PRIORITY) {
832                    thread.setPriority(Thread.NORM_PRIORITY);
833                }
834                return thread;
835            }
836        }
837    
838        /**
839         * An internal class.
840         */
841        private static class Gate {
842    
843            private boolean isOpen = false;
844    
845            public boolean isOpen() {
846                return isOpen;
847            }
848    
849            public synchronized void open() {
850                isOpen = true;
851                notifyAll();
852            }
853    
854            public synchronized void close() {
855                isOpen = false;
856            }
857    
858            public synchronized void waitForOpen(long timeout) throws InterruptedException {
859                wait(timeout);
860            }
861        }
862    }