1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ThreadFactory;
22
23 import org.apache.logging.log4j.Logger;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26 import org.apache.logging.log4j.core.util.Integers;
27 import org.apache.logging.log4j.status.StatusLogger;
28 import org.apache.logging.log4j.util.PropertiesUtil;
29
30 import com.lmax.disruptor.BlockingWaitStrategy;
31 import com.lmax.disruptor.EventFactory;
32 import com.lmax.disruptor.EventHandler;
33 import com.lmax.disruptor.EventTranslatorTwoArg;
34 import com.lmax.disruptor.ExceptionHandler;
35 import com.lmax.disruptor.RingBuffer;
36 import com.lmax.disruptor.Sequence;
37 import com.lmax.disruptor.SequenceReportingEventHandler;
38 import com.lmax.disruptor.SleepingWaitStrategy;
39 import com.lmax.disruptor.WaitStrategy;
40 import com.lmax.disruptor.YieldingWaitStrategy;
41 import com.lmax.disruptor.dsl.Disruptor;
42 import com.lmax.disruptor.dsl.ProducerType;
43
44
45
46
47
48
49
50
51
52
53
54
55
56 class AsyncLoggerConfigHelper {
57
58 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
59 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
60 private static final int RINGBUFFER_MIN_SIZE = 128;
61 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
62 private static final Logger LOGGER = StatusLogger.getLogger();
63
64 private static ThreadFactory threadFactory = new DaemonThreadFactory("AsyncLoggerConfig-");
65 private static volatile Disruptor<Log4jEventWrapper> disruptor;
66 private static ExecutorService executor;
67
68 private static volatile int count = 0;
69 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
70
71
72
73
74
75 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
76 @Override
77 public Log4jEventWrapper newInstance() {
78 return new Log4jEventWrapper();
79 }
80 };
81
82
83
84
85 private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator =
86 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
87
88 @Override
89 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
90 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
91 ringBufferElement.event = logEvent;
92 ringBufferElement.loggerConfig = loggerConfig;
93 }
94 };
95
96 private final AsyncLoggerConfig asyncLoggerConfig;
97
98 public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
99 this.asyncLoggerConfig = asyncLoggerConfig;
100 claim();
101 }
102
103 private static synchronized void initDisruptor() {
104 if (disruptor != null) {
105 LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.",
106 count);
107 return;
108 }
109 LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
110 final int ringBufferSize = calculateRingBufferSize();
111 final WaitStrategy waitStrategy = createWaitStrategy();
112 executor = Executors.newSingleThreadExecutor(threadFactory);
113 initThreadLocalForExecutorThread();
114 disruptor = new Disruptor<>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
115 final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {
116 new Log4jEventWrapperHandler()};
117 final ExceptionHandler<Log4jEventWrapper> errorHandler = getExceptionHandler();
118 disruptor.handleExceptionsWith(errorHandler);
119 disruptor.handleEventsWith(handlers);
120
121 LOGGER.debug(
122 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
123 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
124 disruptor.start();
125 }
126
127 private static WaitStrategy createWaitStrategy() {
128 final String strategy = System.getProperty("AsyncLoggerConfig.WaitStrategy");
129 LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
130 if ("Sleep".equals(strategy)) {
131 return new SleepingWaitStrategy();
132 } else if ("Yield".equals(strategy)) {
133 return new YieldingWaitStrategy();
134 } else if ("Block".equals(strategy)) {
135 return new BlockingWaitStrategy();
136 }
137 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
138 return new BlockingWaitStrategy();
139 }
140
141 private static int calculateRingBufferSize() {
142 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
143 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
144 "AsyncLoggerConfig.RingBufferSize", String.valueOf(ringBufferSize));
145 try {
146 int size = Integer.parseInt(userPreferredRBSize);
147 if (size < RINGBUFFER_MIN_SIZE) {
148 size = RINGBUFFER_MIN_SIZE;
149 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
150 RINGBUFFER_MIN_SIZE);
151 }
152 ringBufferSize = size;
153 } catch (final Exception ex) {
154 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
155 }
156 return Integers.ceilingNextPowerOfTwo(ringBufferSize);
157 }
158
159 private static ExceptionHandler<Log4jEventWrapper> getExceptionHandler() {
160 final String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
161 if (cls == null) {
162 return null;
163 }
164 try {
165 @SuppressWarnings("unchecked")
166 final Class<? extends ExceptionHandler<Log4jEventWrapper>> klass =
167 (Class<? extends ExceptionHandler<Log4jEventWrapper>>) Class.forName(cls);
168 return klass.newInstance();
169 } catch (final Exception ignored) {
170 LOGGER.debug("AsyncLoggerConfig.ExceptionHandler not set: error creating " + cls + ": ", ignored);
171 return null;
172 }
173 }
174
175
176
177
178 private static class Log4jEventWrapper {
179 private AsyncLoggerConfig loggerConfig;
180 private LogEvent event;
181
182
183
184
185 public void clear() {
186 loggerConfig = null;
187 event = null;
188 }
189 }
190
191
192
193
194 private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
195 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
196 private Sequence sequenceCallback;
197 private int counter;
198
199 @Override
200 public void setSequenceCallback(final Sequence sequenceCallback) {
201 this.sequenceCallback = sequenceCallback;
202 }
203
204 @Override
205 public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
206 throws Exception {
207 event.event.setEndOfBatch(endOfBatch);
208 event.loggerConfig.asyncCallAppenders(event.event);
209 event.clear();
210
211 notifyIntermediateProgress(sequence);
212 }
213
214
215
216
217
218 private void notifyIntermediateProgress(final long sequence) {
219 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
220 sequenceCallback.set(sequence);
221 counter = 0;
222 }
223 }
224 }
225
226
227
228
229
230
231
232 static synchronized void claim() {
233 count++;
234 initDisruptor();
235 }
236
237
238
239
240
241 static synchronized void release() {
242 if (--count > 0) {
243 LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
244 return;
245 }
246 final Disruptor<Log4jEventWrapper> temp = disruptor;
247 if (temp == null) {
248 LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}. (Resetting to zero.)",
249 count);
250 count = 0;
251 return;
252 }
253 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
254
255
256
257 disruptor = null;
258
259
260
261
262 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
263 try {
264 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
265 } catch (final InterruptedException e) {
266 }
267 }
268 temp.shutdown();
269 executor.shutdown();
270 executor = null;
271 }
272
273
274
275
276 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
277 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
278 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
279 }
280
281
282
283
284
285 private static void initThreadLocalForExecutorThread() {
286 executor.submit(new Runnable() {
287 @Override
288 public void run() {
289 isAppenderThread.set(Boolean.TRUE);
290 }
291 });
292 }
293
294
295
296
297
298
299
300
301
302
303
304
305 public boolean callAppendersFromAnotherThread(final LogEvent event) {
306 final Disruptor<Log4jEventWrapper> temp = disruptor;
307 if (!hasLog4jBeenShutDown(temp)) {
308
309
310
311 if (isCalledFromAppenderThreadAndBufferFull(temp)) {
312
313 return false;
314 }
315 enqueueEvent(event);
316 }
317 return true;
318 }
319
320
321
322
323 private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
324 if (aDisruptor == null) {
325 LOGGER.fatal("Ignoring log event after log4j was shut down");
326 return true;
327 }
328 return false;
329 }
330
331 private void enqueueEvent(final LogEvent event) {
332
333 try {
334 final LogEvent logEvent = prepareEvent(event);
335 enqueue(logEvent);
336 } catch (final NullPointerException npe) {
337 LOGGER.fatal("Ignoring log event after log4j was shut down.");
338 }
339 }
340
341 private LogEvent prepareEvent(final LogEvent event) {
342 final LogEvent logEvent = ensureImmutable(event);
343 logEvent.getMessage().getFormattedMessage();
344 return logEvent;
345 }
346
347 private void enqueue(LogEvent logEvent) {
348
349
350
351 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
352 }
353
354 private LogEvent ensureImmutable(final LogEvent event) {
355 LogEvent result = event;
356 if (event instanceof RingBufferLogEvent) {
357
358
359
360
361
362
363 result = ((RingBufferLogEvent) event).createMemento();
364 }
365 return result;
366 }
367
368
369
370
371 private boolean isCalledFromAppenderThreadAndBufferFull(Disruptor<Log4jEventWrapper> theDisruptor) {
372 return isAppenderThread.get() == Boolean.TRUE && theDisruptor.getRingBuffer().remainingCapacity() == 0;
373 }
374
375
376
377
378
379
380
381
382 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
383 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
384 }
385
386 }