001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import com.sleepycat.je.Cursor;
020    import com.sleepycat.je.Database;
021    import com.sleepycat.je.DatabaseConfig;
022    import com.sleepycat.je.DatabaseEntry;
023    import com.sleepycat.je.Environment;
024    import com.sleepycat.je.EnvironmentConfig;
025    import com.sleepycat.je.LockMode;
026    import com.sleepycat.je.OperationStatus;
027    import com.sleepycat.je.StatsConfig;
028    import org.apache.flume.Event;
029    import org.apache.flume.event.SimpleEvent;
030    import org.apache.logging.log4j.LoggingException;
031    import org.apache.logging.log4j.core.appender.ManagerFactory;
032    import org.apache.logging.log4j.core.config.Property;
033    import org.apache.logging.log4j.core.config.plugins.PluginManager;
034    import org.apache.logging.log4j.core.config.plugins.PluginType;
035    import org.apache.logging.log4j.core.helpers.FileUtils;
036    import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
037    
038    import javax.crypto.Cipher;
039    import javax.crypto.SecretKey;
040    import java.io.ByteArrayInputStream;
041    import java.io.ByteArrayOutputStream;
042    import java.io.DataInputStream;
043    import java.io.DataOutputStream;
044    import java.io.File;
045    import java.nio.charset.Charset;
046    import java.util.HashMap;
047    import java.util.Map;
048    import java.util.concurrent.LinkedBlockingQueue;
049    import java.util.concurrent.TimeUnit;
050    
051    /**
052     *
053     */
054    public class FlumePersistentManager extends FlumeAvroManager {
055    
056        public static final String KEY_PROVIDER = "keyProvider";
057    
058        private static final Charset UTF8 = Charset.forName("UTF-8");
059    
060        private static final String SHUTDOWN = "Shutdown";
061    
062        private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
063    
064        private static BDBManagerFactory factory = new BDBManagerFactory();
065    
066        private Database database;
067    
068        private final WriterThread worker;
069    
070        private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
071    
072        private final SecretKey secretKey;
073    
074        private final int delay;
075    
076        /**
077         * Constructor
078         * @param name The unique name of this manager.
079         * @param agents An array of Agents.
080         * @param batchSize The number of events to include in a batch.
081         * @param database The database to write to.
082         */
083        protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
084                                         final int batchSize, final int retries, final int connectionTimeout,
085                                         final int requestTimeout, final int delay, final Database database,
086                                         SecretKey secretKey) {
087            super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
088            this.delay = delay;
089            this.database = database;
090            this.worker = new WriterThread(database, this, queue, batchSize, secretKey);
091            this.worker.start();
092            this.secretKey = secretKey;
093        }
094    
095    
096        /**
097         * Returns a FlumeAvroManager.
098         * @param name The name of the manager.
099         * @param agents The agents to use.
100         * @param batchSize The number of events to include in a batch.
101         * @return A FlumeAvroManager.
102         */
103        public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
104                                                        int batchSize, final int retries, final int connectionTimeout,
105                                                        final int requestTimeout, final int delay, final String dataDir) {
106            if (agents == null || agents.length == 0) {
107                throw new IllegalArgumentException("At least one agent is required");
108            }
109    
110            if (batchSize <= 0) {
111                batchSize = 1;
112            }
113            String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;
114    
115            final StringBuilder sb = new StringBuilder("FlumePersistent[");
116            boolean first = true;
117            for (final Agent agent : agents) {
118                if (!first) {
119                    sb.append(",");
120                }
121                sb.append(agent.getHost()).append(":").append(agent.getPort());
122                first = false;
123            }
124            sb.append("]");
125            sb.append(" ").append(dataDirectory);
126            return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
127                connectionTimeout, requestTimeout, delay, dataDir, properties));
128        }
129    
130        @Override
131        public synchronized void send(final Event event)  {
132            if (worker.isShutdown()) {
133                throw new LoggingException("Unable to record event");
134            }
135    
136            Map<String, String> headers = event.getHeaders();
137            byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
138            try {
139                ByteArrayOutputStream baos = new ByteArrayOutputStream();
140                DataOutputStream daos = new DataOutputStream(baos);
141                daos.writeInt(event.getBody().length);
142                daos.write(event.getBody(), 0, event.getBody().length);
143                daos.writeInt(event.getHeaders().size());
144                for (Map.Entry<String, String> entry : headers.entrySet()) {
145                    daos.writeUTF(entry.getKey());
146                    daos.writeUTF(entry.getValue());
147                }
148                byte[] eventData = baos.toByteArray();
149                if (secretKey != null) {
150                    Cipher cipher = Cipher.getInstance("AES");
151                    cipher.init(Cipher.ENCRYPT_MODE, secretKey);
152                    eventData = cipher.doFinal(eventData);
153                }
154                final DatabaseEntry key = new DatabaseEntry(keyData);
155                final DatabaseEntry data = new DatabaseEntry(eventData);
156                database.put(null, key, data);
157                queue.add(keyData);
158            } catch (Exception ex) {
159                throw new LoggingException("Exception occurred writing log event", ex);
160            }
161        }
162    
163        @Override
164        protected void releaseSub() {
165            LOGGER.debug("Shutting down FlumePersistentManager");
166            worker.shutdown();
167            try {
168                worker.join();
169            } catch (InterruptedException ex) {
170                LOGGER.debug("Interrupted while waiting for worker to complete");
171            }
172            try {
173                LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
174                database.close();
175            } catch (final Exception ex) {
176                LOGGER.warn("Failed to close database", ex);
177            }
178            super.releaseSub();
179        }
180    
181        private void doSend(final SimpleEvent event) {
182            LOGGER.debug("Sending event to Flume");
183            super.send(event);
184        }
185    
186        /**
187         * Factory data.
188         */
189        private static class FactoryData {
190            private final String name;
191            private final Agent[] agents;
192            private final int batchSize;
193            private final String dataDir;
194            private final int retries;
195            private final int connectionTimeout;
196            private final int requestTimeout;
197            private final int delay;
198            private final Property[] properties;
199    
200            /**
201             * Constructor.
202             * @param name The name of the Appender.
203             * @param agents The agents.
204             * @param batchSize The number of events to include in a batch.
205             * @param dataDir The directory for data.
206             */
207            public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
208                               final int connectionTimeout, final int requestTimeout, final int delay,
209                               final String dataDir, final Property[] properties) {
210                this.name = name;
211                this.agents = agents;
212                this.batchSize = batchSize;
213                this.dataDir = dataDir;
214                this.retries = retries;
215                this.connectionTimeout = connectionTimeout;
216                this.requestTimeout = requestTimeout;
217                this.delay = delay;
218                this.properties = properties;
219            }
220        }
221    
222        /**
223         * Avro Manager Factory.
224         */
225        private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
226    
227            /**
228             * Create the FlumeKratiManager.
229             * @param name The name of the entity to manage.
230             * @param data The data required to create the entity.
231             * @return The FlumeKratiManager.
232             */
233            public FlumePersistentManager createManager(final String name, final FactoryData data) {
234                SecretKey secretKey = null;
235    
236                Database database;
237    
238                Map<String, String> properties = new HashMap<String, String>();
239                if (data.properties != null) {
240                    for (Property property : data.properties) {
241                        properties.put(property.getName(), property.getValue());
242                    }
243                }
244    
245                try {
246    
247                    File dir = new File(data.dataDir);
248                    FileUtils.mkdir(dir, true);
249                    final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
250                    dbEnvConfig.setTransactional(false);
251                    dbEnvConfig.setAllowCreate(true);
252                    final Environment environment = new Environment(dir, dbEnvConfig);
253                    final DatabaseConfig dbConfig = new DatabaseConfig();
254                    dbConfig.setTransactional(false);
255                    dbConfig.setAllowCreate(true);
256                    database = environment.openDatabase(null, name, dbConfig);
257                } catch (final Exception ex) {
258                    LOGGER.error("Could not create FlumePersistentManager", ex);
259                    return null;
260                }
261    
262                try {
263                    String key = null;
264                    for (Map.Entry<String, String> entry : properties.entrySet()) {
265                        if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
266                            key = entry.getValue();
267                        }
268                    }
269                    if (key != null) {
270                        final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
271                        manager.collectPlugins();
272                        final Map<String, PluginType> plugins = manager.getPlugins();
273                        if (plugins != null) {
274                            boolean found = false;
275                            for (Map.Entry<String, PluginType> entry : plugins.entrySet()) {
276                                if (entry.getKey().equalsIgnoreCase(key)) {
277                                    found = true;
278                                    Class cl = entry.getValue().getPluginClass();
279                                    try {
280                                        SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
281                                        secretKey = provider.getSecretKey();
282                                        LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
283                                    } catch (Exception ex) {
284                                        LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
285                                            cl.getName());
286                                    }
287                                    break;
288                                }
289                            }
290                            if (!found) {
291                                LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
292                            }
293                        } else {
294                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
295                        }
296                    }
297                } catch (Exception ex) {
298                    LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
299                }
300                return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
301                    data.connectionTimeout, data.requestTimeout, data.delay, database, secretKey);
302            }
303        }
304    
305        private static class WriterThread extends Thread  {
306            private volatile boolean shutdown = false;
307            private final Database database;
308            private final FlumePersistentManager manager;
309            private final LinkedBlockingQueue<byte[]> queue;
310            private final SecretKey secretKey;
311            private final int batchSize;
312    
313            public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]> queue,
314                                int batchsize, SecretKey secretKey) {
315                this.database = database;
316                this.manager = manager;
317                this.queue = queue;
318                this.batchSize = batchsize;
319                this.secretKey = secretKey;
320                this.setDaemon(true);
321            }
322    
323            public void shutdown() {
324                LOGGER.debug("Writer thread shutting down");
325                this.shutdown = true;
326                if (queue.size() == 0) {
327                    queue.add(SHUTDOWN.getBytes(UTF8));
328                }
329            }
330    
331            public boolean isShutdown() {
332                return shutdown;
333            }
334    
335            @Override
336            public void run() {
337                LOGGER.trace("WriterThread started");
338                long lastBatch = System.currentTimeMillis();
339                while (!shutdown) {
340                    if (database.count() >= batchSize ||
341                        database.count() > 0 && lastBatch + manager.delay > System.currentTimeMillis()) {
342                        lastBatch = System.currentTimeMillis();
343                        try {
344                            boolean errors = false;
345                            DatabaseEntry key = new DatabaseEntry();
346                            final DatabaseEntry data = new DatabaseEntry();
347                            final Cursor cursor = database.openCursor(null, null);
348                            try {
349                                queue.clear();
350                                OperationStatus status;
351                                try {
352                                    status = cursor.getFirst(key, data, LockMode.RMW);
353                                    if (batchSize > 1) {
354                                        BatchEvent batch = new BatchEvent();
355                                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
356                                            SimpleEvent event = createEvent(data);
357                                            if (event != null) {
358                                                batch.addEvent(event);
359                                            }
360                                            status = cursor.getNext(key, data, LockMode.RMW);
361                                        }
362                                        try {
363                                            manager.send(batch);
364                                        } catch (Exception ioe) {
365                                            LOGGER.error("Error sending events", ioe);
366                                            break;
367                                        }
368                                        for (Event event : batch.getEvents()) {
369                                            try {
370                                                Map<String, String> headers = event.getHeaders();
371                                                key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
372                                                database.delete(null, key);
373                                            } catch (Exception ex) {
374                                                LOGGER.error("Error deleting key from database", ex);
375                                            }
376                                        }
377                                    } else {
378                                        while (status == OperationStatus.SUCCESS) {
379                                            SimpleEvent event = createEvent(data);
380                                            if (event != null) {
381                                                try {
382                                                    manager.doSend(event);
383                                                } catch (Exception ioe) {
384                                                    errors = true;
385                                                    LOGGER.error("Error sending event", ioe);
386                                                    break;
387                                                }
388                                                if (!errors) {
389                                                    try {
390                                                        cursor.delete();
391                                                    } catch (Exception ex) {
392                                                        LOGGER.error("Unable to delete event", ex);
393                                                    }
394                                                }
395                                            }
396                                            status = cursor.getNext(key, data, LockMode.RMW);
397                                        }
398                                    }
399                                } catch (Exception ex) {
400                                    LOGGER.error("Error reading database", ex);
401                                    shutdown = true;
402                                    break;
403                                }
404    
405                            } finally {
406                                cursor.close();
407                            }
408                            if (errors) {
409                                Thread.sleep(manager.delay);
410                                continue;
411                            }
412                        } catch (Exception ex) {
413                            LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
414                        }
415                    } else {
416                        try {
417                            if (database.count() >= batchSize) {
418                                continue;
419                            }
420                            queue.poll(manager.delay, TimeUnit.MILLISECONDS);
421                            LOGGER.debug("WriterThread notified of work");
422                        } catch (InterruptedException ie) {
423                            LOGGER.warn("WriterThread interrupted, continuing");
424                        } catch (Exception ex) {
425                            LOGGER.error("WriterThread encountered an exception waiting for work", ex);
426                            break;
427                        }
428                    }
429                }
430                LOGGER.trace("WriterThread exiting");
431            }
432    
433            private SimpleEvent createEvent(DatabaseEntry data) {
434                SimpleEvent event = new SimpleEvent();
435                try {
436                    byte[] eventData = data.getData();
437                    if (secretKey != null) {
438                        Cipher cipher = Cipher.getInstance("AES");
439                        cipher.init(Cipher.DECRYPT_MODE, secretKey);
440                        eventData = cipher.doFinal(eventData);
441                    }
442                    ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
443                    DataInputStream dais = new DataInputStream(bais);
444                    int length = dais.readInt();
445                    byte[] bytes = new byte[length];
446                    dais.read(bytes, 0, length);
447                    event.setBody(bytes);
448                    length = dais.readInt();
449                    Map<String, String> map = new HashMap<String, String>(length);
450                    for (int i = 0; i < length; ++i) {
451                        String headerKey = dais.readUTF();
452                        String value = dais.readUTF();
453                        map.put(headerKey, value);
454                    }
455                    event.setHeaders(map);
456                    return event;
457                } catch (Exception ex) {
458                    LOGGER.error("Error retrieving event", ex);
459                    return null;
460                }
461            }
462    
463        }
464    }