001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import com.sleepycat.je.Cursor;
020    import com.sleepycat.je.CursorConfig;
021    import com.sleepycat.je.Database;
022    import com.sleepycat.je.DatabaseConfig;
023    import com.sleepycat.je.DatabaseEntry;
024    import com.sleepycat.je.Environment;
025    import com.sleepycat.je.EnvironmentConfig;
026    import com.sleepycat.je.LockMode;
027    import com.sleepycat.je.OperationStatus;
028    import com.sleepycat.je.StatsConfig;
029    import com.sleepycat.je.Transaction;
030    import org.apache.flume.Event;
031    import org.apache.flume.event.SimpleEvent;
032    import org.apache.logging.log4j.LoggingException;
033    import org.apache.logging.log4j.core.appender.ManagerFactory;
034    import org.apache.logging.log4j.core.config.Property;
035    import org.apache.logging.log4j.core.config.plugins.PluginManager;
036    import org.apache.logging.log4j.core.config.plugins.PluginType;
037    import org.apache.logging.log4j.core.helpers.FileUtils;
038    import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
039    
040    import javax.crypto.Cipher;
041    import javax.crypto.SecretKey;
042    import java.io.ByteArrayInputStream;
043    import java.io.ByteArrayOutputStream;
044    import java.io.DataInputStream;
045    import java.io.DataOutputStream;
046    import java.io.File;
047    import java.nio.charset.Charset;
048    import java.util.HashMap;
049    import java.util.Map;
050    import java.util.concurrent.LinkedBlockingQueue;
051    import java.util.concurrent.TimeUnit;
052    import java.util.concurrent.locks.ReadWriteLock;
053    import java.util.concurrent.locks.ReentrantReadWriteLock;
054    import java.util.logging.Level;
055    
056    /**
057     *
058     */
059    public class FlumePersistentManager extends FlumeAvroManager {
060    
061        public static final String KEY_PROVIDER = "keyProvider";
062    
063        private static final Charset UTF8 = Charset.forName("UTF-8");
064    
065        private static final String SHUTDOWN = "Shutdown";
066    
067        private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
068    
069        private static BDBManagerFactory factory = new BDBManagerFactory();
070    
071        private Database database;
072    
073        private Environment environment;
074    
075        private final WriterThread worker;
076    
077        private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
078    
079        private final SecretKey secretKey;
080    
081        private final int delay;
082    
083        /**
084         * Constructor
085         * @param name The unique name of this manager.
086         * @param agents An array of Agents.
087         * @param batchSize The number of events to include in a batch.
088         * @param database The database to write to.
089         */
090        protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
091                                         final int batchSize, final int retries, final int connectionTimeout,
092                                         final int requestTimeout, final int delay, final Database database,
093                                         final Environment environment, SecretKey secretKey) {
094            super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
095            this.delay = delay;
096            this.database = database;
097            this.environment = environment;
098            this.worker = new WriterThread(database, environment, this, queue, batchSize, secretKey);
099            this.worker.start();
100            this.secretKey = secretKey;
101        }
102    
103    
104        /**
105         * Returns a FlumeAvroManager.
106         * @param name The name of the manager.
107         * @param agents The agents to use.
108         * @param batchSize The number of events to include in a batch.
109         * @return A FlumeAvroManager.
110         */
111        public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
112                                                        int batchSize, final int retries, final int connectionTimeout,
113                                                        final int requestTimeout, final int delay, final String dataDir) {
114            if (agents == null || agents.length == 0) {
115                throw new IllegalArgumentException("At least one agent is required");
116            }
117    
118            if (batchSize <= 0) {
119                batchSize = 1;
120            }
121            String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;
122    
123            final StringBuilder sb = new StringBuilder("FlumePersistent[");
124            boolean first = true;
125            for (final Agent agent : agents) {
126                if (!first) {
127                    sb.append(",");
128                }
129                sb.append(agent.getHost()).append(":").append(agent.getPort());
130                first = false;
131            }
132            sb.append("]");
133            sb.append(" ").append(dataDirectory);
134            return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
135                connectionTimeout, requestTimeout, delay, dataDir, properties));
136        }
137    
138        @Override
139        public void send(final Event event)  {
140            if (worker.isShutdown()) {
141                throw new LoggingException("Unable to record event");
142            }
143    
144            Map<String, String> headers = event.getHeaders();
145            byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
146            try {
147                ByteArrayOutputStream baos = new ByteArrayOutputStream();
148                DataOutputStream daos = new DataOutputStream(baos);
149                daos.writeInt(event.getBody().length);
150                daos.write(event.getBody(), 0, event.getBody().length);
151                daos.writeInt(event.getHeaders().size());
152                for (Map.Entry<String, String> entry : headers.entrySet()) {
153                    daos.writeUTF(entry.getKey());
154                    daos.writeUTF(entry.getValue());
155                }
156                byte[] eventData = baos.toByteArray();
157                if (secretKey != null) {
158                    Cipher cipher = Cipher.getInstance("AES");
159                    cipher.init(Cipher.ENCRYPT_MODE, secretKey);
160                    eventData = cipher.doFinal(eventData);
161                }
162                final DatabaseEntry key = new DatabaseEntry(keyData);
163                final DatabaseEntry data = new DatabaseEntry(eventData);
164                Transaction txn = environment.beginTransaction(null, null);
165                try {
166                    database.put(txn, key, data);
167                    txn.commit();
168                    queue.add(keyData);
169                } catch (Exception ex) {
170                    if (txn != null) {
171                        txn.abort();
172                    }
173                    throw ex;
174                }
175            } catch (Exception ex) {
176                throw new LoggingException("Exception occurred writing log event", ex);
177            }
178        }
179    
180        @Override
181        protected void releaseSub() {
182            LOGGER.debug("Shutting down FlumePersistentManager");
183            worker.shutdown();
184            try {
185                worker.join();
186            } catch (InterruptedException ex) {
187                LOGGER.debug("Interrupted while waiting for worker to complete");
188            }
189            try {
190                LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
191                database.close();
192            } catch (final Exception ex) {
193                LOGGER.warn("Failed to close database", ex);
194            }
195            try {
196                environment.cleanLog();
197                environment.close();
198            } catch (final Exception ex) {
199                LOGGER.warn("Failed to close environment", ex);
200            }
201            super.releaseSub();
202        }
203    
204        private void doSend(final SimpleEvent event) {
205            LOGGER.debug("Sending event to Flume");
206            super.send(event);
207        }
208    
209        /**
210         * Factory data.
211         */
212        private static class FactoryData {
213            private final String name;
214            private final Agent[] agents;
215            private final int batchSize;
216            private final String dataDir;
217            private final int retries;
218            private final int connectionTimeout;
219            private final int requestTimeout;
220            private final int delay;
221            private final Property[] properties;
222    
223            /**
224             * Constructor.
225             * @param name The name of the Appender.
226             * @param agents The agents.
227             * @param batchSize The number of events to include in a batch.
228             * @param dataDir The directory for data.
229             */
230            public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
231                               final int connectionTimeout, final int requestTimeout, final int delay,
232                               final String dataDir, final Property[] properties) {
233                this.name = name;
234                this.agents = agents;
235                this.batchSize = batchSize;
236                this.dataDir = dataDir;
237                this.retries = retries;
238                this.connectionTimeout = connectionTimeout;
239                this.requestTimeout = requestTimeout;
240                this.delay = delay;
241                this.properties = properties;
242            }
243        }
244    
245        /**
246         * Avro Manager Factory.
247         */
248        private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
249    
250            /**
251             * Create the FlumeKratiManager.
252             * @param name The name of the entity to manage.
253             * @param data The data required to create the entity.
254             * @return The FlumeKratiManager.
255             */
256            @Override
257            public FlumePersistentManager createManager(final String name, final FactoryData data) {
258                SecretKey secretKey = null;
259    
260                Database database;
261                Environment environment;
262    
263                Map<String, String> properties = new HashMap<String, String>();
264                if (data.properties != null) {
265                    for (Property property : data.properties) {
266                        properties.put(property.getName(), property.getValue());
267                    }
268                }
269    
270                try {
271    
272                    File dir = new File(data.dataDir);
273                    FileUtils.mkdir(dir, true);
274                    final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
275                    dbEnvConfig.setTransactional(true);
276                    dbEnvConfig.setAllowCreate(true);
277                    dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
278                    environment = new Environment(dir, dbEnvConfig);
279                    final DatabaseConfig dbConfig = new DatabaseConfig();
280                    dbConfig.setTransactional(true);
281                    dbConfig.setAllowCreate(true);
282                    database = environment.openDatabase(null, name, dbConfig);
283                } catch (final Exception ex) {
284                    LOGGER.error("Could not create FlumePersistentManager", ex);
285                    return null;
286                }
287    
288                try {
289                    String key = null;
290                    for (Map.Entry<String, String> entry : properties.entrySet()) {
291                        if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
292                            key = entry.getValue();
293                        }
294                    }
295                    if (key != null) {
296                        final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
297                        manager.collectPlugins();
298                        final Map<String, PluginType> plugins = manager.getPlugins();
299                        if (plugins != null) {
300                            boolean found = false;
301                            for (Map.Entry<String, PluginType> entry : plugins.entrySet()) {
302                                if (entry.getKey().equalsIgnoreCase(key)) {
303                                    found = true;
304                                    Class cl = entry.getValue().getPluginClass();
305                                    try {
306                                        SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
307                                        secretKey = provider.getSecretKey();
308                                        LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
309                                    } catch (Exception ex) {
310                                        LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
311                                            cl.getName());
312                                    }
313                                    break;
314                                }
315                            }
316                            if (!found) {
317                                LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
318                            }
319                        } else {
320                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
321                        }
322                    }
323                } catch (Exception ex) {
324                    LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
325                }
326                return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
327                    data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey);
328            }
329        }
330    
331        private static class WriterThread extends Thread  {
332            private volatile boolean shutdown = false;
333            private final Database database;
334            private final Environment environment;
335            private final FlumePersistentManager manager;
336            private final LinkedBlockingQueue<byte[]> queue;
337            private final SecretKey secretKey;
338            private final int batchSize;
339    
340            public WriterThread(Database database, Environment environment, FlumePersistentManager manager,
341                                LinkedBlockingQueue<byte[]> queue, int batchsize, SecretKey secretKey) {
342                this.database = database;
343                this.environment = environment;
344                this.manager = manager;
345                this.queue = queue;
346                this.batchSize = batchsize;
347                this.secretKey = secretKey;
348                this.setDaemon(true);
349            }
350    
351            public void shutdown() {
352                LOGGER.debug("Writer thread shutting down");
353                this.shutdown = true;
354                if (queue.size() == 0) {
355                    queue.add(SHUTDOWN.getBytes(UTF8));
356                }
357            }
358    
359            public boolean isShutdown() {
360                return shutdown;
361            }
362    
363            @Override
364            public void run() {
365                LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
366                long nextBatch = System.currentTimeMillis() + manager.delay;
367                while (!shutdown) {
368                    long now = System.currentTimeMillis();
369                    if (database.count() >= batchSize || (database.count() > 0 && nextBatch < now)) {
370                        nextBatch = now + manager.delay;
371                        try {
372                            boolean errors = false;
373                            DatabaseEntry key = new DatabaseEntry();
374                            final DatabaseEntry data = new DatabaseEntry();
375    
376                            queue.clear();
377                            OperationStatus status;
378                            if (batchSize > 1) {
379                                Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
380                                try {
381                                    status = cursor.getFirst(key, data, null);
382    
383                                    BatchEvent batch = new BatchEvent();
384                                    for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
385                                        SimpleEvent event = createEvent(data);
386                                        if (event != null) {
387                                            batch.addEvent(event);
388                                        }
389                                        status = cursor.getNext(key, data, null);
390                                    }
391                                    try {
392                                        manager.send(batch);
393                                    } catch (Exception ioe) {
394                                        LOGGER.error("Error sending events", ioe);
395                                        break;
396                                    }
397                                    cursor.close();
398                                    cursor = null;
399                                    Transaction txn = environment.beginTransaction(null, null);
400                                    try {
401                                        for (Event event : batch.getEvents()) {
402                                            try {
403                                                Map<String, String> headers = event.getHeaders();
404                                                key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
405                                                database.delete(txn, key);
406                                            } catch (Exception ex) {
407                                                LOGGER.error("Error deleting key from database", ex);
408                                            }
409                                        }
410                                        txn.commit();
411                                    } catch (Exception ex) {
412                                        LOGGER.error("Unable to commit transaction", ex);
413                                        if (txn != null) {
414                                            txn.abort();
415                                        }
416                                    }
417                                } catch (Exception ex) {
418                                    LOGGER.error("Error reading database", ex);
419                                    shutdown = true;
420                                    break;
421                                } finally {
422                                    if (cursor != null) {
423                                        cursor.close();
424                                    }
425                                }
426                            } else {
427                                Transaction txn = environment.beginTransaction(null, null);
428                                Cursor cursor = database.openCursor(txn, null);
429                                try {
430                                    status = cursor.getFirst(key, data, LockMode.RMW);
431                                    while (status == OperationStatus.SUCCESS) {
432                                        SimpleEvent event = createEvent(data);
433                                        if (event != null) {
434                                            try {
435                                                manager.doSend(event);
436                                            } catch (Exception ioe) {
437                                                errors = true;
438                                                LOGGER.error("Error sending event", ioe);
439                                                break;
440                                            }
441                                            if (!errors) {
442                                                try {
443                                                    cursor.delete();
444                                                } catch (Exception ex) {
445                                                    LOGGER.error("Unable to delete event", ex);
446                                                }
447                                            }
448                                        }
449                                        status = cursor.getNext(key, data, LockMode.RMW);
450                                    }
451                                    if (cursor != null) {
452                                        cursor.close();
453                                        cursor = null;
454                                    }
455                                    txn.commit();
456                                    txn = null;
457                                } catch (Exception ex) {
458                                    LOGGER.error("Error reading or writing to database", ex);
459                                    shutdown = true;
460                                    break;
461                                } finally {
462                                    if (cursor != null) {
463                                        cursor.close();
464                                    }
465                                    if (txn != null) {
466                                        txn.abort();
467                                    }
468                                }
469                            }
470                            if (errors) {
471                                Thread.sleep(manager.delay);
472                                continue;
473                            }
474                        } catch (Exception ex) {
475                            LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
476                        }
477                    } else {
478                        while (!shutdown && (database.count() == 0 || database.count() < batchSize && nextBatch > now)) {
479                            try {
480                                long interval = nextBatch - now;
481                                queue.poll(interval, TimeUnit.MILLISECONDS);
482                            } catch (InterruptedException ie) {
483                                LOGGER.warn("WriterThread interrupted, continuing");
484                            } catch (Exception ex) {
485                                LOGGER.error("WriterThread encountered an exception waiting for work", ex);
486                                break;
487                            }
488                            now = System.currentTimeMillis();
489                            if (database.count() == 0) {
490                                nextBatch = now + manager.delay;
491                            }
492                        }
493                        LOGGER.debug("WriterThread ready to work");
494                    }
495                }
496                LOGGER.trace("WriterThread exiting");
497            }
498    
499            private SimpleEvent createEvent(DatabaseEntry data) {
500                SimpleEvent event = new SimpleEvent();
501                try {
502                    byte[] eventData = data.getData();
503                    if (secretKey != null) {
504                        Cipher cipher = Cipher.getInstance("AES");
505                        cipher.init(Cipher.DECRYPT_MODE, secretKey);
506                        eventData = cipher.doFinal(eventData);
507                    }
508                    ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
509                    DataInputStream dais = new DataInputStream(bais);
510                    int length = dais.readInt();
511                    byte[] bytes = new byte[length];
512                    dais.read(bytes, 0, length);
513                    event.setBody(bytes);
514                    length = dais.readInt();
515                    Map<String, String> map = new HashMap<String, String>(length);
516                    for (int i = 0; i < length; ++i) {
517                        String headerKey = dais.readUTF();
518                        String value = dais.readUTF();
519                        map.put(headerKey, value);
520                    }
521                    event.setHeaders(map);
522                    return event;
523                } catch (Exception ex) {
524                    LOGGER.error("Error retrieving event", ex);
525                    return null;
526                }
527            }
528    
529        }
530    }