1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.File;
24 import java.nio.charset.Charset;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import javax.crypto.Cipher;
37 import javax.crypto.SecretKey;
38
39 import org.apache.flume.Event;
40 import org.apache.flume.event.SimpleEvent;
41 import org.apache.logging.log4j.LoggingException;
42 import org.apache.logging.log4j.core.appender.ManagerFactory;
43 import org.apache.logging.log4j.core.config.Property;
44 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
45 import org.apache.logging.log4j.core.config.plugins.util.PluginType;
46 import org.apache.logging.log4j.core.util.FileUtils;
47 import org.apache.logging.log4j.core.util.SecretKeyProvider;
48 import org.apache.logging.log4j.util.Strings;
49
50 import com.sleepycat.je.Cursor;
51 import com.sleepycat.je.CursorConfig;
52 import com.sleepycat.je.Database;
53 import com.sleepycat.je.DatabaseConfig;
54 import com.sleepycat.je.DatabaseEntry;
55 import com.sleepycat.je.Environment;
56 import com.sleepycat.je.EnvironmentConfig;
57 import com.sleepycat.je.LockConflictException;
58 import com.sleepycat.je.LockMode;
59 import com.sleepycat.je.OperationStatus;
60 import com.sleepycat.je.StatsConfig;
61 import com.sleepycat.je.Transaction;
62
63
64
65
66 public class FlumePersistentManager extends FlumeAvroManager {
67
68
69 public static final String KEY_PROVIDER = "keyProvider";
70
71 private static final Charset UTF8 = Charset.forName("UTF-8");
72
73 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74
75 private static final int SHUTDOWN_WAIT = 60;
76
77 private static final int MILLIS_PER_SECOND = 1000;
78
79 private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
80
81 private static BDBManagerFactory factory = new BDBManagerFactory();
82
83 private final Database database;
84
85 private final Environment environment;
86
87 private final WriterThread worker;
88
89 private final Gate gate = new Gate();
90
91 private final SecretKey secretKey;
92
93 private final int delay;
94
95 private final int lockTimeoutRetryCount;
96
97 private final ExecutorService threadPool;
98
99 private final AtomicLong dbCount = new AtomicLong();
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
117 final int batchSize, final int retries, final int connectionTimeout,
118 final int requestTimeout, final int delay, final Database database,
119 final Environment environment, final SecretKey secretKey,
120 final int lockTimeoutRetryCount) {
121 super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
122 this.delay = delay;
123 this.database = database;
124 this.environment = environment;
125 dbCount.set(database.count());
126 this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
127 lockTimeoutRetryCount);
128 this.worker.start();
129 this.secretKey = secretKey;
130 this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
131 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public static FlumePersistentManager getManager(final String name, final Agent[] agents,
150 final Property[] properties, int batchSize, final int retries,
151 final int connectionTimeout, final int requestTimeout,
152 final int delay, final int lockTimeoutRetryCount,
153 final String dataDir) {
154 if (agents == null || agents.length == 0) {
155 throw new IllegalArgumentException("At least one agent is required");
156 }
157
158 if (batchSize <= 0) {
159 batchSize = 1;
160 }
161 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
162
163 final StringBuilder sb = new StringBuilder("FlumePersistent[");
164 boolean first = true;
165 for (final Agent agent : agents) {
166 if (!first) {
167 sb.append(',');
168 }
169 sb.append(agent.getHost()).append(':').append(agent.getPort());
170 first = false;
171 }
172 sb.append(']');
173 sb.append(' ').append(dataDirectory);
174 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
175 connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
176 }
177
178 @Override
179 public void send(final Event event) {
180 if (worker.isShutdown()) {
181 throw new LoggingException("Unable to record event");
182 }
183
184 final Map<String, String> headers = event.getHeaders();
185 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
186 try {
187 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
188 final DataOutputStream daos = new DataOutputStream(baos);
189 daos.writeInt(event.getBody().length);
190 daos.write(event.getBody(), 0, event.getBody().length);
191 daos.writeInt(event.getHeaders().size());
192 for (final Map.Entry<String, String> entry : headers.entrySet()) {
193 daos.writeUTF(entry.getKey());
194 daos.writeUTF(entry.getValue());
195 }
196 byte[] eventData = baos.toByteArray();
197 if (secretKey != null) {
198 final Cipher cipher = Cipher.getInstance("AES");
199 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
200 eventData = cipher.doFinal(eventData);
201 }
202 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
203 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
204 boolean interrupted = false;
205 int count = 0;
206 do {
207 try {
208 future.get();
209 } catch (final InterruptedException ie) {
210 interrupted = true;
211 ++count;
212 }
213 } while (interrupted && count <= 1);
214
215 } catch (final Exception ex) {
216 throw new LoggingException("Exception occurred writing log event", ex);
217 }
218 }
219
220 @Override
221 protected void releaseSub() {
222 LOGGER.debug("Shutting down FlumePersistentManager");
223 worker.shutdown();
224 try {
225 worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
226 } catch (final InterruptedException ie) {
227
228 }
229 threadPool.shutdown();
230 try {
231 threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
232 } catch (final InterruptedException ie) {
233 LOGGER.warn("PersistentManager Thread pool failed to shut down");
234 }
235 try {
236 worker.join();
237 } catch (final InterruptedException ex) {
238 LOGGER.debug("Interrupted while waiting for worker to complete");
239 }
240 try {
241 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
242 database.close();
243 } catch (final Exception ex) {
244 LOGGER.warn("Failed to close database", ex);
245 }
246 try {
247 environment.cleanLog();
248 environment.close();
249 } catch (final Exception ex) {
250 LOGGER.warn("Failed to close environment", ex);
251 }
252 super.releaseSub();
253 }
254
255 private void doSend(final SimpleEvent event) {
256 LOGGER.debug("Sending event to Flume");
257 super.send(event);
258 }
259
260
261
262
263 private static class BDBWriter implements Callable<Integer> {
264 private final byte[] eventData;
265 private final byte[] keyData;
266 private final Environment environment;
267 private final Database database;
268 private final Gate gate;
269 private final AtomicLong dbCount;
270 private final long batchSize;
271 private final int lockTimeoutRetryCount;
272
273 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
274 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
275 final int lockTimeoutRetryCount) {
276 this.keyData = keyData;
277 this.eventData = eventData;
278 this.environment = environment;
279 this.database = database;
280 this.gate = gate;
281 this.dbCount = dbCount;
282 this.batchSize = batchSize;
283 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
284 }
285
286 @Override
287 public Integer call() throws Exception {
288 final DatabaseEntry key = new DatabaseEntry(keyData);
289 final DatabaseEntry data = new DatabaseEntry(eventData);
290 Exception exception = null;
291 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
292 Transaction txn = null;
293 try {
294 txn = environment.beginTransaction(null, null);
295 try {
296 database.put(txn, key, data);
297 txn.commit();
298 txn = null;
299 if (dbCount.incrementAndGet() >= batchSize) {
300 gate.open();
301 }
302 exception = null;
303 break;
304 } catch (final LockConflictException lce) {
305 exception = lce;
306
307 } catch (final Exception ex) {
308 if (txn != null) {
309 txn.abort();
310 }
311 throw ex;
312 } finally {
313 if (txn != null) {
314 txn.abort();
315 txn = null;
316 }
317 }
318 } catch (final LockConflictException lce) {
319 exception = lce;
320 if (txn != null) {
321 try {
322 txn.abort();
323 txn = null;
324 } catch (final Exception ex) {
325 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
326 }
327 }
328
329 }
330 try {
331 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
332 } catch (final InterruptedException ie) {
333
334 }
335 }
336 if (exception != null) {
337 throw exception;
338 }
339 return eventData.length;
340 }
341 }
342
343
344
345
346 private static class FactoryData {
347 private final String name;
348 private final Agent[] agents;
349 private final int batchSize;
350 private final String dataDir;
351 private final int retries;
352 private final int connectionTimeout;
353 private final int requestTimeout;
354 private final int delay;
355 private final int lockTimeoutRetryCount;
356 private final Property[] properties;
357
358
359
360
361
362
363
364
365 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
366 final int connectionTimeout, final int requestTimeout, final int delay,
367 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
368 this.name = name;
369 this.agents = agents;
370 this.batchSize = batchSize;
371 this.dataDir = dataDir;
372 this.retries = retries;
373 this.connectionTimeout = connectionTimeout;
374 this.requestTimeout = requestTimeout;
375 this.delay = delay;
376 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
377 this.properties = properties;
378 }
379 }
380
381
382
383
384 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
385
386
387
388
389
390
391
392 @Override
393 public FlumePersistentManager createManager(final String name, final FactoryData data) {
394 SecretKey secretKey = null;
395 Database database = null;
396 Environment environment = null;
397
398 final Map<String, String> properties = new HashMap<String, String>();
399 if (data.properties != null) {
400 for (final Property property : data.properties) {
401 properties.put(property.getName(), property.getValue());
402 }
403 }
404
405 try {
406 final File dir = new File(data.dataDir);
407 FileUtils.mkdir(dir, true);
408 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
409 dbEnvConfig.setTransactional(true);
410 dbEnvConfig.setAllowCreate(true);
411 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
412 environment = new Environment(dir, dbEnvConfig);
413 final DatabaseConfig dbConfig = new DatabaseConfig();
414 dbConfig.setTransactional(true);
415 dbConfig.setAllowCreate(true);
416 database = environment.openDatabase(null, name, dbConfig);
417 } catch (final Exception ex) {
418 LOGGER.error("Could not create FlumePersistentManager", ex);
419
420
421
422 if (database != null) {
423 database.close();
424 database = null;
425 }
426 if (environment != null) {
427 environment.close();
428 environment = null;
429 }
430 return null;
431 }
432
433 try {
434 String key = null;
435 for (final Map.Entry<String, String> entry : properties.entrySet()) {
436 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
437 key = entry.getValue();
438 break;
439 }
440 }
441 if (key != null) {
442 final PluginManager manager = new PluginManager("KeyProvider");
443 manager.collectPlugins();
444 final Map<String, PluginType<?>> plugins = manager.getPlugins();
445 if (plugins != null) {
446 boolean found = false;
447 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
448 if (entry.getKey().equalsIgnoreCase(key)) {
449 found = true;
450 final Class<?> cl = entry.getValue().getPluginClass();
451 try {
452 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
453 secretKey = provider.getSecretKey();
454 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
455 } catch (final Exception ex) {
456 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
457 cl.getName());
458 }
459 break;
460 }
461 }
462 if (!found) {
463 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464 }
465 } else {
466 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
467 }
468 }
469 } catch (final Exception ex) {
470 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
471 }
472 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
473 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
474 data.lockTimeoutRetryCount);
475 }
476 }
477
478
479
480
481 private static class WriterThread extends Thread {
482 private volatile boolean shutdown = false;
483 private final Database database;
484 private final Environment environment;
485 private final FlumePersistentManager manager;
486 private final Gate gate;
487 private final SecretKey secretKey;
488 private final int batchSize;
489 private final AtomicLong dbCounter;
490 private final int lockTimeoutRetryCount;
491
492 public WriterThread(final Database database, final Environment environment,
493 final FlumePersistentManager manager, final Gate gate, final int batchsize,
494 final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
495 this.database = database;
496 this.environment = environment;
497 this.manager = manager;
498 this.gate = gate;
499 this.batchSize = batchsize;
500 this.secretKey = secretKey;
501 this.setDaemon(true);
502 this.dbCounter = dbCount;
503 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
504 }
505
506 public void shutdown() {
507 LOGGER.debug("Writer thread shutting down");
508 this.shutdown = true;
509 gate.open();
510 }
511
512 public boolean isShutdown() {
513 return shutdown;
514 }
515
516 @Override
517 public void run() {
518 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
519 long nextBatch = System.currentTimeMillis() + manager.delay;
520 while (!shutdown) {
521 final long now = System.currentTimeMillis();
522 final long dbCount = database.count();
523 dbCounter.set(dbCount);
524 if (dbCount >= batchSize || dbCount > 0 && nextBatch <= now) {
525 nextBatch = now + manager.delay;
526 try {
527 boolean errors = false;
528 final DatabaseEntry key = new DatabaseEntry();
529 final DatabaseEntry data = new DatabaseEntry();
530
531 gate.close();
532 OperationStatus status;
533 if (batchSize > 1) {
534 try {
535 errors = sendBatch(key, data);
536 } catch (final Exception ex) {
537 break;
538 }
539 } else {
540 Exception exception = null;
541 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
542 exception = null;
543 Transaction txn = null;
544 Cursor cursor = null;
545 try {
546 txn = environment.beginTransaction(null, null);
547 cursor = database.openCursor(txn, null);
548 try {
549 status = cursor.getFirst(key, data, LockMode.RMW);
550 while (status == OperationStatus.SUCCESS) {
551 final SimpleEvent event = createEvent(data);
552 if (event != null) {
553 try {
554 manager.doSend(event);
555 } catch (final Exception ioe) {
556 errors = true;
557 LOGGER.error("Error sending event", ioe);
558 break;
559 }
560 try {
561 cursor.delete();
562 } catch (final Exception ex) {
563 LOGGER.error("Unable to delete event", ex);
564 }
565 }
566 status = cursor.getNext(key, data, LockMode.RMW);
567 }
568 if (cursor != null) {
569 cursor.close();
570 cursor = null;
571 }
572 txn.commit();
573 txn = null;
574 dbCounter.decrementAndGet();
575 exception = null;
576 break;
577 } catch (final LockConflictException lce) {
578 exception = lce;
579
580 } catch (final Exception ex) {
581 LOGGER.error("Error reading or writing to database", ex);
582 shutdown = true;
583 break;
584 } finally {
585 if (cursor != null) {
586 cursor.close();
587 cursor = null;
588 }
589 if (txn != null) {
590 txn.abort();
591 txn = null;
592 }
593 }
594 } catch (final LockConflictException lce) {
595 exception = lce;
596 if (cursor != null) {
597 try {
598 cursor.close();
599 cursor = null;
600 } catch (final Exception ex) {
601 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
602 }
603 }
604 if (txn != null) {
605 try {
606 txn.abort();
607 txn = null;
608 } catch (final Exception ex) {
609 LOGGER.trace("Ignored exception aborting tx during lock conflict.");
610 }
611 }
612 }
613 try {
614 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
615 } catch (final InterruptedException ie) {
616
617 }
618 }
619 if (exception != null) {
620 LOGGER.error("Unable to read or update data base", exception);
621 }
622 }
623 if (errors) {
624 Thread.sleep(manager.delay);
625 continue;
626 }
627 } catch (final Exception ex) {
628 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
629 }
630 } else {
631 if (nextBatch <= now) {
632 nextBatch = now + manager.delay;
633 }
634 try {
635 final long interval = nextBatch - now;
636 gate.waitForOpen(interval);
637 } catch (final InterruptedException ie) {
638 LOGGER.warn("WriterThread interrupted, continuing");
639 } catch (final Exception ex) {
640 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
641 break;
642 }
643 }
644 }
645
646 if (batchSize > 1 && database.count() > 0) {
647 final DatabaseEntry key = new DatabaseEntry();
648 final DatabaseEntry data = new DatabaseEntry();
649 try {
650 sendBatch(key, data);
651 } catch (final Exception ex) {
652 LOGGER.warn("Unable to write final batch");
653 }
654 }
655 LOGGER.trace("WriterThread exiting");
656 }
657
658 private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
659 boolean errors = false;
660 OperationStatus status;
661 Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
662 try {
663 status = cursor.getFirst(key, data, null);
664
665 final BatchEvent batch = new BatchEvent();
666 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
667 final SimpleEvent event = createEvent(data);
668 if (event != null) {
669 batch.addEvent(event);
670 }
671 status = cursor.getNext(key, data, null);
672 }
673 try {
674 manager.send(batch);
675 } catch (final Exception ioe) {
676 LOGGER.error("Error sending events", ioe);
677 errors = true;
678 }
679 if (!errors) {
680 cursor.close();
681 cursor = null;
682 Transaction txn = null;
683 Exception exception = null;
684 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
685 try {
686 txn = environment.beginTransaction(null, null);
687 try {
688 for (final Event event : batch.getEvents()) {
689 try {
690 final Map<String, String> headers = event.getHeaders();
691 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
692 database.delete(txn, key);
693 } catch (final Exception ex) {
694 LOGGER.error("Error deleting key from database", ex);
695 }
696 }
697 txn.commit();
698 long count = dbCounter.get();
699 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
700 count = dbCounter.get();
701 }
702 exception = null;
703 break;
704 } catch (final LockConflictException lce) {
705 exception = lce;
706 if (cursor != null) {
707 try {
708 cursor.close();
709 cursor = null;
710 } catch (final Exception ex) {
711 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
712 }
713 }
714 if (txn != null) {
715 try {
716 txn.abort();
717 txn = null;
718 } catch (final Exception ex) {
719 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
720 }
721 }
722 } catch (final Exception ex) {
723 LOGGER.error("Unable to commit transaction", ex);
724 if (txn != null) {
725 txn.abort();
726 }
727 }
728 } catch (final LockConflictException lce) {
729 exception = lce;
730 if (cursor != null) {
731 try {
732 cursor.close();
733 cursor = null;
734 } catch (final Exception ex) {
735 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
736 }
737 }
738 if (txn != null) {
739 try {
740 txn.abort();
741 txn = null;
742 } catch (final Exception ex) {
743 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
744 }
745 }
746 } finally {
747 if (cursor != null) {
748 cursor.close();
749 cursor = null;
750 }
751 if (txn != null) {
752 txn.abort();
753 txn = null;
754 }
755 }
756 try {
757 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
758 } catch (final InterruptedException ie) {
759
760 }
761 }
762 if (exception != null) {
763 LOGGER.error("Unable to delete events from data base", exception);
764 }
765 }
766 } catch (final Exception ex) {
767 LOGGER.error("Error reading database", ex);
768 shutdown = true;
769 throw ex;
770 } finally {
771 if (cursor != null) {
772 cursor.close();
773 }
774 }
775
776 return errors;
777 }
778
779 private SimpleEvent createEvent(final DatabaseEntry data) {
780 final SimpleEvent event = new SimpleEvent();
781 try {
782 byte[] eventData = data.getData();
783 if (secretKey != null) {
784 final Cipher cipher = Cipher.getInstance("AES");
785 cipher.init(Cipher.DECRYPT_MODE, secretKey);
786 eventData = cipher.doFinal(eventData);
787 }
788 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
789 final DataInputStream dais = new DataInputStream(bais);
790 int length = dais.readInt();
791 final byte[] bytes = new byte[length];
792 dais.read(bytes, 0, length);
793 event.setBody(bytes);
794 length = dais.readInt();
795 final Map<String, String> map = new HashMap<String, String>(length);
796 for (int i = 0; i < length; ++i) {
797 final String headerKey = dais.readUTF();
798 final String value = dais.readUTF();
799 map.put(headerKey, value);
800 }
801 event.setHeaders(map);
802 return event;
803 } catch (final Exception ex) {
804 LOGGER.error("Error retrieving event", ex);
805 return null;
806 }
807 }
808
809 }
810
811
812
813
814 private static class DaemonThreadFactory implements ThreadFactory {
815 private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
816 private final ThreadGroup group;
817 private final AtomicInteger threadNumber = new AtomicInteger(1);
818 private final String namePrefix;
819
820 public DaemonThreadFactory() {
821 final SecurityManager securityManager = System.getSecurityManager();
822 group = securityManager != null ? securityManager.getThreadGroup() :
823 Thread.currentThread().getThreadGroup();
824 namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
825 }
826
827 @Override
828 public Thread newThread(final Runnable r) {
829 final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
830 thread.setDaemon(true);
831 if (thread.getPriority() != Thread.NORM_PRIORITY) {
832 thread.setPriority(Thread.NORM_PRIORITY);
833 }
834 return thread;
835 }
836 }
837
838
839
840
841 private static class Gate {
842
843 private boolean isOpen = false;
844
845 public boolean isOpen() {
846 return isOpen;
847 }
848
849 public synchronized void open() {
850 isOpen = true;
851 notifyAll();
852 }
853
854 public synchronized void close() {
855 isOpen = false;
856 }
857
858 public synchronized void waitForOpen(final long timeout) throws InterruptedException {
859 wait(timeout);
860 }
861 }
862 }