1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import com.sleepycat.je.Cursor;
20 import com.sleepycat.je.CursorConfig;
21 import com.sleepycat.je.Database;
22 import com.sleepycat.je.DatabaseConfig;
23 import com.sleepycat.je.DatabaseEntry;
24 import com.sleepycat.je.Environment;
25 import com.sleepycat.je.EnvironmentConfig;
26 import com.sleepycat.je.LockMode;
27 import com.sleepycat.je.OperationStatus;
28 import com.sleepycat.je.StatsConfig;
29 import com.sleepycat.je.Transaction;
30 import org.apache.flume.Event;
31 import org.apache.flume.event.SimpleEvent;
32 import org.apache.logging.log4j.LoggingException;
33 import org.apache.logging.log4j.core.appender.ManagerFactory;
34 import org.apache.logging.log4j.core.config.Property;
35 import org.apache.logging.log4j.core.config.plugins.PluginManager;
36 import org.apache.logging.log4j.core.config.plugins.PluginType;
37 import org.apache.logging.log4j.core.helpers.FileUtils;
38 import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
39
40 import javax.crypto.Cipher;
41 import javax.crypto.SecretKey;
42 import java.io.ByteArrayInputStream;
43 import java.io.ByteArrayOutputStream;
44 import java.io.DataInputStream;
45 import java.io.DataOutputStream;
46 import java.io.File;
47 import java.nio.charset.Charset;
48 import java.util.HashMap;
49 import java.util.Map;
50 import java.util.concurrent.LinkedBlockingQueue;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.locks.ReadWriteLock;
53 import java.util.concurrent.locks.ReentrantReadWriteLock;
54 import java.util.logging.Level;
55
56
57
58
59 public class FlumePersistentManager extends FlumeAvroManager {
60
61 public static final String KEY_PROVIDER = "keyProvider";
62
63 private static final Charset UTF8 = Charset.forName("UTF-8");
64
65 private static final String SHUTDOWN = "Shutdown";
66
67 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
68
69 private static BDBManagerFactory factory = new BDBManagerFactory();
70
71 private Database database;
72
73 private Environment environment;
74
75 private final WriterThread worker;
76
77 private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
78
79 private final SecretKey secretKey;
80
81 private final int delay;
82
83
84
85
86
87
88
89
90 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
91 final int batchSize, final int retries, final int connectionTimeout,
92 final int requestTimeout, final int delay, final Database database,
93 final Environment environment, SecretKey secretKey) {
94 super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
95 this.delay = delay;
96 this.database = database;
97 this.environment = environment;
98 this.worker = new WriterThread(database, environment, this, queue, batchSize, secretKey);
99 this.worker.start();
100 this.secretKey = secretKey;
101 }
102
103
104
105
106
107
108
109
110
111 public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
112 int batchSize, final int retries, final int connectionTimeout,
113 final int requestTimeout, final int delay, final String dataDir) {
114 if (agents == null || agents.length == 0) {
115 throw new IllegalArgumentException("At least one agent is required");
116 }
117
118 if (batchSize <= 0) {
119 batchSize = 1;
120 }
121 String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;
122
123 final StringBuilder sb = new StringBuilder("FlumePersistent[");
124 boolean first = true;
125 for (final Agent agent : agents) {
126 if (!first) {
127 sb.append(",");
128 }
129 sb.append(agent.getHost()).append(":").append(agent.getPort());
130 first = false;
131 }
132 sb.append("]");
133 sb.append(" ").append(dataDirectory);
134 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
135 connectionTimeout, requestTimeout, delay, dataDir, properties));
136 }
137
138 @Override
139 public void send(final Event event) {
140 if (worker.isShutdown()) {
141 throw new LoggingException("Unable to record event");
142 }
143
144 Map<String, String> headers = event.getHeaders();
145 byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
146 try {
147 ByteArrayOutputStream baos = new ByteArrayOutputStream();
148 DataOutputStream daos = new DataOutputStream(baos);
149 daos.writeInt(event.getBody().length);
150 daos.write(event.getBody(), 0, event.getBody().length);
151 daos.writeInt(event.getHeaders().size());
152 for (Map.Entry<String, String> entry : headers.entrySet()) {
153 daos.writeUTF(entry.getKey());
154 daos.writeUTF(entry.getValue());
155 }
156 byte[] eventData = baos.toByteArray();
157 if (secretKey != null) {
158 Cipher cipher = Cipher.getInstance("AES");
159 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
160 eventData = cipher.doFinal(eventData);
161 }
162 final DatabaseEntry key = new DatabaseEntry(keyData);
163 final DatabaseEntry data = new DatabaseEntry(eventData);
164 Transaction txn = environment.beginTransaction(null, null);
165 try {
166 database.put(txn, key, data);
167 txn.commit();
168 queue.add(keyData);
169 } catch (Exception ex) {
170 if (txn != null) {
171 txn.abort();
172 }
173 throw ex;
174 }
175 } catch (Exception ex) {
176 throw new LoggingException("Exception occurred writing log event", ex);
177 }
178 }
179
180 @Override
181 protected void releaseSub() {
182 LOGGER.debug("Shutting down FlumePersistentManager");
183 worker.shutdown();
184 try {
185 worker.join();
186 } catch (InterruptedException ex) {
187 LOGGER.debug("Interrupted while waiting for worker to complete");
188 }
189 try {
190 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
191 database.close();
192 } catch (final Exception ex) {
193 LOGGER.warn("Failed to close database", ex);
194 }
195 try {
196 environment.cleanLog();
197 environment.close();
198 } catch (final Exception ex) {
199 LOGGER.warn("Failed to close environment", ex);
200 }
201 super.releaseSub();
202 }
203
204 private void doSend(final SimpleEvent event) {
205 LOGGER.debug("Sending event to Flume");
206 super.send(event);
207 }
208
209
210
211
212 private static class FactoryData {
213 private final String name;
214 private final Agent[] agents;
215 private final int batchSize;
216 private final String dataDir;
217 private final int retries;
218 private final int connectionTimeout;
219 private final int requestTimeout;
220 private final int delay;
221 private final Property[] properties;
222
223
224
225
226
227
228
229
230 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
231 final int connectionTimeout, final int requestTimeout, final int delay,
232 final String dataDir, final Property[] properties) {
233 this.name = name;
234 this.agents = agents;
235 this.batchSize = batchSize;
236 this.dataDir = dataDir;
237 this.retries = retries;
238 this.connectionTimeout = connectionTimeout;
239 this.requestTimeout = requestTimeout;
240 this.delay = delay;
241 this.properties = properties;
242 }
243 }
244
245
246
247
248 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
249
250
251
252
253
254
255
256 @Override
257 public FlumePersistentManager createManager(final String name, final FactoryData data) {
258 SecretKey secretKey = null;
259
260 Database database;
261 Environment environment;
262
263 Map<String, String> properties = new HashMap<String, String>();
264 if (data.properties != null) {
265 for (Property property : data.properties) {
266 properties.put(property.getName(), property.getValue());
267 }
268 }
269
270 try {
271
272 File dir = new File(data.dataDir);
273 FileUtils.mkdir(dir, true);
274 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
275 dbEnvConfig.setTransactional(true);
276 dbEnvConfig.setAllowCreate(true);
277 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
278 environment = new Environment(dir, dbEnvConfig);
279 final DatabaseConfig dbConfig = new DatabaseConfig();
280 dbConfig.setTransactional(true);
281 dbConfig.setAllowCreate(true);
282 database = environment.openDatabase(null, name, dbConfig);
283 } catch (final Exception ex) {
284 LOGGER.error("Could not create FlumePersistentManager", ex);
285 return null;
286 }
287
288 try {
289 String key = null;
290 for (Map.Entry<String, String> entry : properties.entrySet()) {
291 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
292 key = entry.getValue();
293 }
294 }
295 if (key != null) {
296 final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
297 manager.collectPlugins();
298 final Map<String, PluginType> plugins = manager.getPlugins();
299 if (plugins != null) {
300 boolean found = false;
301 for (Map.Entry<String, PluginType> entry : plugins.entrySet()) {
302 if (entry.getKey().equalsIgnoreCase(key)) {
303 found = true;
304 Class cl = entry.getValue().getPluginClass();
305 try {
306 SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
307 secretKey = provider.getSecretKey();
308 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
309 } catch (Exception ex) {
310 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
311 cl.getName());
312 }
313 break;
314 }
315 }
316 if (!found) {
317 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
318 }
319 } else {
320 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
321 }
322 }
323 } catch (Exception ex) {
324 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
325 }
326 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
327 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey);
328 }
329 }
330
331 private static class WriterThread extends Thread {
332 private volatile boolean shutdown = false;
333 private final Database database;
334 private final Environment environment;
335 private final FlumePersistentManager manager;
336 private final LinkedBlockingQueue<byte[]> queue;
337 private final SecretKey secretKey;
338 private final int batchSize;
339
340 public WriterThread(Database database, Environment environment, FlumePersistentManager manager,
341 LinkedBlockingQueue<byte[]> queue, int batchsize, SecretKey secretKey) {
342 this.database = database;
343 this.environment = environment;
344 this.manager = manager;
345 this.queue = queue;
346 this.batchSize = batchsize;
347 this.secretKey = secretKey;
348 this.setDaemon(true);
349 }
350
351 public void shutdown() {
352 LOGGER.debug("Writer thread shutting down");
353 this.shutdown = true;
354 if (queue.size() == 0) {
355 queue.add(SHUTDOWN.getBytes(UTF8));
356 }
357 }
358
359 public boolean isShutdown() {
360 return shutdown;
361 }
362
363 @Override
364 public void run() {
365 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
366 long nextBatch = System.currentTimeMillis() + manager.delay;
367 while (!shutdown) {
368 long now = System.currentTimeMillis();
369 if (database.count() >= batchSize || (database.count() > 0 && nextBatch < now)) {
370 nextBatch = now + manager.delay;
371 try {
372 boolean errors = false;
373 DatabaseEntry key = new DatabaseEntry();
374 final DatabaseEntry data = new DatabaseEntry();
375
376 queue.clear();
377 OperationStatus status;
378 if (batchSize > 1) {
379 Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
380 try {
381 status = cursor.getFirst(key, data, null);
382
383 BatchEvent batch = new BatchEvent();
384 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
385 SimpleEvent event = createEvent(data);
386 if (event != null) {
387 batch.addEvent(event);
388 }
389 status = cursor.getNext(key, data, null);
390 }
391 try {
392 manager.send(batch);
393 } catch (Exception ioe) {
394 LOGGER.error("Error sending events", ioe);
395 break;
396 }
397 cursor.close();
398 cursor = null;
399 Transaction txn = environment.beginTransaction(null, null);
400 try {
401 for (Event event : batch.getEvents()) {
402 try {
403 Map<String, String> headers = event.getHeaders();
404 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
405 database.delete(txn, key);
406 } catch (Exception ex) {
407 LOGGER.error("Error deleting key from database", ex);
408 }
409 }
410 txn.commit();
411 } catch (Exception ex) {
412 LOGGER.error("Unable to commit transaction", ex);
413 if (txn != null) {
414 txn.abort();
415 }
416 }
417 } catch (Exception ex) {
418 LOGGER.error("Error reading database", ex);
419 shutdown = true;
420 break;
421 } finally {
422 if (cursor != null) {
423 cursor.close();
424 }
425 }
426 } else {
427 Transaction txn = environment.beginTransaction(null, null);
428 Cursor cursor = database.openCursor(txn, null);
429 try {
430 status = cursor.getFirst(key, data, LockMode.RMW);
431 while (status == OperationStatus.SUCCESS) {
432 SimpleEvent event = createEvent(data);
433 if (event != null) {
434 try {
435 manager.doSend(event);
436 } catch (Exception ioe) {
437 errors = true;
438 LOGGER.error("Error sending event", ioe);
439 break;
440 }
441 if (!errors) {
442 try {
443 cursor.delete();
444 } catch (Exception ex) {
445 LOGGER.error("Unable to delete event", ex);
446 }
447 }
448 }
449 status = cursor.getNext(key, data, LockMode.RMW);
450 }
451 if (cursor != null) {
452 cursor.close();
453 cursor = null;
454 }
455 txn.commit();
456 txn = null;
457 } catch (Exception ex) {
458 LOGGER.error("Error reading or writing to database", ex);
459 shutdown = true;
460 break;
461 } finally {
462 if (cursor != null) {
463 cursor.close();
464 }
465 if (txn != null) {
466 txn.abort();
467 }
468 }
469 }
470 if (errors) {
471 Thread.sleep(manager.delay);
472 continue;
473 }
474 } catch (Exception ex) {
475 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
476 }
477 } else {
478 while (!shutdown && (database.count() == 0 || database.count() < batchSize && nextBatch > now)) {
479 try {
480 long interval = nextBatch - now;
481 queue.poll(interval, TimeUnit.MILLISECONDS);
482 } catch (InterruptedException ie) {
483 LOGGER.warn("WriterThread interrupted, continuing");
484 } catch (Exception ex) {
485 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
486 break;
487 }
488 now = System.currentTimeMillis();
489 if (database.count() == 0) {
490 nextBatch = now + manager.delay;
491 }
492 }
493 LOGGER.debug("WriterThread ready to work");
494 }
495 }
496 LOGGER.trace("WriterThread exiting");
497 }
498
499 private SimpleEvent createEvent(DatabaseEntry data) {
500 SimpleEvent event = new SimpleEvent();
501 try {
502 byte[] eventData = data.getData();
503 if (secretKey != null) {
504 Cipher cipher = Cipher.getInstance("AES");
505 cipher.init(Cipher.DECRYPT_MODE, secretKey);
506 eventData = cipher.doFinal(eventData);
507 }
508 ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
509 DataInputStream dais = new DataInputStream(bais);
510 int length = dais.readInt();
511 byte[] bytes = new byte[length];
512 dais.read(bytes, 0, length);
513 event.setBody(bytes);
514 length = dais.readInt();
515 Map<String, String> map = new HashMap<String, String>(length);
516 for (int i = 0; i < length; ++i) {
517 String headerKey = dais.readUTF();
518 String value = dais.readUTF();
519 map.put(headerKey, value);
520 }
521 event.setHeaders(map);
522 return event;
523 } catch (Exception ex) {
524 LOGGER.error("Error retrieving event", ex);
525 return null;
526 }
527 }
528
529 }
530 }