1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.File;
24 import java.nio.charset.Charset;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35 import javax.crypto.Cipher;
36 import javax.crypto.SecretKey;
37
38 import org.apache.flume.Event;
39 import org.apache.flume.event.SimpleEvent;
40 import org.apache.logging.log4j.LoggingException;
41 import org.apache.logging.log4j.core.appender.ManagerFactory;
42 import org.apache.logging.log4j.core.config.Property;
43 import org.apache.logging.log4j.core.config.plugins.PluginManager;
44 import org.apache.logging.log4j.core.config.plugins.PluginType;
45 import org.apache.logging.log4j.core.helpers.FileUtils;
46 import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
47 import org.apache.logging.log4j.core.helpers.Strings;
48
49 import com.sleepycat.je.Cursor;
50 import com.sleepycat.je.CursorConfig;
51 import com.sleepycat.je.Database;
52 import com.sleepycat.je.DatabaseConfig;
53 import com.sleepycat.je.DatabaseEntry;
54 import com.sleepycat.je.Environment;
55 import com.sleepycat.je.EnvironmentConfig;
56 import com.sleepycat.je.LockConflictException;
57 import com.sleepycat.je.LockMode;
58 import com.sleepycat.je.OperationStatus;
59 import com.sleepycat.je.StatsConfig;
60 import com.sleepycat.je.Transaction;
61
62
63
64
65 public class FlumePersistentManager extends FlumeAvroManager {
66
67
68 public static final String KEY_PROVIDER = "keyProvider";
69
70 private static final Charset UTF8 = Charset.forName("UTF-8");
71
72 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
73
74 private static final int SHUTDOWN_WAIT = 60;
75
76 private static final int MILLIS_PER_SECOND = 1000;
77
78 private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
79
80 private static BDBManagerFactory factory = new BDBManagerFactory();
81
82 private final Database database;
83
84 private final Environment environment;
85
86 private final WriterThread worker;
87
88 private final Gate gate = new Gate();
89
90 private final SecretKey secretKey;
91
92 private final int delay;
93
94 private final int lockTimeoutRetryCount;
95
96 private final ExecutorService threadPool;
97
98 private AtomicLong dbCount = new AtomicLong();
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
116 final int batchSize, final int retries, final int connectionTimeout,
117 final int requestTimeout, final int delay, final Database database,
118 final Environment environment, final SecretKey secretKey,
119 final int lockTimeoutRetryCount) {
120 super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
121 this.delay = delay;
122 this.database = database;
123 this.environment = environment;
124 dbCount.set(database.count());
125 this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
126 lockTimeoutRetryCount);
127 this.worker.start();
128 this.secretKey = secretKey;
129 this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
130 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 public static FlumePersistentManager getManager(final String name, final Agent[] agents,
149 final Property[] properties, int batchSize, final int retries,
150 final int connectionTimeout, final int requestTimeout,
151 final int delay, final int lockTimeoutRetryCount,
152 final String dataDir) {
153 if (agents == null || agents.length == 0) {
154 throw new IllegalArgumentException("At least one agent is required");
155 }
156
157 if (batchSize <= 0) {
158 batchSize = 1;
159 }
160 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
161
162 final StringBuilder sb = new StringBuilder("FlumePersistent[");
163 boolean first = true;
164 for (final Agent agent : agents) {
165 if (!first) {
166 sb.append(",");
167 }
168 sb.append(agent.getHost()).append(":").append(agent.getPort());
169 first = false;
170 }
171 sb.append("]");
172 sb.append(" ").append(dataDirectory);
173 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
174 connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
175 }
176
177 @Override
178 public void send(final Event event) {
179 if (worker.isShutdown()) {
180 throw new LoggingException("Unable to record event");
181 }
182
183 final Map<String, String> headers = event.getHeaders();
184 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
185 try {
186 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
187 final DataOutputStream daos = new DataOutputStream(baos);
188 daos.writeInt(event.getBody().length);
189 daos.write(event.getBody(), 0, event.getBody().length);
190 daos.writeInt(event.getHeaders().size());
191 for (final Map.Entry<String, String> entry : headers.entrySet()) {
192 daos.writeUTF(entry.getKey());
193 daos.writeUTF(entry.getValue());
194 }
195 byte[] eventData = baos.toByteArray();
196 if (secretKey != null) {
197 final Cipher cipher = Cipher.getInstance("AES");
198 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
199 eventData = cipher.doFinal(eventData);
200 }
201 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
202 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
203 boolean interrupted = false;
204 int count = 0;
205 do {
206 try {
207 future.get();
208 } catch (final InterruptedException ie) {
209 interrupted = true;
210 ++count;
211 }
212 } while (interrupted && count <= 1);
213
214 } catch (final Exception ex) {
215 throw new LoggingException("Exception occurred writing log event", ex);
216 }
217 }
218
219 @Override
220 protected void releaseSub() {
221 LOGGER.debug("Shutting down FlumePersistentManager");
222 worker.shutdown();
223 try {
224 worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
225 } catch (InterruptedException ie) {
226
227 }
228 threadPool.shutdown();
229 try {
230 threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
231 } catch (final InterruptedException ie) {
232 LOGGER.warn("PersistentManager Thread pool failed to shut down");
233 }
234 try {
235 worker.join();
236 } catch (final InterruptedException ex) {
237 LOGGER.debug("Interrupted while waiting for worker to complete");
238 }
239 try {
240 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
241 database.close();
242 } catch (final Exception ex) {
243 LOGGER.warn("Failed to close database", ex);
244 }
245 try {
246 environment.cleanLog();
247 environment.close();
248 } catch (final Exception ex) {
249 LOGGER.warn("Failed to close environment", ex);
250 }
251 super.releaseSub();
252 }
253
    /**
     * Hands an event read back from the database to the parent manager's {@code send}.
     * Calling {@code super.send} bypasses this class's own {@link #send(Event)}, which would
     * write the event to the database again.
     *
     * @param event The event to forward.
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }
258
259
260
261
262 private static class BDBWriter implements Callable<Integer> {
263 private final byte[] eventData;
264 private final byte[] keyData;
265 private final Environment environment;
266 private final Database database;
267 private final Gate gate;
268 private final AtomicLong dbCount;
269 private final long batchSize;
270 private final int lockTimeoutRetryCount;
271
272 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
273 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
274 final int lockTimeoutRetryCount) {
275 this.keyData = keyData;
276 this.eventData = eventData;
277 this.environment = environment;
278 this.database = database;
279 this.gate = gate;
280 this.dbCount = dbCount;
281 this.batchSize = batchSize;
282 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
283 }
284
285 @Override
286 public Integer call() throws Exception {
287 final DatabaseEntry key = new DatabaseEntry(keyData);
288 final DatabaseEntry data = new DatabaseEntry(eventData);
289 Exception exception = null;
290 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
291 Transaction txn = null;
292 try {
293 txn = environment.beginTransaction(null, null);
294 try {
295 database.put(txn, key, data);
296 txn.commit();
297 txn = null;
298 if (dbCount.incrementAndGet() >= batchSize) {
299 gate.open();
300 }
301 exception = null;
302 break;
303 } catch (final LockConflictException lce) {
304 exception = lce;
305
306 } catch (final Exception ex) {
307 if (txn != null) {
308 txn.abort();
309 }
310 throw ex;
311 } finally {
312 if (txn != null) {
313 txn.abort();
314 txn = null;
315 }
316 }
317 } catch (LockConflictException lce) {
318 exception = lce;
319 if (txn != null) {
320 try {
321 txn.abort();
322 txn = null;
323 } catch (Exception ex) {
324 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
325 }
326 }
327
328 }
329 try {
330 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
331 } catch (InterruptedException ie) {
332
333 }
334 }
335 if (exception != null) {
336 throw exception;
337 }
338 return eventData.length;
339 }
340 }
341
342
343
344
345 private static class FactoryData {
346 private final String name;
347 private final Agent[] agents;
348 private final int batchSize;
349 private final String dataDir;
350 private final int retries;
351 private final int connectionTimeout;
352 private final int requestTimeout;
353 private final int delay;
354 private final int lockTimeoutRetryCount;
355 private final Property[] properties;
356
357
358
359
360
361
362
363
364 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
365 final int connectionTimeout, final int requestTimeout, final int delay,
366 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
367 this.name = name;
368 this.agents = agents;
369 this.batchSize = batchSize;
370 this.dataDir = dataDir;
371 this.retries = retries;
372 this.connectionTimeout = connectionTimeout;
373 this.requestTimeout = requestTimeout;
374 this.delay = delay;
375 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
376 this.properties = properties;
377 }
378 }
379
380
381
382
383 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
384
385
386
387
388
389
390
391 @Override
392 public FlumePersistentManager createManager(final String name, final FactoryData data) {
393 SecretKey secretKey = null;
394 Database database = null;
395 Environment environment = null;
396
397 final Map<String, String> properties = new HashMap<String, String>();
398 if (data.properties != null) {
399 for (final Property property : data.properties) {
400 properties.put(property.getName(), property.getValue());
401 }
402 }
403
404 try {
405 final File dir = new File(data.dataDir);
406 FileUtils.mkdir(dir, true);
407 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
408 dbEnvConfig.setTransactional(true);
409 dbEnvConfig.setAllowCreate(true);
410 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
411 environment = new Environment(dir, dbEnvConfig);
412 final DatabaseConfig dbConfig = new DatabaseConfig();
413 dbConfig.setTransactional(true);
414 dbConfig.setAllowCreate(true);
415 database = environment.openDatabase(null, name, dbConfig);
416 } catch (final Exception ex) {
417 LOGGER.error("Could not create FlumePersistentManager", ex);
418
419
420
421 if (database != null) {
422 database.close();
423 database = null;
424 }
425 if (environment != null) {
426 environment.close();
427 environment = null;
428 }
429 return null;
430 }
431
432 try {
433 String key = null;
434 for (final Map.Entry<String, String> entry : properties.entrySet()) {
435 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
436 key = entry.getValue();
437 break;
438 }
439 }
440 if (key != null) {
441 final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
442 manager.collectPlugins();
443 final Map<String, PluginType<?>> plugins = manager.getPlugins();
444 if (plugins != null) {
445 boolean found = false;
446 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
447 if (entry.getKey().equalsIgnoreCase(key)) {
448 found = true;
449 final Class<?> cl = entry.getValue().getPluginClass();
450 try {
451 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
452 secretKey = provider.getSecretKey();
453 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
454 } catch (final Exception ex) {
455 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
456 cl.getName());
457 }
458 break;
459 }
460 }
461 if (!found) {
462 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
463 }
464 } else {
465 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
466 }
467 }
468 } catch (final Exception ex) {
469 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
470 }
471 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
472 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
473 data.lockTimeoutRetryCount);
474 }
475 }
476
477
478
479
480 private static class WriterThread extends Thread {
481 private volatile boolean shutdown = false;
482 private final Database database;
483 private final Environment environment;
484 private final FlumePersistentManager manager;
485 private final Gate gate;
486 private final SecretKey secretKey;
487 private final int batchSize;
488 private final AtomicLong dbCounter;
489 private final int lockTimeoutRetryCount;
490
491 public WriterThread(final Database database, final Environment environment,
492 final FlumePersistentManager manager, final Gate gate, final int batchsize,
493 final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
494 this.database = database;
495 this.environment = environment;
496 this.manager = manager;
497 this.gate = gate;
498 this.batchSize = batchsize;
499 this.secretKey = secretKey;
500 this.setDaemon(true);
501 this.dbCounter = dbCount;
502 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
503 }
504
        /**
         * Requests that the thread stop: sets the shutdown flag and opens the gate so that a
         * thread currently waiting for work wakes up and sees the flag.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }
510
        /**
         * Reports whether the shutdown flag is set, either through {@link #shutdown()} or by this
         * thread itself after a database error.
         *
         * @return {@code true} if the thread has been told to stop or has stopped itself.
         */
        public boolean isShutdown() {
            return shutdown;
        }
514
        /**
         * Main loop: until shutdown is flagged, forwards stored events whenever a full batch is
         * available or the delay has elapsed with at least one event stored; otherwise waits on
         * the gate. After the loop, one final batch is attempted when batching is enabled.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
            // Time (epoch millis) at which a partial batch becomes due.
            long nextBatch = System.currentTimeMillis() + manager.delay;
            while (!shutdown) {
                long now = System.currentTimeMillis();
                // Re-synchronise the shared counter with the real record count on every pass.
                long dbCount = database.count();
                dbCounter.set(dbCount);
                // Send when a full batch is stored, or when anything is stored and the delay has elapsed.
                if (dbCount >= batchSize || dbCount > 0 && nextBatch <= now) {
                    nextBatch = now + manager.delay;
                    try {
                        boolean errors = false;
                        DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                // Leaves the main loop; sendBatch logs and flags shutdown before rethrowing.
                                break;
                            }
                        } else {
                            // Unbatched mode: send events one at a time, deleting each through the
                            // cursor inside a single transaction.
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            // Events that cannot be decoded are skipped and stay in the database.
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    // Stop iterating; deletions made so far are still committed below.
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        txn = null;
                                        // NOTE(review): decrements by one regardless of how many events were
                                        // deleted; the counter is reset from database.count() on the next pass.
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through to the finally block and retry.
                                    } catch (final Exception ex) {
                                        LOGGER.error("Error reading or writing to database", ex);
                                        // Any other database failure stops the thread.
                                        shutdown = true;
                                        break;
                                    } finally {
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (LockConflictException lce) {
                                    // Reached when the conflict is raised outside the inner try
                                    // (beginTransaction, openCursor, or the inner finally's cleanup).
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                // Pause before the next attempt.
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (InterruptedException ie) {
                                    // Ignored; the next attempt simply starts early.
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update data base", exception);
                            }
                        }
                        if (errors) {
                            // Sending failed: back off for the configured delay before trying again.
                            Thread.sleep(manager.delay);
                            continue;
                        }
                    } catch (final Exception ex) {
                        // Also catches an InterruptedException from the back-off sleep above.
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    if (nextBatch <= now) {
                        nextBatch = now + manager.delay;
                    }
                    try {
                        // Wait until the gate is signalled or the next batch is due.
                        final long interval = nextBatch - now;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            // On the way out, try to flush one last batch when batching is enabled.
            if (batchSize > 1 && database.count() > 0) {
                DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }
656
657 private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
658 boolean errors = false;
659 OperationStatus status;
660 Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
661 try {
662 status = cursor.getFirst(key, data, null);
663
664 final BatchEvent batch = new BatchEvent();
665 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
666 final SimpleEvent event = createEvent(data);
667 if (event != null) {
668 batch.addEvent(event);
669 }
670 status = cursor.getNext(key, data, null);
671 }
672 try {
673 manager.send(batch);
674 } catch (final Exception ioe) {
675 LOGGER.error("Error sending events", ioe);
676 errors = true;
677 }
678 if (!errors) {
679 cursor.close();
680 cursor = null;
681 Transaction txn = null;
682 Exception exception = null;
683 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
684 try {
685 txn = environment.beginTransaction(null, null);
686 try {
687 for (final Event event : batch.getEvents()) {
688 try {
689 final Map<String, String> headers = event.getHeaders();
690 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
691 database.delete(txn, key);
692 } catch (final Exception ex) {
693 LOGGER.error("Error deleting key from database", ex);
694 }
695 }
696 txn.commit();
697 long count = dbCounter.get();
698 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
699 count = dbCounter.get();
700 }
701 exception = null;
702 break;
703 } catch (final LockConflictException lce) {
704 exception = lce;
705 if (cursor != null) {
706 try {
707 cursor.close();
708 cursor = null;
709 } catch (Exception ex) {
710 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
711 }
712 }
713 if (txn != null) {
714 try {
715 txn.abort();
716 txn = null;
717 } catch (Exception ex) {
718 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
719 }
720 }
721 } catch (final Exception ex) {
722 LOGGER.error("Unable to commit transaction", ex);
723 if (txn != null) {
724 txn.abort();
725 }
726 }
727 } catch (LockConflictException lce) {
728 exception = lce;
729 if (cursor != null) {
730 try {
731 cursor.close();
732 cursor = null;
733 } catch (Exception ex) {
734 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
735 }
736 }
737 if (txn != null) {
738 try {
739 txn.abort();
740 txn = null;
741 } catch (Exception ex) {
742 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
743 }
744 }
745 } finally {
746 if (cursor != null) {
747 cursor.close();
748 cursor = null;
749 }
750 if (txn != null) {
751 txn.abort();
752 txn = null;
753 }
754 }
755 try {
756 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
757 } catch (InterruptedException ie) {
758
759 }
760 }
761 if (exception != null) {
762 LOGGER.error("Unable to delete events from data base", exception);
763 }
764 }
765 } catch (final Exception ex) {
766 LOGGER.error("Error reading database", ex);
767 shutdown = true;
768 throw ex;
769 } finally {
770 if (cursor != null) {
771 cursor.close();
772 }
773 }
774
775 return errors;
776 }
777
778 private SimpleEvent createEvent(final DatabaseEntry data) {
779 final SimpleEvent event = new SimpleEvent();
780 try {
781 byte[] eventData = data.getData();
782 if (secretKey != null) {
783 final Cipher cipher = Cipher.getInstance("AES");
784 cipher.init(Cipher.DECRYPT_MODE, secretKey);
785 eventData = cipher.doFinal(eventData);
786 }
787 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
788 final DataInputStream dais = new DataInputStream(bais);
789 int length = dais.readInt();
790 final byte[] bytes = new byte[length];
791 dais.read(bytes, 0, length);
792 event.setBody(bytes);
793 length = dais.readInt();
794 final Map<String, String> map = new HashMap<String, String>(length);
795 for (int i = 0; i < length; ++i) {
796 final String headerKey = dais.readUTF();
797 final String value = dais.readUTF();
798 map.put(headerKey, value);
799 }
800 event.setHeaders(map);
801 return event;
802 } catch (final Exception ex) {
803 LOGGER.error("Error retrieving event", ex);
804 return null;
805 }
806 }
807
808 }
809
810
811
812
813 private static class DaemonThreadFactory implements ThreadFactory {
814 private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
815 private final ThreadGroup group;
816 private final AtomicInteger threadNumber = new AtomicInteger(1);
817 private final String namePrefix;
818
819 public DaemonThreadFactory() {
820 final SecurityManager securityManager = System.getSecurityManager();
821 group = securityManager != null ? securityManager.getThreadGroup() :
822 Thread.currentThread().getThreadGroup();
823 namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
824 }
825
826 @Override
827 public Thread newThread(final Runnable r) {
828 final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
829 thread.setDaemon(true);
830 if (thread.getPriority() != Thread.NORM_PRIORITY) {
831 thread.setPriority(Thread.NORM_PRIORITY);
832 }
833 return thread;
834 }
835 }
836
837
838
839
840 private static class Gate {
841
842 private boolean isOpen = false;
843
844 public boolean isOpen() {
845 return isOpen;
846 }
847
848 public synchronized void open() {
849 isOpen = true;
850 notifyAll();
851 }
852
853 public synchronized void close() {
854 isOpen = false;
855 }
856
857 public synchronized void waitForOpen(long timeout) throws InterruptedException {
858 wait(timeout);
859 }
860 }
861 }