1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.File;
24 import java.nio.charset.Charset;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import javax.crypto.Cipher;
37 import javax.crypto.SecretKey;
38
39 import org.apache.flume.Event;
40 import org.apache.flume.event.SimpleEvent;
41 import org.apache.logging.log4j.LoggingException;
42 import org.apache.logging.log4j.core.appender.ManagerFactory;
43 import org.apache.logging.log4j.core.config.Property;
44 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
45 import org.apache.logging.log4j.core.config.plugins.util.PluginType;
46 import org.apache.logging.log4j.core.util.FileUtils;
47 import org.apache.logging.log4j.core.util.SecretKeyProvider;
48 import org.apache.logging.log4j.util.Strings;
49
50 import com.sleepycat.je.Cursor;
51 import com.sleepycat.je.CursorConfig;
52 import com.sleepycat.je.Database;
53 import com.sleepycat.je.DatabaseConfig;
54 import com.sleepycat.je.DatabaseEntry;
55 import com.sleepycat.je.Environment;
56 import com.sleepycat.je.EnvironmentConfig;
57 import com.sleepycat.je.LockConflictException;
58 import com.sleepycat.je.LockMode;
59 import com.sleepycat.je.OperationStatus;
60 import com.sleepycat.je.StatsConfig;
61 import com.sleepycat.je.Transaction;
62
63
64
65
66 public class FlumePersistentManager extends FlumeAvroManager {
67
68
69 public static final String KEY_PROVIDER = "keyProvider";
70
71 private static final Charset UTF8 = Charset.forName("UTF-8");
72
73 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74
75 private static final int SHUTDOWN_WAIT = 60;
76
77 private static final int MILLIS_PER_SECOND = 1000;
78
79 private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
80
81 private static BDBManagerFactory factory = new BDBManagerFactory();
82
83 private final Database database;
84
85 private final Environment environment;
86
87 private final WriterThread worker;
88
89 private final Gate gate = new Gate();
90
91 private final SecretKey secretKey;
92
93 private final int lockTimeoutRetryCount;
94
95 private final ExecutorService threadPool;
96
97 private final AtomicLong dbCount = new AtomicLong();
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
 * Creates the manager and starts its background worker thread.
 *
 * @param name The unique name of this manager.
 * @param shortName Original name for the Manager.
 * @param agents An array of Agents.
 * @param batchSize The number of events to include in a batch.
 * @param retries The number of times to retry connecting before giving up.
 * @param connectionTimeout The amount of time to wait for a connection to be established.
 * @param requestTimeout The amount of time to wait for a response to a request.
 * @param delay The amount of time to wait between batches.
 * @param database The database to write to.
 * @param environment The database environment.
 * @param secretKey The SecretKey to use for encryption, or null for none.
 * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
 */
protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
        final int batchSize, final int retries, final int connectionTimeout,
        final int requestTimeout, final int delay, final Database database,
        final Environment environment, final SecretKey secretKey,
        final int lockTimeoutRetryCount) {
    super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
    this.database = database;
    this.environment = environment;
    dbCount.set(database.count());
    this.secretKey = secretKey;
    this.lockTimeoutRetryCount = lockTimeoutRetryCount;
    this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
    this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
        lockTimeoutRetryCount);
    // Start the worker only after every field is assigned: the worker holds a reference to
    // this manager, so starting it earlier would expose a partially constructed object.
    this.worker.start();
}
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 public static FlumePersistentManager getManager(final String name, final Agent[] agents,
147 final Property[] properties, int batchSize, final int retries,
148 final int connectionTimeout, final int requestTimeout,
149 final int delayMillis, final int lockTimeoutRetryCount,
150 final String dataDir) {
151 if (agents == null || agents.length == 0) {
152 throw new IllegalArgumentException("At least one agent is required");
153 }
154
155 if (batchSize <= 0) {
156 batchSize = 1;
157 }
158 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
159
160 final StringBuilder sb = new StringBuilder("FlumePersistent[");
161 boolean first = true;
162 for (final Agent agent : agents) {
163 if (!first) {
164 sb.append(',');
165 }
166 sb.append(agent.getHost()).append(':').append(agent.getPort());
167 first = false;
168 }
169 sb.append(']');
170 sb.append(' ').append(dataDirectory);
171 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
172 connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
173 }
174
175 @Override
176 public void send(final Event event) {
177 if (worker.isShutdown()) {
178 throw new LoggingException("Unable to record event");
179 }
180
181 final Map<String, String> headers = event.getHeaders();
182 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
183 try {
184 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185 final DataOutputStream daos = new DataOutputStream(baos);
186 daos.writeInt(event.getBody().length);
187 daos.write(event.getBody(), 0, event.getBody().length);
188 daos.writeInt(event.getHeaders().size());
189 for (final Map.Entry<String, String> entry : headers.entrySet()) {
190 daos.writeUTF(entry.getKey());
191 daos.writeUTF(entry.getValue());
192 }
193 byte[] eventData = baos.toByteArray();
194 if (secretKey != null) {
195 final Cipher cipher = Cipher.getInstance("AES");
196 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
197 eventData = cipher.doFinal(eventData);
198 }
199 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
200 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
201 boolean interrupted = false;
202 int ieCount = 0;
203 do {
204 try {
205 future.get();
206 } catch (final InterruptedException ie) {
207 interrupted = true;
208 ++ieCount;
209 }
210 } while (interrupted && ieCount <= 1);
211
212 } catch (final Exception ex) {
213 throw new LoggingException("Exception occurred writing log event", ex);
214 }
215 }
216
217 @Override
218 protected void releaseSub() {
219 LOGGER.debug("Shutting down FlumePersistentManager");
220 worker.shutdown();
221 try {
222 worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
223 } catch (final InterruptedException ie) {
224
225 }
226 threadPool.shutdown();
227 try {
228 threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
229 } catch (final InterruptedException ie) {
230 LOGGER.warn("PersistentManager Thread pool failed to shut down");
231 }
232 try {
233 worker.join();
234 } catch (final InterruptedException ex) {
235 LOGGER.debug("Interrupted while waiting for worker to complete");
236 }
237 try {
238 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
239 database.close();
240 } catch (final Exception ex) {
241 LOGGER.warn("Failed to close database", ex);
242 }
243 try {
244 environment.cleanLog();
245 environment.close();
246 } catch (final Exception ex) {
247 LOGGER.warn("Failed to close environment", ex);
248 }
249 super.releaseSub();
250 }
251
252 private void doSend(final SimpleEvent event) {
253 LOGGER.debug("Sending event to Flume");
254 super.send(event);
255 }
256
257
258
259
260 private static class BDBWriter implements Callable<Integer> {
261 private final byte[] eventData;
262 private final byte[] keyData;
263 private final Environment environment;
264 private final Database database;
265 private final Gate gate;
266 private final AtomicLong dbCount;
267 private final long batchSize;
268 private final int lockTimeoutRetryCount;
269
270 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
271 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
272 final int lockTimeoutRetryCount) {
273 this.keyData = keyData;
274 this.eventData = eventData;
275 this.environment = environment;
276 this.database = database;
277 this.gate = gate;
278 this.dbCount = dbCount;
279 this.batchSize = batchSize;
280 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
281 }
282
283 @Override
284 public Integer call() throws Exception {
285 final DatabaseEntry key = new DatabaseEntry(keyData);
286 final DatabaseEntry data = new DatabaseEntry(eventData);
287 Exception exception = null;
288 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
289 Transaction txn = null;
290 try {
291 txn = environment.beginTransaction(null, null);
292 try {
293 database.put(txn, key, data);
294 txn.commit();
295 txn = null;
296 if (dbCount.incrementAndGet() >= batchSize) {
297 gate.open();
298 }
299 exception = null;
300 break;
301 } catch (final LockConflictException lce) {
302 exception = lce;
303
304 } catch (final Exception ex) {
305 if (txn != null) {
306 txn.abort();
307 }
308 throw ex;
309 } finally {
310 if (txn != null) {
311 txn.abort();
312 txn = null;
313 }
314 }
315 } catch (final LockConflictException lce) {
316 exception = lce;
317 if (txn != null) {
318 try {
319 txn.abort();
320 txn = null;
321 } catch (final Exception ex) {
322 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
323 }
324 }
325
326 }
327 try {
328 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
329 } catch (final InterruptedException ie) {
330
331 }
332 }
333 if (exception != null) {
334 throw exception;
335 }
336 return eventData.length;
337 }
338 }
339
340
341
342
343 private static class FactoryData {
344 private final String name;
345 private final Agent[] agents;
346 private final int batchSize;
347 private final String dataDir;
348 private final int retries;
349 private final int connectionTimeout;
350 private final int requestTimeout;
351 private final int delayMillis;
352 private final int lockTimeoutRetryCount;
353 private final Property[] properties;
354
355
356
357
358
359
360
361
362 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
363 final int connectionTimeout, final int requestTimeout, final int delayMillis,
364 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
365 this.name = name;
366 this.agents = agents;
367 this.batchSize = batchSize;
368 this.dataDir = dataDir;
369 this.retries = retries;
370 this.connectionTimeout = connectionTimeout;
371 this.requestTimeout = requestTimeout;
372 this.delayMillis = delayMillis;
373 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
374 this.properties = properties;
375 }
376 }
377
378
379
380
381 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
382
383
384
385
386
387
388
389 @Override
390 public FlumePersistentManager createManager(final String name, final FactoryData data) {
391 SecretKey secretKey = null;
392 Database database = null;
393 Environment environment = null;
394
395 final Map<String, String> properties = new HashMap<>();
396 if (data.properties != null) {
397 for (final Property property : data.properties) {
398 properties.put(property.getName(), property.getValue());
399 }
400 }
401
402 try {
403 final File dir = new File(data.dataDir);
404 FileUtils.mkdir(dir, true);
405 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
406 dbEnvConfig.setTransactional(true);
407 dbEnvConfig.setAllowCreate(true);
408 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
409 environment = new Environment(dir, dbEnvConfig);
410 final DatabaseConfig dbConfig = new DatabaseConfig();
411 dbConfig.setTransactional(true);
412 dbConfig.setAllowCreate(true);
413 database = environment.openDatabase(null, name, dbConfig);
414 } catch (final Exception ex) {
415 LOGGER.error("Could not create FlumePersistentManager", ex);
416
417
418
419 if (database != null) {
420 database.close();
421 database = null;
422 }
423 if (environment != null) {
424 environment.close();
425 environment = null;
426 }
427 return null;
428 }
429
430 try {
431 String key = null;
432 for (final Map.Entry<String, String> entry : properties.entrySet()) {
433 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
434 key = entry.getValue();
435 break;
436 }
437 }
438 if (key != null) {
439 final PluginManager manager = new PluginManager("KeyProvider");
440 manager.collectPlugins();
441 final Map<String, PluginType<?>> plugins = manager.getPlugins();
442 if (plugins != null) {
443 boolean found = false;
444 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
445 if (entry.getKey().equalsIgnoreCase(key)) {
446 found = true;
447 final Class<?> cl = entry.getValue().getPluginClass();
448 try {
449 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
450 secretKey = provider.getSecretKey();
451 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
452 } catch (final Exception ex) {
453 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
454 cl.getName());
455 }
456 break;
457 }
458 }
459 if (!found) {
460 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
461 }
462 } else {
463 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464 }
465 }
466 } catch (final Exception ex) {
467 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
468 }
469 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
470 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
471 data.lockTimeoutRetryCount);
472 }
473 }
474
475
476
477
478 private static class WriterThread extends Thread {
479 private volatile boolean shutdown = false;
480 private final Database database;
481 private final Environment environment;
482 private final FlumePersistentManager manager;
483 private final Gate gate;
484 private final SecretKey secretKey;
485 private final int batchSize;
486 private final AtomicLong dbCounter;
487 private final int lockTimeoutRetryCount;
488
/**
 * Builds the worker as a daemon thread; the caller is responsible for starting it.
 */
public WriterThread(final Database database, final Environment environment,
        final FlumePersistentManager manager, final Gate gate, final int batchsize,
        final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
    setDaemon(true);
    this.manager = manager;
    this.database = database;
    this.environment = environment;
    this.gate = gate;
    this.secretKey = secretKey;
    this.dbCounter = dbCount;
    this.batchSize = batchsize;
    this.lockTimeoutRetryCount = lockTimeoutRetryCount;
}
502
503 public void shutdown() {
504 LOGGER.debug("Writer thread shutting down");
505 this.shutdown = true;
506 gate.open();
507 }
508
509 public boolean isShutdown() {
510 return shutdown;
511 }
512
513 @Override
514 public void run() {
515 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
516 long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
517 while (!shutdown) {
518 final long nowMillis = System.currentTimeMillis();
519 final long dbCount = database.count();
520 dbCounter.set(dbCount);
521 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
522 nextBatchMillis = nowMillis + manager.getDelayMillis();
523 try {
524 boolean errors = false;
525 final DatabaseEntry key = new DatabaseEntry();
526 final DatabaseEntry data = new DatabaseEntry();
527
528 gate.close();
529 OperationStatus status;
530 if (batchSize > 1) {
531 try {
532 errors = sendBatch(key, data);
533 } catch (final Exception ex) {
534 break;
535 }
536 } else {
537 Exception exception = null;
538 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
539 exception = null;
540 Transaction txn = null;
541 Cursor cursor = null;
542 try {
543 txn = environment.beginTransaction(null, null);
544 cursor = database.openCursor(txn, null);
545 try {
546 status = cursor.getFirst(key, data, LockMode.RMW);
547 while (status == OperationStatus.SUCCESS) {
548 final SimpleEvent event = createEvent(data);
549 if (event != null) {
550 try {
551 manager.doSend(event);
552 } catch (final Exception ioe) {
553 errors = true;
554 LOGGER.error("Error sending event", ioe);
555 break;
556 }
557 try {
558 cursor.delete();
559 } catch (final Exception ex) {
560 LOGGER.error("Unable to delete event", ex);
561 }
562 }
563 status = cursor.getNext(key, data, LockMode.RMW);
564 }
565 if (cursor != null) {
566 cursor.close();
567 cursor = null;
568 }
569 txn.commit();
570 txn = null;
571 dbCounter.decrementAndGet();
572 exception = null;
573 break;
574 } catch (final LockConflictException lce) {
575 exception = lce;
576
577 } catch (final Exception ex) {
578 LOGGER.error("Error reading or writing to database", ex);
579 shutdown = true;
580 break;
581 } finally {
582 if (cursor != null) {
583 cursor.close();
584 cursor = null;
585 }
586 if (txn != null) {
587 txn.abort();
588 txn = null;
589 }
590 }
591 } catch (final LockConflictException lce) {
592 exception = lce;
593 if (cursor != null) {
594 try {
595 cursor.close();
596 cursor = null;
597 } catch (final Exception ex) {
598 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
599 }
600 }
601 if (txn != null) {
602 try {
603 txn.abort();
604 txn = null;
605 } catch (final Exception ex) {
606 LOGGER.trace("Ignored exception aborting tx during lock conflict.");
607 }
608 }
609 }
610 try {
611 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
612 } catch (final InterruptedException ie) {
613
614 }
615 }
616 if (exception != null) {
617 LOGGER.error("Unable to read or update data base", exception);
618 }
619 }
620 if (errors) {
621 Thread.sleep(manager.getDelayMillis());
622 continue;
623 }
624 } catch (final Exception ex) {
625 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
626 }
627 } else {
628 if (nextBatchMillis <= nowMillis) {
629 nextBatchMillis = nowMillis + manager.getDelayMillis();
630 }
631 try {
632 final long interval = nextBatchMillis - nowMillis;
633 gate.waitForOpen(interval);
634 } catch (final InterruptedException ie) {
635 LOGGER.warn("WriterThread interrupted, continuing");
636 } catch (final Exception ex) {
637 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
638 break;
639 }
640 }
641 }
642
643 if (batchSize > 1 && database.count() > 0) {
644 final DatabaseEntry key = new DatabaseEntry();
645 final DatabaseEntry data = new DatabaseEntry();
646 try {
647 sendBatch(key, data);
648 } catch (final Exception ex) {
649 LOGGER.warn("Unable to write final batch");
650 }
651 }
652 LOGGER.trace("WriterThread exiting");
653 }
654
655 private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
656 boolean errors = false;
657 OperationStatus status;
658 Cursor cursor = null;
659 try {
660 final BatchEvent batch = new BatchEvent();
661 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
662 try {
663 cursor = database.openCursor(null, CursorConfig.DEFAULT);
664 status = cursor.getFirst(key, data, null);
665
666 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
667 final SimpleEvent event = createEvent(data);
668 if (event != null) {
669 batch.addEvent(event);
670 }
671 status = cursor.getNext(key, data, null);
672 }
673 break;
674 } catch (final LockConflictException lce) {
675 if (cursor != null) {
676 try {
677 cursor.close();
678 cursor = null;
679 } catch (final Exception ex) {
680 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
681 }
682 }
683 }
684 }
685
686 try {
687 manager.send(batch);
688 } catch (final Exception ioe) {
689 LOGGER.error("Error sending events", ioe);
690 errors = true;
691 }
692 if (!errors) {
693 if (cursor != null) {
694 cursor.close();
695 cursor = null;
696 }
697 Transaction txn = null;
698 Exception exception = null;
699 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
700 try {
701 txn = environment.beginTransaction(null, null);
702 try {
703 for (final Event event : batch.getEvents()) {
704 try {
705 final Map<String, String> headers = event.getHeaders();
706 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
707 database.delete(txn, key);
708 } catch (final Exception ex) {
709 LOGGER.error("Error deleting key from database", ex);
710 }
711 }
712 txn.commit();
713 long count = dbCounter.get();
714 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
715 count = dbCounter.get();
716 }
717 exception = null;
718 break;
719 } catch (final LockConflictException lce) {
720 exception = lce;
721 if (cursor != null) {
722 try {
723 cursor.close();
724 cursor = null;
725 } catch (final Exception ex) {
726 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
727 }
728 }
729 if (txn != null) {
730 try {
731 txn.abort();
732 txn = null;
733 } catch (final Exception ex) {
734 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
735 }
736 }
737 } catch (final Exception ex) {
738 LOGGER.error("Unable to commit transaction", ex);
739 if (txn != null) {
740 txn.abort();
741 }
742 }
743 } catch (final LockConflictException lce) {
744 exception = lce;
745 if (cursor != null) {
746 try {
747 cursor.close();
748 cursor = null;
749 } catch (final Exception ex) {
750 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
751 }
752 }
753 if (txn != null) {
754 try {
755 txn.abort();
756 txn = null;
757 } catch (final Exception ex) {
758 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
759 }
760 }
761 } finally {
762 if (cursor != null) {
763 cursor.close();
764 cursor = null;
765 }
766 if (txn != null) {
767 txn.abort();
768 txn = null;
769 }
770 }
771 try {
772 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
773 } catch (final InterruptedException ie) {
774
775 }
776 }
777 if (exception != null) {
778 LOGGER.error("Unable to delete events from data base", exception);
779 }
780 }
781 } catch (final Exception ex) {
782 LOGGER.error("Error reading database", ex);
783 shutdown = true;
784 throw ex;
785 } finally {
786 if (cursor != null) {
787 cursor.close();
788 }
789 }
790
791 return errors;
792 }
793
794 private SimpleEvent createEvent(final DatabaseEntry data) {
795 final SimpleEvent event = new SimpleEvent();
796 try {
797 byte[] eventData = data.getData();
798 if (secretKey != null) {
799 final Cipher cipher = Cipher.getInstance("AES");
800 cipher.init(Cipher.DECRYPT_MODE, secretKey);
801 eventData = cipher.doFinal(eventData);
802 }
803 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
804 final DataInputStream dais = new DataInputStream(bais);
805 int length = dais.readInt();
806 final byte[] bytes = new byte[length];
807 dais.read(bytes, 0, length);
808 event.setBody(bytes);
809 length = dais.readInt();
810 final Map<String, String> map = new HashMap<>(length);
811 for (int i = 0; i < length; ++i) {
812 final String headerKey = dais.readUTF();
813 final String value = dais.readUTF();
814 map.put(headerKey, value);
815 }
816 event.setHeaders(map);
817 return event;
818 } catch (final Exception ex) {
819 LOGGER.error("Error retrieving event", ex);
820 return null;
821 }
822 }
823
824 }
825
826
827
828
829 private static class DaemonThreadFactory implements ThreadFactory {
830 private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
831 private final ThreadGroup group;
832 private final AtomicInteger threadNumber = new AtomicInteger(1);
833 private final String namePrefix;
834
835 public DaemonThreadFactory() {
836 final SecurityManager securityManager = System.getSecurityManager();
837 group = securityManager != null ? securityManager.getThreadGroup() :
838 Thread.currentThread().getThreadGroup();
839 namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
840 }
841
842 @Override
843 public Thread newThread(final Runnable r) {
844 final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
845 thread.setDaemon(true);
846 if (thread.getPriority() != Thread.NORM_PRIORITY) {
847 thread.setPriority(Thread.NORM_PRIORITY);
848 }
849 return thread;
850 }
851 }
852
853
854
855
856 private static class Gate {
857
858 private boolean isOpen = false;
859
860 public boolean isOpen() {
861 return isOpen;
862 }
863
864 public synchronized void open() {
865 isOpen = true;
866 notifyAll();
867 }
868
869 public synchronized void close() {
870 isOpen = false;
871 }
872
873 public synchronized void waitForOpen(final long timeout) throws InterruptedException {
874 wait(timeout);
875 }
876 }
877 }