/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.logging.log4j.flume.appender;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.PluginManager;
import org.apache.logging.log4j.core.config.plugins.PluginType;
import org.apache.logging.log4j.core.helpers.FileUtils;
import org.apache.logging.log4j.core.helpers.SecretKeyProvider;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Manager that persists data to Berkeley DB before passing it on to Flume.
 */
public class FlumePersistentManager extends FlumeAvroManager {

    public static final String KEY_PROVIDER = "keyProvider";

    private static final Charset UTF8 = Charset.forName("UTF-8");

    private static final String SHUTDOWN = "Shutdown";

    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";

    private static BDBManagerFactory factory = new BDBManagerFactory();

    private Database database;

    private Environment environment;

    private final WriterThread worker;

    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();

    private final SecretKey secretKey;

    private final int delay;

    /**
     * Constructor.
     * @param name The unique name of this manager.
     * @param shortName The short name of the Appender.
     * @param agents An array of Agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry after a failure.
     * @param connectionTimeout The connection timeout.
     * @param requestTimeout The request timeout.
     * @param delay The maximum time, in milliseconds, to wait before publishing events that have accumulated.
     * @param database The database to write to.
     * @param environment The Berkeley DB environment.
     * @param secretKey The SecretKey to use for encryption, or null if events are not encrypted.
     */
    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                     final int batchSize, final int retries, final int connectionTimeout,
                                     final int requestTimeout, final int delay, final Database database,
                                     final Environment environment, final SecretKey secretKey) {
        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
        this.delay = delay;
        this.database = database;
        this.environment = environment;
        this.worker = new WriterThread(database, environment, this, queue, batchSize, secretKey);
        this.worker.start();
        this.secretKey = secretKey;
    }

    /**
     * Returns a FlumePersistentManager.
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties to pass to the manager.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of times to retry after a failure.
     * @param connectionTimeout The connection timeout.
     * @param requestTimeout The request timeout.
     * @param delay The maximum time, in milliseconds, to wait before publishing events that have accumulated.
     * @param dataDir The location of the Berkeley DB directory, or null to use the default.
     * @return A FlumePersistentManager.
     */
    public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
                                                    int batchSize, final int retries, final int connectionTimeout,
                                                    final int requestTimeout, final int delay, final String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }

        if (batchSize <= 0) {
            batchSize = 1;
        }
        String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;

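        // Build a unique manager name of the form "FlumePersistent[host:port,...] dataDir".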
        final StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (final Agent agent : agents) {
            if (!first) {
                sb.append(",");
            }
            sb.append(agent.getHost()).append(":").append(agent.getPort());
            first = false;
        }
        sb.append("]");
        sb.append(" ").append(dataDirectory);
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
            connectionTimeout, requestTimeout, delay, dataDirectory, properties));
    }

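    // send() serializes each event as its body length and bytes followed by the header count and
    // alternating UTF-encoded header keys and values, optionally AES-encrypting the record when a
    // secret key is configured. The record is stored in Berkeley DB under the event's GUID header
    // in its own transaction, and the key is then passed to the writer thread through the queue.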
    @Override
    public void send(final Event event) {
        if (worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }

        Map<String, String> headers = event.getHeaders();
        byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream daos = new DataOutputStream(baos);
            daos.writeInt(event.getBody().length);
            daos.write(event.getBody(), 0, event.getBody().length);
            daos.writeInt(event.getHeaders().size());
            for (Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (secretKey != null) {
                Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            Transaction txn = environment.beginTransaction(null, null);
            try {
                database.put(txn, key, data);
                txn.commit();
                queue.add(keyData);
            } catch (Exception ex) {
                if (txn != null) {
                    txn.abort();
                }
                throw ex;
            }
        } catch (Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

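    // Shutting down stops the writer thread, waits for it to finish, and then closes the
    // Berkeley DB database and environment.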
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        worker.shutdown();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        try {
            LOGGER.debug("FlumePersistentManager dataset status: {}", database.getStats(new StatsConfig()));
            database.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        try {
            environment.cleanLog();
            environment.close();
        } catch (final Exception ex) {
            LOGGER.warn("Failed to close environment", ex);
        }
        super.releaseSub();
    }

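    // Used by the writer thread to forward a stored event through the parent FlumeAvroManager
    // without persisting it again.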
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delay;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries The number of times to retry after a failure.
         * @param connectionTimeout The connection timeout.
         * @param requestTimeout The request timeout.
         * @param delay The maximum time, in milliseconds, to wait before publishing events that have accumulated.
         * @param dataDir The directory for data.
         * @param properties Properties to pass to the manager.
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delay,
                           final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delay = delay;
            this.properties = properties;
        }
    }

    /**
     * Factory to create the FlumePersistentManager.
     */
    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {

        /**
         * Create the FlumePersistentManager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumePersistentManager.
         */
        @Override
        public FlumePersistentManager createManager(final String name, final FactoryData data) {
            SecretKey secretKey = null;

            Database database;
            Environment environment;

            Map<String, String> properties = new HashMap<String, String>();
            if (data.properties != null) {
                for (Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }

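            // Create the Berkeley DB environment in the configured data directory along with a
            // transactional database named after this manager.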
            try {
                File dir = new File(data.dataDir);
                FileUtils.mkdir(dir, true);
                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(true);
                dbEnvConfig.setAllowCreate(true);
                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
                environment = new Environment(dir, dbEnvConfig);
                final DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", ex);
                return null;
            }

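            // If a "keyProvider" property names a registered SecretKeyProvider plugin, obtain its
            // SecretKey so persisted events can be encrypted; otherwise encryption is disabled.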
            try {
                String key = null;
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                        key = entry.getValue();
                    }
                }
                if (key != null) {
                    final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
                    manager.collectPlugins();
                    final Map<String, PluginType> plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (Map.Entry<String, PluginType> entry : plugins.entrySet()) {
                            if (entry.getKey().equalsIgnoreCase(key)) {
                                found = true;
                                Class cl = entry.getValue().getPluginClass();
                                try {
                                    SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
                                    secretKey = provider.getSecretKey();
                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                                } catch (Exception ex) {
                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName());
                                }
                                break;
                            }
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                }
            } catch (Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
                data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey);
        }
    }

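    /**
     * Thread that reads events from the Berkeley DB store, forwards them to Flume, and deletes
     * each record once it has been delivered.
     */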
    private static class WriterThread extends Thread {
        private volatile boolean shutdown = false;
        private final Database database;
        private final Environment environment;
        private final FlumePersistentManager manager;
        private final LinkedBlockingQueue<byte[]> queue;
        private final SecretKey secretKey;
        private final int batchSize;

        public WriterThread(Database database, Environment environment, FlumePersistentManager manager,
                            LinkedBlockingQueue<byte[]> queue, int batchSize, SecretKey secretKey) {
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.queue = queue;
            this.batchSize = batchSize;
            this.secretKey = secretKey;
            this.setDaemon(true);
        }

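        // Signal the thread to stop. A sentinel entry is queued so a poll that is blocked
        // waiting for work wakes up promptly.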
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            if (queue.size() == 0) {
                queue.add(SHUTDOWN.getBytes(UTF8));
            }
        }

        public boolean isShutdown() {
            return shutdown;
        }

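        // Main loop: wait until either batchSize events have accumulated or the delay has expired.
        // With a batch size greater than one, the stored events are read with a cursor, bundled
        // into a BatchEvent and sent in a single call, after which their keys are deleted in one
        // transaction. With a batch size of one, each event is sent individually and deleted
        // through the cursor inside a transaction.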
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
            long nextBatch = System.currentTimeMillis() + manager.delay;
            while (!shutdown) {
                long now = System.currentTimeMillis();
                if (database.count() >= batchSize || (database.count() > 0 && nextBatch < now)) {
                    nextBatch = now + manager.delay;
                    try {
                        boolean errors = false;
                        DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        queue.clear();
                        OperationStatus status;
                        if (batchSize > 1) {
                            Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
                            try {
                                status = cursor.getFirst(key, data, null);

                                BatchEvent batch = new BatchEvent();
                                for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                                    SimpleEvent event = createEvent(data);
                                    if (event != null) {
                                        batch.addEvent(event);
                                    }
                                    status = cursor.getNext(key, data, null);
                                }
                                try {
                                    manager.send(batch);
                                } catch (Exception ioe) {
                                    LOGGER.error("Error sending events", ioe);
                                    break;
                                }
                                cursor.close();
                                cursor = null;
                                Transaction txn = environment.beginTransaction(null, null);
                                try {
                                    for (Event event : batch.getEvents()) {
                                        try {
                                            Map<String, String> headers = event.getHeaders();
                                            key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                            database.delete(txn, key);
                                        } catch (Exception ex) {
                                            LOGGER.error("Error deleting key from database", ex);
                                        }
                                    }
                                    txn.commit();
                                } catch (Exception ex) {
                                    LOGGER.error("Unable to commit transaction", ex);
                                    if (txn != null) {
                                        txn.abort();
                                    }
                                }
                            } catch (Exception ex) {
                                LOGGER.error("Error reading database", ex);
                                shutdown = true;
                                break;
                            } finally {
                                if (cursor != null) {
                                    cursor.close();
                                }
                            }
                        } else {
                            Transaction txn = environment.beginTransaction(null, null);
                            Cursor cursor = database.openCursor(txn, null);
                            try {
                                status = cursor.getFirst(key, data, LockMode.RMW);
                                while (status == OperationStatus.SUCCESS) {
                                    SimpleEvent event = createEvent(data);
                                    if (event != null) {
                                        try {
                                            manager.doSend(event);
                                        } catch (Exception ioe) {
                                            errors = true;
                                            LOGGER.error("Error sending event", ioe);
                                            break;
                                        }
                                        if (!errors) {
                                            try {
                                                cursor.delete();
                                            } catch (Exception ex) {
                                                LOGGER.error("Unable to delete event", ex);
                                            }
                                        }
                                    }
                                    status = cursor.getNext(key, data, LockMode.RMW);
                                }
                                if (cursor != null) {
                                    cursor.close();
                                    cursor = null;
                                }
                                txn.commit();
                                txn = null;
                            } catch (Exception ex) {
                                LOGGER.error("Error reading or writing to database", ex);
                                shutdown = true;
                                break;
                            } finally {
                                if (cursor != null) {
                                    cursor.close();
                                }
                                if (txn != null) {
                                    txn.abort();
                                }
                            }
                        }
                        if (errors) {
                            Thread.sleep(manager.delay);
                            continue;
                        }
                    } catch (Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    while (!shutdown && (database.count() == 0 || (database.count() < batchSize && nextBatch > now))) {
                        try {
                            long interval = nextBatch - now;
                            queue.poll(interval, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException ie) {
                            LOGGER.warn("WriterThread interrupted, continuing");
                        } catch (Exception ex) {
                            LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                            break;
                        }
                        now = System.currentTimeMillis();
                        if (database.count() == 0) {
                            nextBatch = now + manager.delay;
                        }
                    }
                    LOGGER.debug("WriterThread ready to work");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }

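        // Rebuild a SimpleEvent from a stored record, reversing the format written by send():
        // optional AES decryption, then the body length and bytes, followed by the header count
        // and the header key/value pairs.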
        private SimpleEvent createEvent(DatabaseEntry data) {
            SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (secretKey != null) {
                    Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                byte[] bytes = new byte[length];
                dais.readFully(bytes);
                event.setBody(bytes);
                length = dais.readInt();
                Map<String, String> map = new HashMap<String, String>(length);
                for (int i = 0; i < length; ++i) {
                    String headerKey = dais.readUTF();
                    String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            } catch (Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }

    }
}