1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ThreadFactory;
22
23 import org.apache.logging.log4j.Logger;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26 import org.apache.logging.log4j.status.StatusLogger;
27
28 import com.lmax.disruptor.BlockingWaitStrategy;
29 import com.lmax.disruptor.EventFactory;
30 import com.lmax.disruptor.EventHandler;
31 import com.lmax.disruptor.EventTranslatorTwoArg;
32 import com.lmax.disruptor.ExceptionHandler;
33 import com.lmax.disruptor.RingBuffer;
34 import com.lmax.disruptor.Sequence;
35 import com.lmax.disruptor.SequenceReportingEventHandler;
36 import com.lmax.disruptor.SleepingWaitStrategy;
37 import com.lmax.disruptor.WaitStrategy;
38 import com.lmax.disruptor.YieldingWaitStrategy;
39 import com.lmax.disruptor.dsl.Disruptor;
40 import com.lmax.disruptor.dsl.ProducerType;
41 import com.lmax.disruptor.util.Util;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 class AsyncLoggerConfigHelper {
60
61 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
62 private static final int HALF_A_SECOND = 500;
63 private static final int RINGBUFFER_MIN_SIZE = 128;
64 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
65 private static final Logger LOGGER = StatusLogger.getLogger();
66
67 private static ThreadFactory threadFactory = new DaemonThreadFactory(
68 "AsyncLoggerConfig-");
69 private static volatile Disruptor<Log4jEventWrapper> disruptor;
70 private static ExecutorService executor;
71
72 private static volatile int count = 0;
73 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
74
75
76
77
78
79 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
80 @Override
81 public Log4jEventWrapper newInstance() {
82 return new Log4jEventWrapper();
83 }
84 };
85
86
87
88
89 private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator
90 = new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
91
92 @Override
93 public void translateTo(Log4jEventWrapper ringBufferElement, long sequence,
94 LogEvent logEvent, AsyncLoggerConfig loggerConfig) {
95 ringBufferElement.event = logEvent;
96 ringBufferElement.loggerConfig = loggerConfig;
97 }
98 };
99
100 private final AsyncLoggerConfig asyncLoggerConfig;
101
102 public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
103 this.asyncLoggerConfig = asyncLoggerConfig;
104 claim();
105 }
106
107 private static synchronized void initDisruptor() {
108 if (disruptor != null) {
109 LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
110 return;
111 }
112 LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
113 final int ringBufferSize = calculateRingBufferSize();
114 final WaitStrategy waitStrategy = createWaitStrategy();
115 executor = Executors.newSingleThreadExecutor(threadFactory);
116 initThreadLocalForExecutorThread();
117 disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize,
118 executor, ProducerType.MULTI, waitStrategy);
119 final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {
120 new Log4jEventWrapperHandler() };
121 final ExceptionHandler errorHandler = getExceptionHandler();
122 disruptor.handleExceptionsWith(errorHandler);
123 disruptor.handleEventsWith(handlers);
124
125 LOGGER.debug(
126 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
127 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
128 disruptor.start();
129 }
130
131 private static WaitStrategy createWaitStrategy() {
132 final String strategy = System
133 .getProperty("AsyncLoggerConfig.WaitStrategy");
134 LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
135 if ("Sleep".equals(strategy)) {
136 return new SleepingWaitStrategy();
137 } else if ("Yield".equals(strategy)) {
138 return new YieldingWaitStrategy();
139 } else if ("Block".equals(strategy)) {
140 return new BlockingWaitStrategy();
141 }
142 return new SleepingWaitStrategy();
143 }
144
145 private static int calculateRingBufferSize() {
146 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
147 final String userPreferredRBSize = System.getProperty(
148 "AsyncLoggerConfig.RingBufferSize",
149 String.valueOf(ringBufferSize));
150 try {
151 int size = Integer.parseInt(userPreferredRBSize);
152 if (size < RINGBUFFER_MIN_SIZE) {
153 size = RINGBUFFER_MIN_SIZE;
154 LOGGER.warn(
155 "Invalid RingBufferSize {}, using minimum size {}.",
156 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
157 }
158 ringBufferSize = size;
159 } catch (final Exception ex) {
160 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
161 userPreferredRBSize, ringBufferSize);
162 }
163 return Util.ceilingNextPowerOfTwo(ringBufferSize);
164 }
165
166 private static ExceptionHandler getExceptionHandler() {
167 final String cls = System
168 .getProperty("AsyncLoggerConfig.ExceptionHandler");
169 if (cls == null) {
170 return null;
171 }
172 try {
173 @SuppressWarnings("unchecked")
174 final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
175 .forName(cls);
176 final ExceptionHandler result = klass.newInstance();
177 return result;
178 } catch (final Exception ignored) {
179 LOGGER.debug(
180 "AsyncLoggerConfig.ExceptionHandler not set: error creating "
181 + cls + ": ", ignored);
182 return null;
183 }
184 }
185
186
187
188
189
190 private static class Log4jEventWrapper {
191 private AsyncLoggerConfig loggerConfig;
192 private LogEvent event;
193
194
195
196
197
198 public void clear() {
199 loggerConfig = null;
200 event = null;
201 }
202 }
203
204
205
206
207 private static class Log4jEventWrapperHandler implements
208 SequenceReportingEventHandler<Log4jEventWrapper> {
209 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
210 private Sequence sequenceCallback;
211 private int counter;
212
213 @Override
214 public void setSequenceCallback(final Sequence sequenceCallback) {
215 this.sequenceCallback = sequenceCallback;
216 }
217
218 @Override
219 public void onEvent(final Log4jEventWrapper event, final long sequence,
220 final boolean endOfBatch) throws Exception {
221 event.event.setEndOfBatch(endOfBatch);
222 event.loggerConfig.asyncCallAppenders(event.event);
223 event.clear();
224
225
226
227
228 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
229 sequenceCallback.set(sequence);
230 counter = 0;
231 }
232 }
233 }
234
235
236
237
238
239
240
241 synchronized static void claim() {
242 count++;
243 initDisruptor();
244 }
245
246
247
248
249
250
251 synchronized static void release() {
252 if (--count > 0) {
253 LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
254 return;
255 }
256 final Disruptor<Log4jEventWrapper> temp = disruptor;
257 if (temp == null) {
258 LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}.", count);
259 return;
260 }
261 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
262
263
264
265 disruptor = null;
266 temp.shutdown();
267
268
269 final RingBuffer<Log4jEventWrapper> ringBuffer = temp.getRingBuffer();
270 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
271 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
272 break;
273 }
274 try {
275
276 Thread.sleep(HALF_A_SECOND);
277 } catch (final InterruptedException e) {
278
279 }
280 }
281 executor.shutdown();
282 executor = null;
283 }
284
285
286
287
288
289
290 private static void initThreadLocalForExecutorThread() {
291 executor.submit(new Runnable(){
292 @Override
293 public void run() {
294 isAppenderThread.set(Boolean.TRUE);
295 }
296 });
297 }
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312 public boolean callAppendersFromAnotherThread(final LogEvent event) {
313
314
315
316 if (isAppenderThread.get() == Boolean.TRUE
317 && disruptor.getRingBuffer().remainingCapacity() == 0) {
318
319
320 return false;
321 }
322 disruptor.getRingBuffer().publishEvent(translator, event, asyncLoggerConfig);
323 return true;
324 }
325
326
327
328
329
330
331
332
333 public RingBufferAdmin createRingBufferAdmin(String contextName, String loggerConfigName) {
334 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
335 }
336
337 }