1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.File;
24 import java.nio.charset.Charset;
25 import java.nio.charset.StandardCharsets;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicLong;
36
37 import javax.crypto.Cipher;
38 import javax.crypto.SecretKey;
39
40 import org.apache.flume.Event;
41 import org.apache.flume.event.SimpleEvent;
42 import org.apache.logging.log4j.LoggingException;
43 import org.apache.logging.log4j.core.appender.ManagerFactory;
44 import org.apache.logging.log4j.core.config.Property;
45 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
46 import org.apache.logging.log4j.core.config.plugins.util.PluginType;
47 import org.apache.logging.log4j.core.util.FileUtils;
48 import org.apache.logging.log4j.core.util.SecretKeyProvider;
49 import org.apache.logging.log4j.util.Strings;
50
51 import com.sleepycat.je.Cursor;
52 import com.sleepycat.je.CursorConfig;
53 import com.sleepycat.je.Database;
54 import com.sleepycat.je.DatabaseConfig;
55 import com.sleepycat.je.DatabaseEntry;
56 import com.sleepycat.je.Environment;
57 import com.sleepycat.je.EnvironmentConfig;
58 import com.sleepycat.je.LockConflictException;
59 import com.sleepycat.je.LockMode;
60 import com.sleepycat.je.OperationStatus;
61 import com.sleepycat.je.StatsConfig;
62 import com.sleepycat.je.Transaction;
63
64
65
66
67 public class FlumePersistentManager extends FlumeAvroManager {
68
69
70 public static final String KEY_PROVIDER = "keyProvider";
71
72 private static final Charset UTF8 = StandardCharsets.UTF_8;
73
74 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
75
76 private static final int SHUTDOWN_WAIT = 60;
77
78 private static final int MILLIS_PER_SECOND = 1000;
79
80 private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
81
82 private static BDBManagerFactory factory = new BDBManagerFactory();
83
84 private final Database database;
85
86 private final Environment environment;
87
88 private final WriterThread worker;
89
90 private final Gate gate = new Gate();
91
92 private final SecretKey secretKey;
93
94 private final int lockTimeoutRetryCount;
95
96 private final ExecutorService threadPool;
97
98 private final AtomicLong dbCount = new AtomicLong();
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/**
 * Creates the manager and starts the background worker that drains the database.
 *
 * @param name The unique name of this manager.
 * @param shortName Name forwarded to the superclass constructor.
 * @param agents The Flume agents to deliver events to.
 * @param batchSize The number of events that triggers a batch delivery.
 * @param retries Retry count forwarded to the superclass constructor.
 * @param connectionTimeout Connection timeout forwarded to the superclass constructor.
 * @param requestTimeout Request timeout forwarded to the superclass constructor.
 * @param delay Delay forwarded to the superclass constructor.
 * @param database The database that stores undelivered events.
 * @param environment The database environment.
 * @param secretKey The key used to encrypt stored events, or {@code null} for no encryption.
 * @param lockTimeoutRetryCount Number of attempts made when a database lock conflict occurs.
 */
protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                 final int batchSize, final int retries, final int connectionTimeout,
                                 final int requestTimeout, final int delay, final Database database,
                                 final Environment environment, final SecretKey secretKey,
                                 final int lockTimeoutRetryCount) {
    super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
    this.database = database;
    this.environment = environment;
    this.secretKey = secretKey;
    this.lockTimeoutRetryCount = lockTimeoutRetryCount;
    dbCount.set(database.count());
    this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
    this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
        lockTimeoutRetryCount);
    // Start the worker last: it holds a reference to this manager, so every field must be
    // assigned before the thread can observe the object.
    this.worker.start();
}
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 public static FlumePersistentManager getManager(final String name, final Agent[] agents,
148 final Property[] properties, int batchSize, final int retries,
149 final int connectionTimeout, final int requestTimeout,
150 final int delayMillis, final int lockTimeoutRetryCount,
151 final String dataDir) {
152 if (agents == null || agents.length == 0) {
153 throw new IllegalArgumentException("At least one agent is required");
154 }
155
156 if (batchSize <= 0) {
157 batchSize = 1;
158 }
159 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
160
161 final StringBuilder sb = new StringBuilder("FlumePersistent[");
162 boolean first = true;
163 for (final Agent agent : agents) {
164 if (!first) {
165 sb.append(',');
166 }
167 sb.append(agent.getHost()).append(':').append(agent.getPort());
168 first = false;
169 }
170 sb.append(']');
171 sb.append(' ').append(dataDirectory);
172 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
173 connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
174 }
175
176 @Override
177 public void send(final Event event) {
178 if (worker.isShutdown()) {
179 throw new LoggingException("Unable to record event");
180 }
181
182 final Map<String, String> headers = event.getHeaders();
183 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
184 try {
185 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
186 final DataOutputStream daos = new DataOutputStream(baos);
187 daos.writeInt(event.getBody().length);
188 daos.write(event.getBody(), 0, event.getBody().length);
189 daos.writeInt(event.getHeaders().size());
190 for (final Map.Entry<String, String> entry : headers.entrySet()) {
191 daos.writeUTF(entry.getKey());
192 daos.writeUTF(entry.getValue());
193 }
194 byte[] eventData = baos.toByteArray();
195 if (secretKey != null) {
196 final Cipher cipher = Cipher.getInstance("AES");
197 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
198 eventData = cipher.doFinal(eventData);
199 }
200 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
201 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
202 boolean interrupted = false;
203 int ieCount = 0;
204 do {
205 try {
206 future.get();
207 } catch (final InterruptedException ie) {
208 interrupted = true;
209 ++ieCount;
210 }
211 } while (interrupted && ieCount <= 1);
212
213 } catch (final Exception ex) {
214 throw new LoggingException("Exception occurred writing log event", ex);
215 }
216 }
217
218 @Override
219 protected void releaseSub() {
220 LOGGER.debug("Shutting down FlumePersistentManager");
221 worker.shutdown();
222 try {
223 worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
224 } catch (final InterruptedException ie) {
225
226 }
227 threadPool.shutdown();
228 try {
229 threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
230 } catch (final InterruptedException ie) {
231 LOGGER.warn("PersistentManager Thread pool failed to shut down");
232 }
233 try {
234 worker.join();
235 } catch (final InterruptedException ex) {
236 LOGGER.debug("Interrupted while waiting for worker to complete");
237 }
238 try {
239 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
240 database.close();
241 } catch (final Exception ex) {
242 LOGGER.warn("Failed to close database", ex);
243 }
244 try {
245 environment.cleanLog();
246 environment.close();
247 } catch (final Exception ex) {
248 LOGGER.warn("Failed to close environment", ex);
249 }
250 super.releaseSub();
251 }
252
/**
 * Delivers an event through the superclass send method, bypassing the persistence step
 * performed by this class's own send override. Called by the worker thread when it
 * forwards stored events one at a time.
 *
 * @param event The event to deliver.
 */
private void doSend(final SimpleEvent event) {
    LOGGER.debug("Sending event to Flume");
    super.send(event);
}
257
258
259
260
261 private static class BDBWriter implements Callable<Integer> {
262 private final byte[] eventData;
263 private final byte[] keyData;
264 private final Environment environment;
265 private final Database database;
266 private final Gate gate;
267 private final AtomicLong dbCount;
268 private final long batchSize;
269 private final int lockTimeoutRetryCount;
270
271 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
272 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
273 final int lockTimeoutRetryCount) {
274 this.keyData = keyData;
275 this.eventData = eventData;
276 this.environment = environment;
277 this.database = database;
278 this.gate = gate;
279 this.dbCount = dbCount;
280 this.batchSize = batchSize;
281 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
282 }
283
284 @Override
285 public Integer call() throws Exception {
286 final DatabaseEntry key = new DatabaseEntry(keyData);
287 final DatabaseEntry data = new DatabaseEntry(eventData);
288 Exception exception = null;
289 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
290 Transaction txn = null;
291 try {
292 txn = environment.beginTransaction(null, null);
293 try {
294 database.put(txn, key, data);
295 txn.commit();
296 txn = null;
297 if (dbCount.incrementAndGet() >= batchSize) {
298 gate.open();
299 }
300 exception = null;
301 break;
302 } catch (final LockConflictException lce) {
303 exception = lce;
304
305 } catch (final Exception ex) {
306 if (txn != null) {
307 txn.abort();
308 }
309 throw ex;
310 } finally {
311 if (txn != null) {
312 txn.abort();
313 txn = null;
314 }
315 }
316 } catch (final LockConflictException lce) {
317 exception = lce;
318 if (txn != null) {
319 try {
320 txn.abort();
321 txn = null;
322 } catch (final Exception ex) {
323 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
324 }
325 }
326
327 }
328 try {
329 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
330 } catch (final InterruptedException ie) {
331
332 }
333 }
334 if (exception != null) {
335 throw exception;
336 }
337 return eventData.length;
338 }
339 }
340
341
342
343
344 private static class FactoryData {
345 private final String name;
346 private final Agent[] agents;
347 private final int batchSize;
348 private final String dataDir;
349 private final int retries;
350 private final int connectionTimeout;
351 private final int requestTimeout;
352 private final int delayMillis;
353 private final int lockTimeoutRetryCount;
354 private final Property[] properties;
355
356
357
358
359
360
361
362
363 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
364 final int connectionTimeout, final int requestTimeout, final int delayMillis,
365 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
366 this.name = name;
367 this.agents = agents;
368 this.batchSize = batchSize;
369 this.dataDir = dataDir;
370 this.retries = retries;
371 this.connectionTimeout = connectionTimeout;
372 this.requestTimeout = requestTimeout;
373 this.delayMillis = delayMillis;
374 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
375 this.properties = properties;
376 }
377 }
378
379
380
381
382 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
383
384
385
386
387
388
389
390 @Override
391 public FlumePersistentManager createManager(final String name, final FactoryData data) {
392 SecretKey secretKey = null;
393 Database database = null;
394 Environment environment = null;
395
396 final Map<String, String> properties = new HashMap<>();
397 if (data.properties != null) {
398 for (final Property property : data.properties) {
399 properties.put(property.getName(), property.getValue());
400 }
401 }
402
403 try {
404 final File dir = new File(data.dataDir);
405 FileUtils.mkdir(dir, true);
406 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
407 dbEnvConfig.setTransactional(true);
408 dbEnvConfig.setAllowCreate(true);
409 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
410 environment = new Environment(dir, dbEnvConfig);
411 final DatabaseConfig dbConfig = new DatabaseConfig();
412 dbConfig.setTransactional(true);
413 dbConfig.setAllowCreate(true);
414 database = environment.openDatabase(null, name, dbConfig);
415 } catch (final Exception ex) {
416 LOGGER.error("Could not create FlumePersistentManager", ex);
417
418
419
420 if (database != null) {
421 database.close();
422 database = null;
423 }
424 if (environment != null) {
425 environment.close();
426 environment = null;
427 }
428 return null;
429 }
430
431 try {
432 String key = null;
433 for (final Map.Entry<String, String> entry : properties.entrySet()) {
434 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
435 key = entry.getValue();
436 break;
437 }
438 }
439 if (key != null) {
440 final PluginManager manager = new PluginManager("KeyProvider");
441 manager.collectPlugins();
442 final Map<String, PluginType<?>> plugins = manager.getPlugins();
443 if (plugins != null) {
444 boolean found = false;
445 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
446 if (entry.getKey().equalsIgnoreCase(key)) {
447 found = true;
448 final Class<?> cl = entry.getValue().getPluginClass();
449 try {
450 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
451 secretKey = provider.getSecretKey();
452 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
453 } catch (final Exception ex) {
454 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
455 cl.getName());
456 }
457 break;
458 }
459 }
460 if (!found) {
461 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
462 }
463 } else {
464 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
465 }
466 }
467 } catch (final Exception ex) {
468 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
469 }
470 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
471 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
472 data.lockTimeoutRetryCount);
473 }
474 }
475
476
477
478
479 private static class WriterThread extends Thread {
480 private volatile boolean shutdown = false;
481 private final Database database;
482 private final Environment environment;
483 private final FlumePersistentManager manager;
484 private final Gate gate;
485 private final SecretKey secretKey;
486 private final int batchSize;
487 private final AtomicLong dbCounter;
488 private final int lockTimeoutRetryCount;
489
490 public WriterThread(final Database database, final Environment environment,
491 final FlumePersistentManager manager, final Gate gate, final int batchsize,
492 final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
493 this.database = database;
494 this.environment = environment;
495 this.manager = manager;
496 this.gate = gate;
497 this.batchSize = batchsize;
498 this.secretKey = secretKey;
499 this.setDaemon(true);
500 this.dbCounter = dbCount;
501 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
502 }
503
504 public void shutdown() {
505 LOGGER.debug("Writer thread shutting down");
506 this.shutdown = true;
507 gate.open();
508 }
509
510 public boolean isShutdown() {
511 return shutdown;
512 }
513
514 @Override
515 public void run() {
516 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
517 long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
518 while (!shutdown) {
519 final long nowMillis = System.currentTimeMillis();
520 final long dbCount = database.count();
521 dbCounter.set(dbCount);
522 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
523 nextBatchMillis = nowMillis + manager.getDelayMillis();
524 try {
525 boolean errors = false;
526 final DatabaseEntry key = new DatabaseEntry();
527 final DatabaseEntry data = new DatabaseEntry();
528
529 gate.close();
530 OperationStatus status;
531 if (batchSize > 1) {
532 try {
533 errors = sendBatch(key, data);
534 } catch (final Exception ex) {
535 break;
536 }
537 } else {
538 Exception exception = null;
539 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
540 exception = null;
541 Transaction txn = null;
542 Cursor cursor = null;
543 try {
544 txn = environment.beginTransaction(null, null);
545 cursor = database.openCursor(txn, null);
546 try {
547 status = cursor.getFirst(key, data, LockMode.RMW);
548 while (status == OperationStatus.SUCCESS) {
549 final SimpleEvent event = createEvent(data);
550 if (event != null) {
551 try {
552 manager.doSend(event);
553 } catch (final Exception ioe) {
554 errors = true;
555 LOGGER.error("Error sending event", ioe);
556 break;
557 }
558 try {
559 cursor.delete();
560 } catch (final Exception ex) {
561 LOGGER.error("Unable to delete event", ex);
562 }
563 }
564 status = cursor.getNext(key, data, LockMode.RMW);
565 }
566 if (cursor != null) {
567 cursor.close();
568 cursor = null;
569 }
570 txn.commit();
571 txn = null;
572 dbCounter.decrementAndGet();
573 exception = null;
574 break;
575 } catch (final LockConflictException lce) {
576 exception = lce;
577
578 } catch (final Exception ex) {
579 LOGGER.error("Error reading or writing to database", ex);
580 shutdown = true;
581 break;
582 } finally {
583 if (cursor != null) {
584 cursor.close();
585 cursor = null;
586 }
587 if (txn != null) {
588 txn.abort();
589 txn = null;
590 }
591 }
592 } catch (final LockConflictException lce) {
593 exception = lce;
594 if (cursor != null) {
595 try {
596 cursor.close();
597 cursor = null;
598 } catch (final Exception ex) {
599 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
600 }
601 }
602 if (txn != null) {
603 try {
604 txn.abort();
605 txn = null;
606 } catch (final Exception ex) {
607 LOGGER.trace("Ignored exception aborting tx during lock conflict.");
608 }
609 }
610 }
611 try {
612 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
613 } catch (final InterruptedException ie) {
614
615 }
616 }
617 if (exception != null) {
618 LOGGER.error("Unable to read or update data base", exception);
619 }
620 }
621 if (errors) {
622 Thread.sleep(manager.getDelayMillis());
623 continue;
624 }
625 } catch (final Exception ex) {
626 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
627 }
628 } else {
629 if (nextBatchMillis <= nowMillis) {
630 nextBatchMillis = nowMillis + manager.getDelayMillis();
631 }
632 try {
633 final long interval = nextBatchMillis - nowMillis;
634 gate.waitForOpen(interval);
635 } catch (final InterruptedException ie) {
636 LOGGER.warn("WriterThread interrupted, continuing");
637 } catch (final Exception ex) {
638 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
639 break;
640 }
641 }
642 }
643
644 if (batchSize > 1 && database.count() > 0) {
645 final DatabaseEntry key = new DatabaseEntry();
646 final DatabaseEntry data = new DatabaseEntry();
647 try {
648 sendBatch(key, data);
649 } catch (final Exception ex) {
650 LOGGER.warn("Unable to write final batch");
651 }
652 }
653 LOGGER.trace("WriterThread exiting");
654 }
655
656 private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
657 boolean errors = false;
658 OperationStatus status;
659 Cursor cursor = null;
660 try {
661 final BatchEvent batch = new BatchEvent();
662 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
663 try {
664 cursor = database.openCursor(null, CursorConfig.DEFAULT);
665 status = cursor.getFirst(key, data, null);
666
667 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
668 final SimpleEvent event = createEvent(data);
669 if (event != null) {
670 batch.addEvent(event);
671 }
672 status = cursor.getNext(key, data, null);
673 }
674 break;
675 } catch (final LockConflictException lce) {
676 if (cursor != null) {
677 try {
678 cursor.close();
679 cursor = null;
680 } catch (final Exception ex) {
681 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
682 }
683 }
684 }
685 }
686
687 try {
688 manager.send(batch);
689 } catch (final Exception ioe) {
690 LOGGER.error("Error sending events", ioe);
691 errors = true;
692 }
693 if (!errors) {
694 if (cursor != null) {
695 cursor.close();
696 cursor = null;
697 }
698 Transaction txn = null;
699 Exception exception = null;
700 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
701 try {
702 txn = environment.beginTransaction(null, null);
703 try {
704 for (final Event event : batch.getEvents()) {
705 try {
706 final Map<String, String> headers = event.getHeaders();
707 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
708 database.delete(txn, key);
709 } catch (final Exception ex) {
710 LOGGER.error("Error deleting key from database", ex);
711 }
712 }
713 txn.commit();
714 long count = dbCounter.get();
715 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
716 count = dbCounter.get();
717 }
718 exception = null;
719 break;
720 } catch (final LockConflictException lce) {
721 exception = lce;
722 if (cursor != null) {
723 try {
724 cursor.close();
725 cursor = null;
726 } catch (final Exception ex) {
727 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
728 }
729 }
730 if (txn != null) {
731 try {
732 txn.abort();
733 txn = null;
734 } catch (final Exception ex) {
735 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
736 }
737 }
738 } catch (final Exception ex) {
739 LOGGER.error("Unable to commit transaction", ex);
740 if (txn != null) {
741 txn.abort();
742 }
743 }
744 } catch (final LockConflictException lce) {
745 exception = lce;
746 if (cursor != null) {
747 try {
748 cursor.close();
749 cursor = null;
750 } catch (final Exception ex) {
751 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
752 }
753 }
754 if (txn != null) {
755 try {
756 txn.abort();
757 txn = null;
758 } catch (final Exception ex) {
759 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
760 }
761 }
762 } finally {
763 if (cursor != null) {
764 cursor.close();
765 cursor = null;
766 }
767 if (txn != null) {
768 txn.abort();
769 txn = null;
770 }
771 }
772 try {
773 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
774 } catch (final InterruptedException ie) {
775
776 }
777 }
778 if (exception != null) {
779 LOGGER.error("Unable to delete events from data base", exception);
780 }
781 }
782 } catch (final Exception ex) {
783 LOGGER.error("Error reading database", ex);
784 shutdown = true;
785 throw ex;
786 } finally {
787 if (cursor != null) {
788 cursor.close();
789 }
790 }
791
792 return errors;
793 }
794
795 private SimpleEvent createEvent(final DatabaseEntry data) {
796 final SimpleEvent event = new SimpleEvent();
797 try {
798 byte[] eventData = data.getData();
799 if (secretKey != null) {
800 final Cipher cipher = Cipher.getInstance("AES");
801 cipher.init(Cipher.DECRYPT_MODE, secretKey);
802 eventData = cipher.doFinal(eventData);
803 }
804 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
805 final DataInputStream dais = new DataInputStream(bais);
806 int length = dais.readInt();
807 final byte[] bytes = new byte[length];
808 dais.read(bytes, 0, length);
809 event.setBody(bytes);
810 length = dais.readInt();
811 final Map<String, String> map = new HashMap<>(length);
812 for (int i = 0; i < length; ++i) {
813 final String headerKey = dais.readUTF();
814 final String value = dais.readUTF();
815 map.put(headerKey, value);
816 }
817 event.setHeaders(map);
818 return event;
819 } catch (final Exception ex) {
820 LOGGER.error("Error retrieving event", ex);
821 return null;
822 }
823 }
824
825 }
826
827
828
829
830 private static class DaemonThreadFactory implements ThreadFactory {
831 private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
832 private final ThreadGroup group;
833 private final AtomicInteger threadNumber = new AtomicInteger(1);
834 private final String namePrefix;
835
836 public DaemonThreadFactory() {
837 final SecurityManager securityManager = System.getSecurityManager();
838 group = securityManager != null ? securityManager.getThreadGroup() :
839 Thread.currentThread().getThreadGroup();
840 namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
841 }
842
843 @Override
844 public Thread newThread(final Runnable r) {
845 final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
846 thread.setDaemon(true);
847 if (thread.getPriority() != Thread.NORM_PRIORITY) {
848 thread.setPriority(Thread.NORM_PRIORITY);
849 }
850 return thread;
851 }
852 }
853
854
855
856
857 private static class Gate {
858
859 private boolean isOpen = false;
860
861 public boolean isOpen() {
862 return isOpen;
863 }
864
865 public synchronized void open() {
866 isOpen = true;
867 notifyAll();
868 }
869
870 public synchronized void close() {
871 isOpen = false;
872 }
873
874 public synchronized void waitForOpen(final long timeout) throws InterruptedException {
875 wait(timeout);
876 }
877 }
878 }