1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.Objects;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23
24 import org.apache.logging.log4j.Level;
25 import org.apache.logging.log4j.Marker;
26 import org.apache.logging.log4j.ThreadContext;
27 import org.apache.logging.log4j.core.Logger;
28 import org.apache.logging.log4j.core.LoggerContext;
29 import org.apache.logging.log4j.core.config.Property;
30 import org.apache.logging.log4j.core.config.ReliabilityStrategy;
31 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
33 import org.apache.logging.log4j.core.util.Clock;
34 import org.apache.logging.log4j.core.util.ClockFactory;
35 import org.apache.logging.log4j.core.util.DummyNanoClock;
36 import org.apache.logging.log4j.core.util.Integers;
37 import org.apache.logging.log4j.core.util.Loader;
38 import org.apache.logging.log4j.core.util.NanoClock;
39 import org.apache.logging.log4j.message.Message;
40 import org.apache.logging.log4j.message.MessageFactory;
41 import org.apache.logging.log4j.message.TimestampMessage;
42 import org.apache.logging.log4j.status.StatusLogger;
43 import org.apache.logging.log4j.util.PropertiesUtil;
44
45 import com.lmax.disruptor.BlockingWaitStrategy;
46 import com.lmax.disruptor.ExceptionHandler;
47 import com.lmax.disruptor.RingBuffer;
48 import com.lmax.disruptor.SleepingWaitStrategy;
49 import com.lmax.disruptor.WaitStrategy;
50 import com.lmax.disruptor.YieldingWaitStrategy;
51 import com.lmax.disruptor.dsl.Disruptor;
52 import com.lmax.disruptor.dsl.ProducerType;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public class AsyncLogger extends Logger {
74 private static final long serialVersionUID = 1L;
75 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
76 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
77 private static final int RINGBUFFER_MIN_SIZE = 128;
78 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
79 private static final StatusLogger LOGGER = StatusLogger.getLogger();
80 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
81
82
83
84
85 static enum ThreadNameStrategy {
86 CACHED {
87 @Override
88 public String getThreadName(final Info info) {
89 return info.cachedThreadName;
90 }
91 },
92 UNCACHED {
93 @Override
94 public String getThreadName(final Info info) {
95 return Thread.currentThread().getName();
96 }
97 };
98 abstract String getThreadName(Info info);
99
100 static ThreadNameStrategy create() {
101 final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy",
102 CACHED.name());
103 try {
104 return ThreadNameStrategy.valueOf(name);
105 } catch (final Exception ex) {
106 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
107 return CACHED;
108 }
109 }
110 }
111
112 private static volatile Disruptor<RingBufferLogEvent> disruptor;
113 private static final Clock CLOCK = ClockFactory.getClock();
114 private static volatile NanoClock nanoClock = new DummyNanoClock();
115
116 private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(new DaemonThreadFactory(
117 "AsyncLogger-"));
118
// Class initialization: prepare the executor thread's Info, then build, wire and start the disruptor.
static {
    // Submitted first so it is the first task queued on the single-threaded executor.
    initInfoForExecutorThread();
    LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);

    final int bufferSize = calculateRingBufferSize();
    final WaitStrategy strategy = createWaitStrategy();

    disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, bufferSize, EXECUTOR, ProducerType.MULTI, strategy);
    disruptor.handleExceptionsWith(getExceptionHandler());
    disruptor.handleEventsWith(new RingBufferLogEventHandler());

    final int effectiveSize = disruptor.getRingBuffer().getBufferSize();
    LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", effectiveSize);
    disruptor.start();
}
134
135
136
137
138
139
140
141
/**
 * Creates an {@code AsyncLogger}; all arguments are passed unchanged to the {@code Logger} constructor.
 *
 * @param context the logger context this logger belongs to
 * @param name the logger name
 * @param messageFactory the message factory used by this logger
 */
public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
    super(context, name, messageFactory);
}
145
146 private static int calculateRingBufferSize() {
147 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
148 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
149 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
150 try {
151 int size = Integer.parseInt(userPreferredRBSize);
152 if (size < RINGBUFFER_MIN_SIZE) {
153 size = RINGBUFFER_MIN_SIZE;
154 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
155 RINGBUFFER_MIN_SIZE);
156 }
157 ringBufferSize = size;
158 } catch (final Exception ex) {
159 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
160 }
161 return Integers.ceilingNextPowerOfTwo(ringBufferSize);
162 }
163
164
165
166
167
168
169
170 private static void initInfoForExecutorThread() {
171 EXECUTOR.submit(new Runnable() {
172 @Override
173 public void run() {
174 final boolean isAppenderThread = true;
175 final Info info = new Info(new RingBufferLogEventTranslator(),
176 Thread.currentThread().getName(), isAppenderThread);
177 Info.THREADLOCAL.set(info);
178 }
179 });
180 }
181
182 private static WaitStrategy createWaitStrategy() {
183 final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
184 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
185 if ("Sleep".equals(strategy)) {
186 return new SleepingWaitStrategy();
187 } else if ("Yield".equals(strategy)) {
188 return new YieldingWaitStrategy();
189 } else if ("Block".equals(strategy)) {
190 return new BlockingWaitStrategy();
191 }
192 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
193 return new BlockingWaitStrategy();
194 }
195
196 private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
197 final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
198 if (cls == null) {
199 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
200 return null;
201 }
202 try {
203 @SuppressWarnings("unchecked")
204 final ExceptionHandler<RingBufferLogEvent> result = Loader
205 .newCheckedInstanceOf(cls, ExceptionHandler.class);
206 LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
207 return result;
208 } catch (final Exception ignored) {
209 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
210 return null;
211 }
212 }
213
214
215
216
217 static class Info {
218 private static final ThreadLocal<Info> THREADLOCAL = new ThreadLocal<Info>() {
219 @Override
220 protected Info initialValue() {
221
222 return new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
223 }
224 };
225 private final RingBufferLogEventTranslator translator;
226 private final String cachedThreadName;
227 private final boolean isAppenderThread;
228
229 public Info(final RingBufferLogEventTranslator translator, final String threadName,
230 final boolean appenderThread) {
231 this.translator = translator;
232 this.cachedThreadName = threadName;
233 this.isAppenderThread = appenderThread;
234 }
235
236
237 private String threadName() {
238 return THREAD_NAME_STRATEGY.getThreadName(this);
239 }
240 }
241
242 @Override
243 public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
244 final Throwable thrown) {
245
246 final Disruptor<RingBufferLogEvent> temp = disruptor;
247 if (temp == null) {
248 LOGGER.fatal("Ignoring log event after log4j was shut down");
249 } else {
250 logMessage0(temp, fqcn, level, marker, message, thrown);
251 }
252 }
253
254 private void logMessage0(final Disruptor<RingBufferLogEvent> theDisruptor, final String fqcn, final Level level,
255 final Marker marker, final Message message, final Throwable thrown) {
256 final Info info = Info.THREADLOCAL.get();
257 logMessageInAppropriateThread(info, theDisruptor, fqcn, level, marker, message, thrown);
258 }
259
260 private void logMessageInAppropriateThread(final Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
261 final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
262 if (!logMessageInCurrentThread(info, theDisruptor, fqcn, level, marker, message, thrown)) {
263 logMessageInBackgroundThread(info, fqcn, level, marker, message, thrown);
264 }
265 }
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281 private boolean logMessageInCurrentThread(Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
282 final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
283 if (info.isAppenderThread && theDisruptor.getRingBuffer().remainingCapacity() == 0) {
284
285 final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
286 strategy.log(this, getName(), fqcn, marker, level, message, thrown);
287 return true;
288 }
289 return false;
290 }
291
292
293
294
295
296
297
298
299
300
301
302 private void logMessageInBackgroundThread(Info info, final String fqcn, final Level level, final Marker marker,
303 final Message message, final Throwable thrown) {
304
305 message.getFormattedMessage();
306
307 initLogMessageInfo(info, fqcn, level, marker, message, thrown);
308 enqueueLogMessageInfo(info);
309 }
310
311 private void initLogMessageInfo(Info info, final String fqcn, final Level level, final Marker marker,
312 final Message message, final Throwable thrown) {
313 info.translator.setValues(this, getName(), marker, fqcn, level, message,
314
315 thrown,
316
317
318
319
320
321 ThreadContext.getImmutableContext(),
322
323
324 ThreadContext.getImmutableStack(),
325
326
327
328 info.threadName(),
329
330
331
332
333 calcLocationIfRequested(fqcn),
334
335
336
337
338
339 eventTimeMillis(message),
340 nanoClock.nanoTime()
341 );
342 }
343
344 private long eventTimeMillis(final Message message) {
345 return message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() : CLOCK
346 .currentTimeMillis();
347 }
348
349
350
351
352
353
354
355 private StackTraceElement calcLocationIfRequested(String fqcn) {
356 final boolean includeLocation = privateConfig.loggerConfig.isIncludeLocation();
357 return includeLocation ? location(fqcn) : null;
358 }
359
360 private void enqueueLogMessageInfo(Info info) {
361
362 try {
363
364
365
366 disruptor.publishEvent(info.translator);
367 } catch (final NullPointerException npe) {
368 LOGGER.fatal("Ignoring log event after log4j was shut down.");
369 }
370 }
371
372 private static StackTraceElement location(final String fqcnOfLogger) {
373 return Log4jLogEvent.calcLocation(fqcnOfLogger);
374 }
375
376
377
378
379
380
381 public void actualAsyncLog(final RingBufferLogEvent event) {
382 final Map<Property, Boolean> properties = privateConfig.loggerConfig.getProperties();
383 event.mergePropertiesIntoContextMap(properties, privateConfig.config.getStrSubstitutor());
384 final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
385 strategy.log(this, event);
386 }
387
388 public static void stop() {
389 final Disruptor<RingBufferLogEvent> temp = disruptor;
390
391
392
393 disruptor = null;
394 if (temp == null) {
395 return;
396 }
397
398
399
400
401 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
402 try {
403 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
404 } catch (final InterruptedException e) {
405 }
406 }
407 temp.shutdown();
408 EXECUTOR.shutdown();
409 Info.THREADLOCAL.remove();
410 }
411
412
413
414
415 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
416 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
417 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
418 }
419
420
421
422
423
424
425
426 public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
427 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
428 }
429
430
431
432
433
434
435 public static NanoClock getNanoClock() {
436 return nanoClock;
437 }
438
439
440
441
442
443
444
445
446
447 public static void setNanoClock(NanoClock nanoClock) {
448 AsyncLogger.nanoClock = Objects.requireNonNull(nanoClock, "NanoClock must be non-null");
449 }
450 }