1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.Marker;
25 import org.apache.logging.log4j.ThreadContext;
26 import org.apache.logging.log4j.core.Logger;
27 import org.apache.logging.log4j.core.LoggerContext;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
30 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
31 import org.apache.logging.log4j.core.util.Clock;
32 import org.apache.logging.log4j.core.util.ClockFactory;
33 import org.apache.logging.log4j.core.util.Loader;
34 import org.apache.logging.log4j.message.Message;
35 import org.apache.logging.log4j.message.MessageFactory;
36 import org.apache.logging.log4j.status.StatusLogger;
37
38 import com.lmax.disruptor.BlockingWaitStrategy;
39 import com.lmax.disruptor.ExceptionHandler;
40 import com.lmax.disruptor.RingBuffer;
41 import com.lmax.disruptor.SleepingWaitStrategy;
42 import com.lmax.disruptor.WaitStrategy;
43 import com.lmax.disruptor.YieldingWaitStrategy;
44 import com.lmax.disruptor.dsl.Disruptor;
45 import com.lmax.disruptor.dsl.ProducerType;
46 import com.lmax.disruptor.util.Util;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class AsyncLogger extends Logger {
77 private static final long serialVersionUID = 1L;
78 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
79 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
80 private static final int RINGBUFFER_MIN_SIZE = 128;
81 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
82 private static final StatusLogger LOGGER = StatusLogger.getLogger();
83 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
84 private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
85
86 static enum ThreadNameStrategy {
87 CACHED {
88 @Override
89 public String getThreadName(final Info info) {
90 return info.cachedThreadName;
91 }
92 },
93 UNCACHED {
94 @Override
95 public String getThreadName(final Info info) {
96 return Thread.currentThread().getName();
97 }
98 };
99 abstract String getThreadName(Info info);
100
101 static ThreadNameStrategy create() {
102 final String name = System.getProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
103 try {
104 return ThreadNameStrategy.valueOf(name);
105 } catch (final Exception ex) {
106 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
107 return CACHED;
108 }
109 }
110 }
111 private static volatile Disruptor<RingBufferLogEvent> disruptor;
112 private static final Clock clock = ClockFactory.getClock();
113
114 private static final ExecutorService executor = Executors
115 .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
116
117 static {
118 initInfoForExecutorThread();
119 LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
120 final int ringBufferSize = calculateRingBufferSize();
121
122 final WaitStrategy waitStrategy = createWaitStrategy();
123 disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
124 ProducerType.MULTI, waitStrategy);
125 disruptor.handleExceptionsWith(getExceptionHandler());
126 disruptor.handleEventsWith(new RingBufferLogEventHandler());
127
128 LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
129 .getBufferSize());
130 disruptor.start();
131 }
132
133 private static int calculateRingBufferSize() {
134 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
135 final String userPreferredRBSize = System.getProperty("AsyncLogger.RingBufferSize",
136 String.valueOf(ringBufferSize));
137 try {
138 int size = Integer.parseInt(userPreferredRBSize);
139 if (size < RINGBUFFER_MIN_SIZE) {
140 size = RINGBUFFER_MIN_SIZE;
141 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
142 RINGBUFFER_MIN_SIZE);
143 }
144 ringBufferSize = size;
145 } catch (final Exception ex) {
146 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
147 }
148 return Util.ceilingNextPowerOfTwo(ringBufferSize);
149 }
150
151
152
153
154
155
156
157
158 private static void initInfoForExecutorThread() {
159 executor.submit(new Runnable(){
160 @Override
161 public void run() {
162 final boolean isAppenderThread = true;
163 final Info info = new Info(new RingBufferLogEventTranslator(),
164 Thread.currentThread().getName(), isAppenderThread);
165 threadlocalInfo.set(info);
166 }
167 });
168 }
169
170 private static WaitStrategy createWaitStrategy() {
171 final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
172 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
173 if ("Sleep".equals(strategy)) {
174 return new SleepingWaitStrategy();
175 } else if ("Yield".equals(strategy)) {
176 return new YieldingWaitStrategy();
177 } else if ("Block".equals(strategy)) {
178 return new BlockingWaitStrategy();
179 }
180 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
181 return new BlockingWaitStrategy();
182 }
183
184 private static ExceptionHandler getExceptionHandler() {
185 final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
186 if (cls == null) {
187 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
188 return null;
189 }
190 try {
191 final ExceptionHandler result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
192 LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
193 return result;
194 } catch (final Exception ignored) {
195 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
196 return null;
197 }
198 }
199
200
201
202
203
204
205
206
207
208 public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
209 super(context, name, messageFactory);
210 }
211
212
213
214
215 static class Info {
216 private final RingBufferLogEventTranslator translator;
217 private final String cachedThreadName;
218 private final boolean isAppenderThread;
219 public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
220 this.translator = translator;
221 this.cachedThreadName = threadName;
222 this.isAppenderThread = appenderThread;
223 }
224 }
225
226 @Override
227 public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
228 Info info = threadlocalInfo.get();
229 if (info == null) {
230 info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
231 threadlocalInfo.set(info);
232 }
233
234 final Disruptor<RingBufferLogEvent> temp = disruptor;
235 if (temp == null) {
236 LOGGER.fatal("Ignoring log event after log4j was shut down");
237 return;
238 }
239
240
241
242 if (info.isAppenderThread && temp.getRingBuffer().remainingCapacity() == 0) {
243
244 config.loggerConfig.log(getName(), fqcn, marker, level, message, thrown);
245 return;
246 }
247 final boolean includeLocation = config.loggerConfig.isIncludeLocation();
248 info.translator.setValues(this, getName(), marker, fqcn, level, message,
249
250 thrown,
251
252
253
254
255
256 ThreadContext.getImmutableContext(),
257
258
259 ThreadContext.getImmutableStack(),
260
261
262
263 THREAD_NAME_STRATEGY.getThreadName(info),
264
265
266
267
268 includeLocation ? location(fqcn) : null,
269
270
271
272
273 clock.currentTimeMillis());
274
275
276 try {
277
278
279
280 disruptor.publishEvent(info.translator);
281 } catch (final NullPointerException npe) {
282 LOGGER.fatal("Ignoring log event after log4j was shut down.");
283 }
284 }
285
286 private static StackTraceElement location(final String fqcnOfLogger) {
287 return Log4jLogEvent.calcLocation(fqcnOfLogger);
288 }
289
290
291
292
293
294
295
296 public void actualAsyncLog(final RingBufferLogEvent event) {
297 final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
298 event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
299 config.logEvent(event);
300 }
301
302 public static void stop() {
303 final Disruptor<RingBufferLogEvent> temp = disruptor;
304
305
306
307 disruptor = null;
308 if (temp == null) {
309 return;
310 }
311
312
313
314
315 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
316 try {
317 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
318 } catch (final InterruptedException e) {
319 }
320 }
321 temp.shutdown();
322 executor.shutdown();
323 threadlocalInfo.remove();
324 }
325
326
327
328
329 private static boolean hasBacklog(final Disruptor<?> disruptor) {
330 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
331 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
332 }
333
334
335
336
337
338
339
340 public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
341 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
342 }
343 }