1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.Marker;
25 import org.apache.logging.log4j.ThreadContext;
26 import org.apache.logging.log4j.core.Logger;
27 import org.apache.logging.log4j.core.LoggerContext;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
30 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
31 import org.apache.logging.log4j.core.util.Clock;
32 import org.apache.logging.log4j.core.util.ClockFactory;
33 import org.apache.logging.log4j.core.util.Integers;
34 import org.apache.logging.log4j.core.util.Loader;
35 import org.apache.logging.log4j.message.Message;
36 import org.apache.logging.log4j.message.MessageFactory;
37 import org.apache.logging.log4j.message.TimestampMessage;
38 import org.apache.logging.log4j.status.StatusLogger;
39
40 import com.lmax.disruptor.BlockingWaitStrategy;
41 import com.lmax.disruptor.ExceptionHandler;
42 import com.lmax.disruptor.RingBuffer;
43 import com.lmax.disruptor.SleepingWaitStrategy;
44 import com.lmax.disruptor.WaitStrategy;
45 import com.lmax.disruptor.YieldingWaitStrategy;
46 import com.lmax.disruptor.dsl.Disruptor;
47 import com.lmax.disruptor.dsl.ProducerType;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public class AsyncLogger extends Logger {
78 private static final long serialVersionUID = 1L;
79 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
80 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
81 private static final int RINGBUFFER_MIN_SIZE = 128;
82 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
83 private static final StatusLogger LOGGER = StatusLogger.getLogger();
84 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
85 private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
86
87 static enum ThreadNameStrategy {
88 CACHED {
89 @Override
90 public String getThreadName(final Info info) {
91 return info.cachedThreadName;
92 }
93 },
94 UNCACHED {
95 @Override
96 public String getThreadName(final Info info) {
97 return Thread.currentThread().getName();
98 }
99 };
100 abstract String getThreadName(Info info);
101
102 static ThreadNameStrategy create() {
103 final String name = System.getProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
104 try {
105 return ThreadNameStrategy.valueOf(name);
106 } catch (final Exception ex) {
107 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
108 return CACHED;
109 }
110 }
111 }
112 private static volatile Disruptor<RingBufferLogEvent> disruptor;
113 private static final Clock clock = ClockFactory.getClock();
114
115 private static final ExecutorService executor = Executors
116 .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
117
// Builds, configures and starts the shared disruptor when the class is initialized.
static {
    // Queue the task that marks the executor's thread as the appender thread.
    initInfoForExecutorThread();
    LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);

    final int bufferSize = calculateRingBufferSize();
    final WaitStrategy strategy = createWaitStrategy();
    disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, bufferSize, executor,
            ProducerType.MULTI, strategy);
    disruptor.handleExceptionsWith(getExceptionHandler());
    disruptor.handleEventsWith(new RingBufferLogEventHandler());

    final int actualSize = disruptor.getRingBuffer().getBufferSize();
    LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", actualSize);
    disruptor.start();
}
133
134 private static int calculateRingBufferSize() {
135 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
136 final String userPreferredRBSize = System.getProperty("AsyncLogger.RingBufferSize",
137 String.valueOf(ringBufferSize));
138 try {
139 int size = Integer.parseInt(userPreferredRBSize);
140 if (size < RINGBUFFER_MIN_SIZE) {
141 size = RINGBUFFER_MIN_SIZE;
142 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
143 RINGBUFFER_MIN_SIZE);
144 }
145 ringBufferSize = size;
146 } catch (final Exception ex) {
147 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
148 }
149 return Integers.ceilingNextPowerOfTwo(ringBufferSize);
150 }
151
152
153
154
155
156
157
158
159 private static void initInfoForExecutorThread() {
160 executor.submit(new Runnable(){
161 @Override
162 public void run() {
163 final boolean isAppenderThread = true;
164 final Info info = new Info(new RingBufferLogEventTranslator(),
165 Thread.currentThread().getName(), isAppenderThread);
166 threadlocalInfo.set(info);
167 }
168 });
169 }
170
171 private static WaitStrategy createWaitStrategy() {
172 final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
173 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
174 if ("Sleep".equals(strategy)) {
175 return new SleepingWaitStrategy();
176 } else if ("Yield".equals(strategy)) {
177 return new YieldingWaitStrategy();
178 } else if ("Block".equals(strategy)) {
179 return new BlockingWaitStrategy();
180 }
181 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
182 return new BlockingWaitStrategy();
183 }
184
185 private static ExceptionHandler getExceptionHandler() {
186 final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
187 if (cls == null) {
188 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
189 return null;
190 }
191 try {
192 final ExceptionHandler result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
193 LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
194 return result;
195 } catch (final Exception ignored) {
196 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
197 return null;
198 }
199 }
200
201
202
203
204
205
206
207
208
/**
 * Creates an {@code AsyncLogger}; all arguments are passed unchanged to the {@code Logger} constructor.
 *
 * @param context the logger context
 * @param name the logger name
 * @param messageFactory the message factory
 */
public AsyncLogger(final LoggerContext context,
        final String name,
        final MessageFactory messageFactory) {
    super(context, name, messageFactory);
}
212
213
214
215
216 static class Info {
217 private final RingBufferLogEventTranslator translator;
218 private final String cachedThreadName;
219 private final boolean isAppenderThread;
220 public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
221 this.translator = translator;
222 this.cachedThreadName = threadName;
223 this.isAppenderThread = appenderThread;
224 }
225 }
226
227 @Override
228 public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
229
230 Info info = threadlocalInfo.get();
231 if (info == null) {
232 info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
233 threadlocalInfo.set(info);
234 }
235
236 final Disruptor<RingBufferLogEvent> temp = disruptor;
237 if (temp == null) {
238 LOGGER.fatal("Ignoring log event after log4j was shut down");
239 return;
240 }
241
242
243
244 if (info.isAppenderThread && temp.getRingBuffer().remainingCapacity() == 0) {
245
246 config.loggerConfig.log(getName(), fqcn, marker, level, message, thrown);
247 return;
248 }
249 message.getFormattedMessage();
250 final boolean includeLocation = config.loggerConfig.isIncludeLocation();
251 info.translator.setValues(this, getName(), marker, fqcn, level, message,
252
253 thrown,
254
255
256
257
258
259 ThreadContext.getImmutableContext(),
260
261
262 ThreadContext.getImmutableStack(),
263
264
265
266 THREAD_NAME_STRATEGY.getThreadName(info),
267
268
269
270
271 includeLocation ? location(fqcn) : null,
272
273
274
275
276
277 message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() :
278 clock.currentTimeMillis());
279
280
281 try {
282
283
284
285 disruptor.publishEvent(info.translator);
286 } catch (final NullPointerException npe) {
287 LOGGER.fatal("Ignoring log event after log4j was shut down.");
288 }
289 }
290
291 private static StackTraceElement location(final String fqcnOfLogger) {
292 return Log4jLogEvent.calcLocation(fqcnOfLogger);
293 }
294
295
296
297
298
299
300
301 public void actualAsyncLog(final RingBufferLogEvent event) {
302 final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
303 event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
304 config.logEvent(event);
305 }
306
307 public static void stop() {
308 final Disruptor<RingBufferLogEvent> temp = disruptor;
309
310
311
312 disruptor = null;
313 if (temp == null) {
314 return;
315 }
316
317
318
319
320 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
321 try {
322 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
323 } catch (final InterruptedException e) {
324 }
325 }
326 temp.shutdown();
327 executor.shutdown();
328 threadlocalInfo.remove();
329 }
330
331
332
333
334 private static boolean hasBacklog(final Disruptor<?> disruptor) {
335 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
336 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
337 }
338
339
340
341
342
343
344
345 public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
346 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
347 }
348 }