001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.File;
024import java.nio.charset.Charset;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.Future;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035
036import javax.crypto.Cipher;
037import javax.crypto.SecretKey;
038
039import org.apache.flume.Event;
040import org.apache.flume.event.SimpleEvent;
041import org.apache.logging.log4j.LoggingException;
042import org.apache.logging.log4j.core.appender.ManagerFactory;
043import org.apache.logging.log4j.core.config.Property;
044import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
045import org.apache.logging.log4j.core.config.plugins.util.PluginType;
046import org.apache.logging.log4j.core.util.FileUtils;
047import org.apache.logging.log4j.core.util.SecretKeyProvider;
048import org.apache.logging.log4j.util.Strings;
049
050import com.sleepycat.je.Cursor;
051import com.sleepycat.je.CursorConfig;
052import com.sleepycat.je.Database;
053import com.sleepycat.je.DatabaseConfig;
054import com.sleepycat.je.DatabaseEntry;
055import com.sleepycat.je.Environment;
056import com.sleepycat.je.EnvironmentConfig;
057import com.sleepycat.je.LockConflictException;
058import com.sleepycat.je.LockMode;
059import com.sleepycat.je.OperationStatus;
060import com.sleepycat.je.StatsConfig;
061import com.sleepycat.je.Transaction;
062
063/**
064 * Manager that persists data to Berkeley DB before passing it on to Flume.
065 */
066public class FlumePersistentManager extends FlumeAvroManager {
067
068    /** Attribute name for the key provider. */
069    public static final String KEY_PROVIDER = "keyProvider";
070
071    private static final Charset UTF8 = Charset.forName("UTF-8");
072
073    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
074
075    private static final int SHUTDOWN_WAIT = 60;
076
077    private static final int MILLIS_PER_SECOND = 1000;
078
079    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
080
081    private static BDBManagerFactory factory = new BDBManagerFactory();
082
083    private final Database database;
084
085    private final Environment environment;
086
087    private final WriterThread worker;
088
089    private final Gate gate = new Gate();
090
091    private final SecretKey secretKey;
092
093    private final int lockTimeoutRetryCount;
094
095    private final ExecutorService threadPool;
096
097    private final AtomicLong dbCount = new AtomicLong();
098
099    /**
100     * Constructor
101     * @param name The unique name of this manager.
102     * @param shortName Original name for the Manager.
103     * @param agents An array of Agents.
104     * @param batchSize The number of events to include in a batch.
105     * @param retries The number of times to retry connecting before giving up.
106     * @param connectionTimeout The amount of time to wait for a connection to be established.
107     * @param requestTimeout The amount of time to wair for a response to a request.
108     * @param delay The amount of time to wait between retries.
109     * @param database The database to write to.
110     * @param environment The database environment.
111     * @param secretKey The SecretKey to use for encryption.
112     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
113     */
114    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
115                                     final int batchSize, final int retries, final int connectionTimeout,
116                                     final int requestTimeout, final int delay, final Database database,
117                                     final Environment environment, final SecretKey secretKey,
118                                     final int lockTimeoutRetryCount) {
119        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
120        this.database = database;
121        this.environment = environment;
122        dbCount.set(database.count());
123        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
124            lockTimeoutRetryCount);
125        this.worker.start();
126        this.secretKey = secretKey;
127        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
128        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
129    }
130
131
132    /**
133     * Returns a FlumeAvroManager.
134     * @param name The name of the manager.
135     * @param agents The agents to use.
136     * @param properties Properties to pass to the Manager.
137     * @param batchSize The number of events to include in a batch.
138     * @param retries The number of times to retry connecting before giving up.
139     * @param connectionTimeout The amount of time to wait to establish a connection.
140     * @param requestTimeout The amount of time to wait for a response to a request.
141     * @param delayMillis Amount of time to delay before delivering a batch.
142     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
143     * @param dataDir The location of the Berkeley database.
144     * @return A FlumeAvroManager.
145     */
146    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
147                                                    final Property[] properties, int batchSize, final int retries,
148                                                    final int connectionTimeout, final int requestTimeout,
149                                                    final int delayMillis, final int lockTimeoutRetryCount,
150                                                    final String dataDir) {
151        if (agents == null || agents.length == 0) {
152            throw new IllegalArgumentException("At least one agent is required");
153        }
154
155        if (batchSize <= 0) {
156            batchSize = 1;
157        }
158        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
159
160        final StringBuilder sb = new StringBuilder("FlumePersistent[");
161        boolean first = true;
162        for (final Agent agent : agents) {
163            if (!first) {
164                sb.append(',');
165            }
166            sb.append(agent.getHost()).append(':').append(agent.getPort());
167            first = false;
168        }
169        sb.append(']');
170        sb.append(' ').append(dataDirectory);
171        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
172            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
173    }
174
175    @Override
176    public void send(final Event event)  {
177        if (worker.isShutdown()) {
178            throw new LoggingException("Unable to record event");
179        }
180
181        final Map<String, String> headers = event.getHeaders();
182        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
183        try {
184            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185            final DataOutputStream daos = new DataOutputStream(baos);
186            daos.writeInt(event.getBody().length);
187            daos.write(event.getBody(), 0, event.getBody().length);
188            daos.writeInt(event.getHeaders().size());
189            for (final Map.Entry<String, String> entry : headers.entrySet()) {
190                daos.writeUTF(entry.getKey());
191                daos.writeUTF(entry.getValue());
192            }
193            byte[] eventData = baos.toByteArray();
194            if (secretKey != null) {
195                final Cipher cipher = Cipher.getInstance("AES");
196                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
197                eventData = cipher.doFinal(eventData);
198            }
199            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
200                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
201            boolean interrupted = false;
202            int ieCount = 0;
203            do {
204                try {
205                    future.get();
206                } catch (final InterruptedException ie) {
207                    interrupted = true;
208                    ++ieCount;
209                }
210            } while (interrupted && ieCount <= 1);
211
212        } catch (final Exception ex) {
213            throw new LoggingException("Exception occurred writing log event", ex);
214        }
215    }
216
217    @Override
218    protected void releaseSub() {
219        LOGGER.debug("Shutting down FlumePersistentManager");
220        worker.shutdown();
221        try {
222            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
223        } catch (final InterruptedException ie) {
224            // Ignore the exception and shutdown.
225        }
226        threadPool.shutdown();
227        try {
228            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
229        } catch (final InterruptedException ie) {
230            LOGGER.warn("PersistentManager Thread pool failed to shut down");
231        }
232        try {
233            worker.join();
234        } catch (final InterruptedException ex) {
235            LOGGER.debug("Interrupted while waiting for worker to complete");
236        }
237        try {
238            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
239            database.close();
240        } catch (final Exception ex) {
241            LOGGER.warn("Failed to close database", ex);
242        }
243        try {
244            environment.cleanLog();
245            environment.close();
246        } catch (final Exception ex) {
247            LOGGER.warn("Failed to close environment", ex);
248        }
249        super.releaseSub();
250    }
251
252    private void doSend(final SimpleEvent event) {
253        LOGGER.debug("Sending event to Flume");
254        super.send(event);
255    }
256
257    /**
258     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
259     */
260    private static class BDBWriter implements Callable<Integer> {
261        private final byte[] eventData;
262        private final byte[] keyData;
263        private final Environment environment;
264        private final Database database;
265        private final Gate gate;
266        private final AtomicLong dbCount;
267        private final long batchSize;
268        private final int lockTimeoutRetryCount;
269
270        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
271                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
272                         final int lockTimeoutRetryCount) {
273            this.keyData = keyData;
274            this.eventData = eventData;
275            this.environment = environment;
276            this.database = database;
277            this.gate = gate;
278            this.dbCount = dbCount;
279            this.batchSize = batchSize;
280            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
281        }
282
283        @Override
284        public Integer call() throws Exception {
285            final DatabaseEntry key = new DatabaseEntry(keyData);
286            final DatabaseEntry data = new DatabaseEntry(eventData);
287            Exception exception = null;
288            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
289                Transaction txn = null;
290                try {
291                    txn = environment.beginTransaction(null, null);
292                    try {
293                        database.put(txn, key, data);
294                        txn.commit();
295                        txn = null;
296                        if (dbCount.incrementAndGet() >= batchSize) {
297                            gate.open();
298                        }
299                        exception = null;
300                        break;
301                    } catch (final LockConflictException lce) {
302                        exception = lce;
303                        // Fall through and retry.
304                    } catch (final Exception ex) {
305                        if (txn != null) {
306                            txn.abort();
307                        }
308                        throw ex;
309                    } finally {
310                        if (txn != null) {
311                            txn.abort();
312                            txn = null;
313                        }
314                    }
315                } catch (final LockConflictException lce) {
316                    exception = lce;
317                    if (txn != null) {
318                        try {
319                            txn.abort();
320                            txn = null;
321                        } catch (final Exception ex) {
322                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
323                        }
324                    }
325
326                }
327                try {
328                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
329                } catch (final InterruptedException ie) {
330                    // Ignore the error
331                }
332            }
333            if (exception != null) {
334                throw exception;
335            }
336            return eventData.length;
337        }
338    }
339
340    /**
341     * Factory data.
342     */
343    private static class FactoryData {
344        private final String name;
345        private final Agent[] agents;
346        private final int batchSize;
347        private final String dataDir;
348        private final int retries;
349        private final int connectionTimeout;
350        private final int requestTimeout;
351        private final int delayMillis;
352        private final int lockTimeoutRetryCount;
353        private final Property[] properties;
354
355        /**
356         * Constructor.
357         * @param name The name of the Appender.
358         * @param agents The agents.
359         * @param batchSize The number of events to include in a batch.
360         * @param dataDir The directory for data.
361         */
362        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
363                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
364                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
365            this.name = name;
366            this.agents = agents;
367            this.batchSize = batchSize;
368            this.dataDir = dataDir;
369            this.retries = retries;
370            this.connectionTimeout = connectionTimeout;
371            this.requestTimeout = requestTimeout;
372            this.delayMillis = delayMillis;
373            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
374            this.properties = properties;
375        }
376    }
377
378    /**
379     * Avro Manager Factory.
380     */
381    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
382
383        /**
384         * Create the FlumeKratiManager.
385         * @param name The name of the entity to manage.
386         * @param data The data required to create the entity.
387         * @return The FlumeKratiManager.
388         */
389        @Override
390        public FlumePersistentManager createManager(final String name, final FactoryData data) {
391            SecretKey secretKey = null;
392            Database database = null;
393            Environment environment = null;
394
395            final Map<String, String> properties = new HashMap<>();
396            if (data.properties != null) {
397                for (final Property property : data.properties) {
398                    properties.put(property.getName(), property.getValue());
399                }
400            }
401
402            try {
403                final File dir = new File(data.dataDir);
404                FileUtils.mkdir(dir, true);
405                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
406                dbEnvConfig.setTransactional(true);
407                dbEnvConfig.setAllowCreate(true);
408                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
409                environment = new Environment(dir, dbEnvConfig);
410                final DatabaseConfig dbConfig = new DatabaseConfig();
411                dbConfig.setTransactional(true);
412                dbConfig.setAllowCreate(true);
413                database = environment.openDatabase(null, name, dbConfig);
414            } catch (final Exception ex) {
415                LOGGER.error("Could not create FlumePersistentManager", ex);
416                // For consistency, close database as well as environment even though it should never happen since the
417                // database is that last thing in the block above, but this does guard against a future line being
418                // inserted at the end that would bomb (like some debug logging).
419                if (database != null) {
420                    database.close();
421                    database = null;
422                }
423                if (environment != null) {
424                    environment.close();
425                    environment = null;
426                }
427                return null;
428            }
429
430            try {
431                String key = null;
432                for (final Map.Entry<String, String> entry : properties.entrySet()) {
433                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
434                        key = entry.getValue();
435                        break;
436                    }
437                }
438                if (key != null) {
439                    final PluginManager manager = new PluginManager("KeyProvider");
440                    manager.collectPlugins();
441                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
442                    if (plugins != null) {
443                        boolean found = false;
444                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
445                            if (entry.getKey().equalsIgnoreCase(key)) {
446                                found = true;
447                                final Class<?> cl = entry.getValue().getPluginClass();
448                                try {
449                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
450                                    secretKey = provider.getSecretKey();
451                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
452                                } catch (final Exception ex) {
453                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
454                                        cl.getName());
455                                }
456                                break;
457                            }
458                        }
459                        if (!found) {
460                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
461                        }
462                    } else {
463                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464                    }
465                }
466            } catch (final Exception ex) {
467                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
468            }
469            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
470                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
471                data.lockTimeoutRetryCount);
472        }
473    }
474
475    /**
476     * Thread that sends data to Flume and pulls it from Berkeley DB.
477     */
478    private static class WriterThread extends Thread  {
479        private volatile boolean shutdown = false;
480        private final Database database;
481        private final Environment environment;
482        private final FlumePersistentManager manager;
483        private final Gate gate;
484        private final SecretKey secretKey;
485        private final int batchSize;
486        private final AtomicLong dbCounter;
487        private final int lockTimeoutRetryCount;
488
489        public WriterThread(final Database database, final Environment environment,
490                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
491                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
492            this.database = database;
493            this.environment = environment;
494            this.manager = manager;
495            this.gate = gate;
496            this.batchSize = batchsize;
497            this.secretKey = secretKey;
498            this.setDaemon(true);
499            this.dbCounter = dbCount;
500            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
501        }
502
503        public void shutdown() {
504            LOGGER.debug("Writer thread shutting down");
505            this.shutdown = true;
506            gate.open();
507        }
508
509        public boolean isShutdown() {
510            return shutdown;
511        }
512
513        @Override
514        public void run() {
515            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
516            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
517            while (!shutdown) {
518                final long nowMillis = System.currentTimeMillis();
519                final long dbCount = database.count();
520                dbCounter.set(dbCount);
521                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
522                    nextBatchMillis = nowMillis + manager.getDelayMillis();
523                    try {
524                        boolean errors = false;
525                        final DatabaseEntry key = new DatabaseEntry();
526                        final DatabaseEntry data = new DatabaseEntry();
527
528                        gate.close();
529                        OperationStatus status;
530                        if (batchSize > 1) {
531                            try {
532                                errors = sendBatch(key, data);
533                            } catch (final Exception ex) {
534                                break;
535                            }
536                        } else {
537                            Exception exception = null;
538                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
539                                exception = null;
540                                Transaction txn = null;
541                                Cursor cursor = null;
542                                try {
543                                    txn = environment.beginTransaction(null, null);
544                                    cursor = database.openCursor(txn, null);
545                                    try {
546                                        status = cursor.getFirst(key, data, LockMode.RMW);
547                                        while (status == OperationStatus.SUCCESS) {
548                                            final SimpleEvent event = createEvent(data);
549                                            if (event != null) {
550                                                try {
551                                                    manager.doSend(event);
552                                                } catch (final Exception ioe) {
553                                                    errors = true;
554                                                    LOGGER.error("Error sending event", ioe);
555                                                    break;
556                                                }
557                                                try {
558                                                    cursor.delete();
559                                                } catch (final Exception ex) {
560                                                    LOGGER.error("Unable to delete event", ex);
561                                                }
562                                            }
563                                            status = cursor.getNext(key, data, LockMode.RMW);
564                                        }
565                                        if (cursor != null) {
566                                            cursor.close();
567                                            cursor = null;
568                                        }
569                                        txn.commit();
570                                        txn = null;
571                                        dbCounter.decrementAndGet();
572                                        exception = null;
573                                        break;
574                                    } catch (final LockConflictException lce) {
575                                        exception = lce;
576                                        // Fall through and retry.
577                                    } catch (final Exception ex) {
578                                        LOGGER.error("Error reading or writing to database", ex);
579                                        shutdown = true;
580                                        break;
581                                    } finally {
582                                        if (cursor != null) {
583                                            cursor.close();
584                                            cursor = null;
585                                        }
586                                        if (txn != null) {
587                                            txn.abort();
588                                            txn = null;
589                                        }
590                                    }
591                                } catch (final LockConflictException lce) {
592                                    exception = lce;
593                                    if (cursor != null) {
594                                        try {
595                                            cursor.close();
596                                            cursor = null;
597                                        } catch (final Exception ex) {
598                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
599                                        }
600                                    }
601                                    if (txn != null) {
602                                        try {
603                                            txn.abort();
604                                            txn = null;
605                                        } catch (final Exception ex) {
606                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
607                                        }
608                                    }
609                                }
610                                try {
611                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
612                                } catch (final InterruptedException ie) {
613                                    // Ignore the error
614                                }
615                            }
616                            if (exception != null) {
617                                LOGGER.error("Unable to read or update data base", exception);
618                            }
619                        }
620                        if (errors) {
621                            Thread.sleep(manager.getDelayMillis());
622                            continue;
623                        }
624                    } catch (final Exception ex) {
625                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
626                    }
627                } else {
628                    if (nextBatchMillis <= nowMillis) {
629                        nextBatchMillis = nowMillis + manager.getDelayMillis();
630                    }
631                    try {
632                        final long interval = nextBatchMillis - nowMillis;
633                        gate.waitForOpen(interval);
634                    } catch (final InterruptedException ie) {
635                        LOGGER.warn("WriterThread interrupted, continuing");
636                    } catch (final Exception ex) {
637                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
638                        break;
639                    }
640                }
641            }
642
643            if (batchSize > 1 && database.count() > 0) {
644                final DatabaseEntry key = new DatabaseEntry();
645                final DatabaseEntry data = new DatabaseEntry();
646                try {
647                    sendBatch(key, data);
648                } catch (final Exception ex) {
649                    LOGGER.warn("Unable to write final batch");
650                }
651            }
652            LOGGER.trace("WriterThread exiting");
653        }
654
655        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
656            boolean errors = false;
657            OperationStatus status;
658            Cursor cursor = null;
659            try {
660                final BatchEvent batch = new BatchEvent();
661                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
662                        try {
663                                cursor = database.openCursor(null, CursorConfig.DEFAULT);
664                                status = cursor.getFirst(key, data, null);
665
666                                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
667                                        final SimpleEvent event = createEvent(data);
668                                        if (event != null) {
669                                                batch.addEvent(event);
670                                        }
671                                        status = cursor.getNext(key, data, null);
672                                }
673                                break;
674                        } catch (final LockConflictException lce) {
675                                if (cursor != null) {
676                                        try {
677                                cursor.close();
678                                cursor = null;
679                            } catch (final Exception ex) {
680                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
681                            }
682                        }
683                    }
684                }
685
686                try {
687                    manager.send(batch);
688                } catch (final Exception ioe) {
689                    LOGGER.error("Error sending events", ioe);
690                    errors = true;
691                }
692                if (!errors) {
693                        if (cursor != null) {
694                            cursor.close();
695                            cursor = null;
696                        }
697                    Transaction txn = null;
698                    Exception exception = null;
699                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
700                        try {
701                            txn = environment.beginTransaction(null, null);
702                            try {
703                                for (final Event event : batch.getEvents()) {
704                                    try {
705                                        final Map<String, String> headers = event.getHeaders();
706                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
707                                        database.delete(txn, key);
708                                    } catch (final Exception ex) {
709                                        LOGGER.error("Error deleting key from database", ex);
710                                    }
711                                }
712                                txn.commit();
713                                long count = dbCounter.get();
714                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
715                                    count = dbCounter.get();
716                                }
717                                exception = null;
718                                break;
719                            } catch (final LockConflictException lce) {
720                                exception = lce;
721                                if (cursor != null) {
722                                    try {
723                                        cursor.close();
724                                        cursor = null;
725                                    } catch (final Exception ex) {
726                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
727                                    }
728                                }
729                                if (txn != null) {
730                                    try {
731                                        txn.abort();
732                                        txn = null;
733                                    } catch (final Exception ex) {
734                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
735                                    }
736                                }
737                            } catch (final Exception ex) {
738                                LOGGER.error("Unable to commit transaction", ex);
739                                if (txn != null) {
740                                    txn.abort();
741                                }
742                            }
743                        } catch (final LockConflictException lce) {
744                            exception = lce;
745                            if (cursor != null) {
746                                try {
747                                    cursor.close();
748                                    cursor = null;
749                                } catch (final Exception ex) {
750                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
751                                }
752                            }
753                            if (txn != null) {
754                                try {
755                                    txn.abort();
756                                    txn = null;
757                                } catch (final Exception ex) {
758                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
759                                }
760                            }
761                        } finally {
762                            if (cursor != null) {
763                                cursor.close();
764                                cursor = null;
765                            }
766                            if (txn != null) {
767                                txn.abort();
768                                txn = null;
769                            }
770                        }
771                        try {
772                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
773                        } catch (final InterruptedException ie) {
774                            // Ignore the error
775                        }
776                    }
777                    if (exception != null) {
778                        LOGGER.error("Unable to delete events from data base", exception);
779                    }
780                }
781            } catch (final Exception ex) {
782                LOGGER.error("Error reading database", ex);
783                shutdown = true;
784                throw ex;
785            } finally {
786                if (cursor != null) {
787                    cursor.close();
788                }
789            }
790
791            return errors;
792        }
793
794        private SimpleEvent createEvent(final DatabaseEntry data) {
795            final SimpleEvent event = new SimpleEvent();
796            try {
797                byte[] eventData = data.getData();
798                if (secretKey != null) {
799                    final Cipher cipher = Cipher.getInstance("AES");
800                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
801                    eventData = cipher.doFinal(eventData);
802                }
803                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
804                final DataInputStream dais = new DataInputStream(bais);
805                int length = dais.readInt();
806                final byte[] bytes = new byte[length];
807                dais.read(bytes, 0, length);
808                event.setBody(bytes);
809                length = dais.readInt();
810                final Map<String, String> map = new HashMap<>(length);
811                for (int i = 0; i < length; ++i) {
812                    final String headerKey = dais.readUTF();
813                    final String value = dais.readUTF();
814                    map.put(headerKey, value);
815                }
816                event.setHeaders(map);
817                return event;
818            } catch (final Exception ex) {
819                LOGGER.error("Error retrieving event", ex);
820                return null;
821            }
822        }
823
824    }
825
826    /**
827     * Factory that creates Daemon threads that can be properly shut down.
828     */
829    private static class DaemonThreadFactory implements ThreadFactory {
830        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
831        private final ThreadGroup group;
832        private final AtomicInteger threadNumber = new AtomicInteger(1);
833        private final String namePrefix;
834
835        public DaemonThreadFactory() {
836            final SecurityManager securityManager = System.getSecurityManager();
837            group = securityManager != null ? securityManager.getThreadGroup() :
838                Thread.currentThread().getThreadGroup();
839            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
840        }
841
842        @Override
843        public Thread newThread(final Runnable r) {
844            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
845            thread.setDaemon(true);
846            if (thread.getPriority() != Thread.NORM_PRIORITY) {
847                thread.setPriority(Thread.NORM_PRIORITY);
848            }
849            return thread;
850        }
851    }
852
853    /**
854     * An internal class.
855     */
856    private static class Gate {
857
858        private boolean isOpen = false;
859
860        public boolean isOpen() {
861            return isOpen;
862        }
863
864        public synchronized void open() {
865            isOpen = true;
866            notifyAll();
867        }
868
869        public synchronized void close() {
870            isOpen = false;
871        }
872
873        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
874            wait(timeout);
875        }
876    }
877}