1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ThreadFactory;
22
23 import org.apache.logging.log4j.Logger;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26 import org.apache.logging.log4j.status.StatusLogger;
27
28 import com.lmax.disruptor.BlockingWaitStrategy;
29 import com.lmax.disruptor.EventFactory;
30 import com.lmax.disruptor.EventHandler;
31 import com.lmax.disruptor.EventTranslatorTwoArg;
32 import com.lmax.disruptor.ExceptionHandler;
33 import com.lmax.disruptor.RingBuffer;
34 import com.lmax.disruptor.Sequence;
35 import com.lmax.disruptor.SequenceReportingEventHandler;
36 import com.lmax.disruptor.SleepingWaitStrategy;
37 import com.lmax.disruptor.WaitStrategy;
38 import com.lmax.disruptor.YieldingWaitStrategy;
39 import com.lmax.disruptor.dsl.Disruptor;
40 import com.lmax.disruptor.dsl.ProducerType;
41 import com.lmax.disruptor.util.Util;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 class AsyncLoggerConfigHelper {
60
61 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
62 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
63 private static final int RINGBUFFER_MIN_SIZE = 128;
64 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
65 private static final Logger LOGGER = StatusLogger.getLogger();
66
67 private static ThreadFactory threadFactory = new DaemonThreadFactory("AsyncLoggerConfig-");
68 private static volatile Disruptor<Log4jEventWrapper> disruptor;
69 private static ExecutorService executor;
70
71 private static volatile int count = 0;
72 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
73
74
75
76
77
78 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
79 @Override
80 public Log4jEventWrapper newInstance() {
81 return new Log4jEventWrapper();
82 }
83 };
84
85
86
87
88 private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator
89 = new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
90
91 @Override
92 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
93 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
94 ringBufferElement.event = logEvent;
95 ringBufferElement.loggerConfig = loggerConfig;
96 }
97 };
98
/** The logger configuration that is stored alongside every event this helper publishes. */
private final AsyncLoggerConfig asyncLoggerConfig;

/**
 * Creates a helper for the given logger configuration and claims a reference on the shared disruptor,
 * starting it if it is not running yet.
 *
 * @param asyncLoggerConfig the configuration whose appenders are eventually called for published events
 */
public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
    AsyncLoggerConfigHelper.claim();
    this.asyncLoggerConfig = asyncLoggerConfig;
}
105
106 private static synchronized void initDisruptor() {
107 if (disruptor != null) {
108 LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
109 return;
110 }
111 LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
112 final int ringBufferSize = calculateRingBufferSize();
113 final WaitStrategy waitStrategy = createWaitStrategy();
114 executor = Executors.newSingleThreadExecutor(threadFactory);
115 initThreadLocalForExecutorThread();
116 disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize,
117 executor, ProducerType.MULTI, waitStrategy);
118 final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {
119 new Log4jEventWrapperHandler() };
120 final ExceptionHandler errorHandler = getExceptionHandler();
121 disruptor.handleExceptionsWith(errorHandler);
122 disruptor.handleEventsWith(handlers);
123
124 LOGGER.debug(
125 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
126 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
127 disruptor.start();
128 }
129
130 private static WaitStrategy createWaitStrategy() {
131 final String strategy = System
132 .getProperty("AsyncLoggerConfig.WaitStrategy");
133 LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
134 if ("Sleep".equals(strategy)) {
135 return new SleepingWaitStrategy();
136 } else if ("Yield".equals(strategy)) {
137 return new YieldingWaitStrategy();
138 } else if ("Block".equals(strategy)) {
139 return new BlockingWaitStrategy();
140 }
141 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
142 return new BlockingWaitStrategy();
143 }
144
145 private static int calculateRingBufferSize() {
146 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
147 final String userPreferredRBSize = System.getProperty(
148 "AsyncLoggerConfig.RingBufferSize",
149 String.valueOf(ringBufferSize));
150 try {
151 int size = Integer.parseInt(userPreferredRBSize);
152 if (size < RINGBUFFER_MIN_SIZE) {
153 size = RINGBUFFER_MIN_SIZE;
154 LOGGER.warn(
155 "Invalid RingBufferSize {}, using minimum size {}.",
156 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
157 }
158 ringBufferSize = size;
159 } catch (final Exception ex) {
160 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
161 userPreferredRBSize, ringBufferSize);
162 }
163 return Util.ceilingNextPowerOfTwo(ringBufferSize);
164 }
165
166 private static ExceptionHandler getExceptionHandler() {
167 final String cls = System
168 .getProperty("AsyncLoggerConfig.ExceptionHandler");
169 if (cls == null) {
170 return null;
171 }
172 try {
173 @SuppressWarnings("unchecked")
174 final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
175 .forName(cls);
176 final ExceptionHandler result = klass.newInstance();
177 return result;
178 } catch (final Exception ignored) {
179 LOGGER.debug(
180 "AsyncLoggerConfig.ExceptionHandler not set: error creating "
181 + cls + ": ", ignored);
182 return null;
183 }
184 }
185
186
187
188
189
190 private static class Log4jEventWrapper {
191 private AsyncLoggerConfig loggerConfig;
192 private LogEvent event;
193
194
195
196
197
198 public void clear() {
199 loggerConfig = null;
200 event = null;
201 }
202 }
203
204
205
206
207 private static class Log4jEventWrapperHandler implements
208 SequenceReportingEventHandler<Log4jEventWrapper> {
209 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
210 private Sequence sequenceCallback;
211 private int counter;
212
213 @Override
214 public void setSequenceCallback(final Sequence sequenceCallback) {
215 this.sequenceCallback = sequenceCallback;
216 }
217
218 @Override
219 public void onEvent(final Log4jEventWrapper event, final long sequence,
220 final boolean endOfBatch) throws Exception {
221 event.event.setEndOfBatch(endOfBatch);
222 event.loggerConfig.asyncCallAppenders(event.event);
223 event.clear();
224
225
226
227
228 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
229 sequenceCallback.set(sequence);
230 counter = 0;
231 }
232 }
233 }
234
235
236
237
238
239
240
241 synchronized static void claim() {
242 count++;
243 initDisruptor();
244 }
245
246
247
248
249
250
251 synchronized static void release() {
252 if (--count > 0) {
253 LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
254 return;
255 }
256 final Disruptor<Log4jEventWrapper> temp = disruptor;
257 if (temp == null) {
258 LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}. (Resetting to zero.)",
259 count);
260 count = 0;
261 return;
262 }
263 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
264
265
266
267 disruptor = null;
268
269
270
271
272 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
273 try {
274 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
275 } catch (final InterruptedException e) {
276 }
277 }
278 temp.shutdown();
279 executor.shutdown();
280 executor = null;
281 }
282
283
284
285
286 private static boolean hasBacklog(final Disruptor<?> disruptor) {
287 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
288 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
289 }
290
291
292
293
294
295
296 private static void initThreadLocalForExecutorThread() {
297 executor.submit(new Runnable() {
298 @Override
299 public void run() {
300 isAppenderThread.set(Boolean.TRUE);
301 }
302 });
303 }
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318 public boolean callAppendersFromAnotherThread(final LogEvent event) {
319
320 final Disruptor<Log4jEventWrapper> temp = disruptor;
321 if (temp == null) {
322 LOGGER.fatal("Ignoring log event after log4j was shut down");
323 return true;
324 }
325
326
327
328 if (isAppenderThread.get() == Boolean.TRUE
329 && temp.getRingBuffer().remainingCapacity() == 0) {
330
331
332 return false;
333 }
334
335 try {
336 LogEvent logEvent = event;
337 if (event instanceof RingBufferLogEvent) {
338 logEvent = ((RingBufferLogEvent) event).createMemento();
339 }
340 logEvent.getMessage().getFormattedMessage();
341
342
343
344
345 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
346 } catch (final NullPointerException npe) {
347 LOGGER.fatal("Ignoring log event after log4j was shut down.");
348 }
349 return true;
350 }
351
352
353
354
355
356
357
358
359 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
360 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
361 }
362
363 }