1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ThreadFactory;
22
23 import org.apache.logging.log4j.Logger;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26 import org.apache.logging.log4j.core.util.Constants;
27 import org.apache.logging.log4j.status.StatusLogger;
28
29 import com.lmax.disruptor.EventFactory;
30 import com.lmax.disruptor.EventTranslatorTwoArg;
31 import com.lmax.disruptor.ExceptionHandler;
32 import com.lmax.disruptor.RingBuffer;
33 import com.lmax.disruptor.Sequence;
34 import com.lmax.disruptor.SequenceReportingEventHandler;
35 import com.lmax.disruptor.WaitStrategy;
36 import com.lmax.disruptor.dsl.Disruptor;
37 import com.lmax.disruptor.dsl.ProducerType;
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class AsyncLoggerConfigDisruptor implements AsyncLoggerConfigDelegate {
52
53 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
54 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
55 private static final Logger LOGGER = StatusLogger.getLogger();
56
57
58
59
60 private static class Log4jEventWrapper {
61 private AsyncLoggerConfig loggerConfig;
62 private LogEvent event;
63
64
65
66
67 public void clear() {
68 loggerConfig = null;
69 event = null;
70 }
71 }
72
73
74
75
76 private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
77 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
78 private Sequence sequenceCallback;
79 private int counter;
80
81 @Override
82 public void setSequenceCallback(final Sequence sequenceCallback) {
83 this.sequenceCallback = sequenceCallback;
84 }
85
86 @Override
87 public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
88 throws Exception {
89 event.event.setEndOfBatch(endOfBatch);
90 event.loggerConfig.asyncCallAppenders(event.event);
91 event.clear();
92
93 notifyIntermediateProgress(sequence);
94 }
95
96
97
98
99
100 private void notifyIntermediateProgress(final long sequence) {
101 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
102 sequenceCallback.set(sequence);
103 counter = 0;
104 }
105 }
106 }
107
108
109
110
111
112 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
113 @Override
114 public Log4jEventWrapper newInstance() {
115 return new Log4jEventWrapper();
116 }
117 };
118
119
120
121
122 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
123 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
124
125 @Override
126 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
127 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
128 ringBufferElement.event = logEvent;
129 ringBufferElement.loggerConfig = loggerConfig;
130 }
131 };
132
133 private static final ThreadFactory THREAD_FACTORY = new DaemonThreadFactory("AsyncLoggerConfig-");
134
135 private volatile Disruptor<Log4jEventWrapper> disruptor;
136 private ExecutorService executor;
137 private long backgroundThreadId;
138
139 public AsyncLoggerConfigDisruptor() {
140 }
141
142
143
144
145
146
147
148 public synchronized void start() {
149 if (disruptor != null) {
150 LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor for this configuration, "
151 + "using existing object.");
152 return;
153 }
154 LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor for this configuration.");
155 final int ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
156 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
157 executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
158 backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
159
160 disruptor = new Disruptor<>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
161
162 final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getExceptionHandler(
163 "AsyncLoggerConfig.ExceptionHandler", Log4jEventWrapper.class);
164 disruptor.handleExceptionsWith(errorHandler);
165
166 final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
167 disruptor.handleEventsWith(handlers);
168
169 LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
170 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
171 .getClass().getSimpleName(), errorHandler);
172 disruptor.start();
173 }
174
175
176
177
178
179 public synchronized void stop() {
180 final Disruptor<Log4jEventWrapper> temp = disruptor;
181 if (temp == null) {
182 LOGGER.trace("AsyncLoggerConfigHelper: disruptor for this configuration already shut down.");
183 return;
184 }
185 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor for this configuration.");
186
187
188 disruptor = null;
189
190
191
192
193 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
194 try {
195 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
196 } catch (final InterruptedException e) {
197 }
198 }
199 temp.shutdown();
200
201 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor executor for this configuration.");
202 executor.shutdown();
203 executor = null;
204 }
205
206
207
208
209 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
210 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
211 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
212 }
213
214
215
216
217
218
219
220
221 @Override
222 public boolean tryCallAppendersInBackground(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
223 final Disruptor<Log4jEventWrapper> temp = disruptor;
224 if (!hasLog4jBeenShutDown(temp)) {
225
226
227
228 if (isCalledFromAppenderThreadAndBufferFull(temp)) {
229
230 return false;
231 }
232 enqueueEvent(event, asyncLoggerConfig);
233 }
234 return true;
235 }
236
237
238
239
240 private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
241 if (aDisruptor == null) {
242 LOGGER.fatal("Ignoring log event after log4j was shut down");
243 return true;
244 }
245 return false;
246 }
247
248 private void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
249
250 try {
251 final LogEvent logEvent = prepareEvent(event);
252 enqueue(logEvent, asyncLoggerConfig);
253 } catch (final NullPointerException npe) {
254 LOGGER.fatal("Ignoring log event after log4j was shut down.");
255 }
256 }
257
258 private LogEvent prepareEvent(final LogEvent event) {
259 final LogEvent logEvent = ensureImmutable(event);
260 if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) {
261 logEvent.getMessage().getFormattedMessage();
262 }
263 return logEvent;
264 }
265
266 private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
267
268
269
270 disruptor.getRingBuffer().publishEvent(TRANSLATOR, logEvent, asyncLoggerConfig);
271 }
272
273 private LogEvent ensureImmutable(final LogEvent event) {
274 LogEvent result = event;
275 if (event instanceof RingBufferLogEvent) {
276
277
278
279
280
281
282 result = ((RingBufferLogEvent) event).createMemento();
283 }
284 return result;
285 }
286
287
288
289
290 private boolean isCalledFromAppenderThreadAndBufferFull(Disruptor<Log4jEventWrapper> theDisruptor) {
291 return currentThreadIsAppenderThread() && theDisruptor.getRingBuffer().remainingCapacity() == 0;
292 }
293
294
295
296
297
298
299 private boolean currentThreadIsAppenderThread() {
300 return Thread.currentThread().getId() == backgroundThreadId;
301 }
302
303
304
305
306
307
308
309 @Override
310 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
311 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
312 }
313
314 }