1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.Marker;
25 import org.apache.logging.log4j.ThreadContext;
26 import org.apache.logging.log4j.core.Logger;
27 import org.apache.logging.log4j.core.LoggerContext;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.helpers.Clock;
30 import org.apache.logging.log4j.core.helpers.ClockFactory;
31 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
33 import org.apache.logging.log4j.message.Message;
34 import org.apache.logging.log4j.message.MessageFactory;
35 import org.apache.logging.log4j.status.StatusLogger;
36
37 import com.lmax.disruptor.BlockingWaitStrategy;
38 import com.lmax.disruptor.EventHandler;
39 import com.lmax.disruptor.ExceptionHandler;
40 import com.lmax.disruptor.RingBuffer;
41 import com.lmax.disruptor.SleepingWaitStrategy;
42 import com.lmax.disruptor.WaitStrategy;
43 import com.lmax.disruptor.YieldingWaitStrategy;
44 import com.lmax.disruptor.dsl.Disruptor;
45 import com.lmax.disruptor.dsl.ProducerType;
46 import com.lmax.disruptor.util.Util;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class AsyncLogger extends Logger {
77 private static final long serialVersionUID = 1L;
78 private static final int HALF_A_SECOND = 500;
79 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
80 private static final int RINGBUFFER_MIN_SIZE = 128;
81 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
82 private static final StatusLogger LOGGER = StatusLogger.getLogger();
83 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
84 private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
85
86 static enum ThreadNameStrategy {
87 CACHED {
88 @Override
89 public String getThreadName(Info info) {
90 return info.cachedThreadName;
91 }
92 },
93 UNCACHED {
94 @Override
95 public String getThreadName(Info info) {
96 return Thread.currentThread().getName();
97 }
98 };
99 abstract String getThreadName(Info info);
100
101 static ThreadNameStrategy create() {
102 String name = System.getProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
103 try {
104 return ThreadNameStrategy.valueOf(name);
105 } catch (Exception ex) {
106 return CACHED;
107 }
108 }
109 }
/** Disruptor shared by every AsyncLogger; {@code stop()} sets this field to {@code null}. */
private static volatile Disruptor<RingBufferLogEvent> disruptor;

/** Source of the timestamp stored in each event. */
private static Clock clock = ClockFactory.getClock();

/** Single-thread executor handed to the disruptor; its threads come from {@code DaemonThreadFactory}. */
private static ExecutorService executor =
        Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));

// Builds and starts the shared disruptor when the class is initialized.
static {
    // Queued on the single-thread executor before disruptor.start() is called below,
    // presumably so the executor thread has its Info before it starts handling events.
    initInfoForExecutorThread();
    LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);

    final int bufferSize = calculateRingBufferSize();
    final WaitStrategy consumerWaitStrategy = createWaitStrategy();
    disruptor = new Disruptor<RingBufferLogEvent>(
            RingBufferLogEvent.FACTORY, bufferSize, executor, ProducerType.MULTI, consumerWaitStrategy);

    final EventHandler<RingBufferLogEvent>[] eventHandlers =
            new RingBufferLogEventHandler[] { new RingBufferLogEventHandler() };
    disruptor.handleExceptionsWith(getExceptionHandler());
    disruptor.handleEventsWith(eventHandlers);

    final int actualBufferSize = disruptor.getRingBuffer().getBufferSize();
    LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", actualBufferSize);
    disruptor.start();
}
133
134 private static int calculateRingBufferSize() {
135 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
136 final String userPreferredRBSize = System.getProperty("AsyncLogger.RingBufferSize",
137 String.valueOf(ringBufferSize));
138 try {
139 int size = Integer.parseInt(userPreferredRBSize);
140 if (size < RINGBUFFER_MIN_SIZE) {
141 size = RINGBUFFER_MIN_SIZE;
142 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
143 RINGBUFFER_MIN_SIZE);
144 }
145 ringBufferSize = size;
146 } catch (final Exception ex) {
147 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
148 }
149 return Util.ceilingNextPowerOfTwo(ringBufferSize);
150 }
151
152
153
154
155
156
157
158
159 private static void initInfoForExecutorThread() {
160 executor.submit(new Runnable(){
161 @Override
162 public void run() {
163 final boolean isAppenderThread = true;
164 final Info info = new Info(new RingBufferLogEventTranslator(),
165 Thread.currentThread().getName(), isAppenderThread);
166 threadlocalInfo.set(info);
167 }
168 });
169 }
170
171 private static WaitStrategy createWaitStrategy() {
172 final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
173 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
174 if ("Sleep".equals(strategy)) {
175 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
176 return new SleepingWaitStrategy();
177 } else if ("Yield".equals(strategy)) {
178 LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
179 return new YieldingWaitStrategy();
180 } else if ("Block".equals(strategy)) {
181 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
182 return new BlockingWaitStrategy();
183 }
184 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
185 return new SleepingWaitStrategy();
186 }
187
188 private static ExceptionHandler getExceptionHandler() {
189 final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
190 if (cls == null) {
191 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
192 return null;
193 }
194 try {
195 @SuppressWarnings("unchecked")
196 final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class.forName(cls);
197 final ExceptionHandler result = klass.newInstance();
198 LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
199 return result;
200 } catch (final Exception ignored) {
201 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
202 return null;
203 }
204 }
205
206
207
208
209
210
211
212
213
/**
 * Creates an AsyncLogger; all arguments are passed unchanged to the {@code Logger} constructor.
 *
 * @param context the logger context handed to the superclass
 * @param name the logger name handed to the superclass
 * @param messageFactory the message factory handed to the superclass
 */
public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
    super(context, name, messageFactory);
}
217
218
219
220
221 static class Info {
222 private final RingBufferLogEventTranslator translator;
223 private final String cachedThreadName;
224 private final boolean isAppenderThread;
225 public Info(RingBufferLogEventTranslator translator, String threadName, boolean appenderThread) {
226 this.translator = translator;
227 this.cachedThreadName = threadName;
228 this.isAppenderThread = appenderThread;
229 }
230 }
231
232 @Override
233 public void log(final Marker marker, final String fqcn, final Level level, final Message data, final Throwable t) {
234 Info info = threadlocalInfo.get();
235 if (info == null) {
236 info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
237 threadlocalInfo.set(info);
238 }
239
240
241
242 if (info.isAppenderThread && disruptor.getRingBuffer().remainingCapacity() == 0) {
243
244 config.loggerConfig.log(getName(), marker, fqcn, level, data, t);
245 return;
246 }
247 final boolean includeLocation = config.loggerConfig.isIncludeLocation();
248 info.translator.setValues(this, getName(), marker, fqcn, level, data, t,
249
250
251
252
253
254 ThreadContext.getImmutableContext(),
255
256
257 ThreadContext.getImmutableStack(),
258
259
260
261 THREAD_NAME_STRATEGY.getThreadName(info),
262
263
264
265
266 includeLocation ? location(fqcn) : null,
267
268
269
270
271 clock.currentTimeMillis());
272
273 disruptor.publishEvent(info.translator);
274 }
275
276 private StackTraceElement location(final String fqcnOfLogger) {
277 return Log4jLogEvent.calcLocation(fqcnOfLogger);
278 }
279
280
281
282
283
284
285
286 public void actualAsyncLog(final RingBufferLogEvent event) {
287 final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
288 event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
289 config.logEvent(event);
290 }
291
292 public static void stop() {
293 final Disruptor<RingBufferLogEvent> temp = disruptor;
294
295
296
297 disruptor = null;
298 if (temp == null) {
299 return;
300 }
301 temp.shutdown();
302
303
304 final RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
305 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
306 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
307 break;
308 }
309 try {
310
311 Thread.sleep(HALF_A_SECOND);
312 } catch (final InterruptedException e) {
313
314 }
315 }
316 executor.shutdown();
317 threadlocalInfo.remove();
318 }
319
320
321
322
323
324
325
326 public static RingBufferAdmin createRingBufferAdmin(String contextName) {
327 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
328 }
329 }