1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.Marker;
25 import org.apache.logging.log4j.ThreadContext;
26 import org.apache.logging.log4j.core.Logger;
27 import org.apache.logging.log4j.core.LoggerContext;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
30 import org.apache.logging.log4j.message.Message;
31 import org.apache.logging.log4j.message.MessageFactory;
32 import org.apache.logging.log4j.status.StatusLogger;
33
34 import com.lmax.disruptor.BlockingWaitStrategy;
35 import com.lmax.disruptor.EventHandler;
36 import com.lmax.disruptor.ExceptionHandler;
37 import com.lmax.disruptor.RingBuffer;
38 import com.lmax.disruptor.SleepingWaitStrategy;
39 import com.lmax.disruptor.WaitStrategy;
40 import com.lmax.disruptor.YieldingWaitStrategy;
41 import com.lmax.disruptor.dsl.Disruptor;
42 import com.lmax.disruptor.dsl.ProducerType;
43 import com.lmax.disruptor.util.Util;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class AsyncLogger extends Logger {
73 private static final int HALF_A_SECOND = 500;
74 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
75 private static final int RINGBUFFER_MIN_SIZE = 128;
76 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
77 private static final StatusLogger LOGGER = StatusLogger.getLogger();
78
79 private static volatile Disruptor<RingBufferLogEvent> disruptor;
80 private static Clock clock = ClockFactory.getClock();
81
82 private static ExecutorService executor = Executors
83 .newSingleThreadExecutor();
84 private ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
85
86 static {
87 int ringBufferSize = calculateRingBufferSize();
88
89 WaitStrategy waitStrategy = createWaitStrategy();
90 disruptor = new Disruptor<RingBufferLogEvent>(
91 RingBufferLogEvent.FACTORY, ringBufferSize, executor,
92 ProducerType.MULTI, waitStrategy);
93 EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {
94 new RingBufferLogEventHandler() };
95 disruptor.handleExceptionsWith(getExceptionHandler());
96 disruptor.handleEventsWith(handlers);
97
98 LOGGER.debug(
99 "Starting AsyncLogger disruptor with ringbuffer size {}...",
100 disruptor.getRingBuffer().getBufferSize());
101 disruptor.start();
102 }
103
104 private static int calculateRingBufferSize() {
105 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
106 String userPreferredRBSize = System.getProperty(
107 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
108 try {
109 int size = Integer.parseInt(userPreferredRBSize);
110 if (size < RINGBUFFER_MIN_SIZE) {
111 size = RINGBUFFER_MIN_SIZE;
112 LOGGER.warn(
113 "Invalid RingBufferSize {}, using minimum size {}.",
114 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
115 }
116 ringBufferSize = size;
117 } catch (Exception ex) {
118 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
119 userPreferredRBSize, ringBufferSize);
120 }
121 return Util.ceilingNextPowerOfTwo(ringBufferSize);
122 }
123
124 private static WaitStrategy createWaitStrategy() {
125 String strategy = System.getProperty("AsyncLogger.WaitStrategy");
126 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
127 if ("Sleep".equals(strategy)) {
128 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
129 return new SleepingWaitStrategy();
130 } else if ("Yield".equals(strategy)) {
131 LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
132 return new YieldingWaitStrategy();
133 } else if ("Block".equals(strategy)) {
134 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
135 return new BlockingWaitStrategy();
136 }
137 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
138 return new SleepingWaitStrategy();
139 }
140
141 private static ExceptionHandler getExceptionHandler() {
142 String cls = System.getProperty("AsyncLogger.ExceptionHandler");
143 if (cls == null) {
144 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
145 return null;
146 }
147 try {
148 @SuppressWarnings("unchecked")
149 Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
150 .forName(cls);
151 ExceptionHandler result = klass.newInstance();
152 LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
153 return result;
154 } catch (Exception ignored) {
155 LOGGER.debug(
156 "AsyncLogger.ExceptionHandler not set: error creating "
157 + cls + ": ", ignored);
158 return null;
159 }
160 }
161
162
163
164
165
166
167
168
169
170 public AsyncLogger(LoggerContext context, String name,
171 MessageFactory messageFactory) {
172 super(context, name, messageFactory);
173 }
174
175
176
177
178 private static class Info {
179 private RingBufferLogEventTranslator translator;
180 private String cachedThreadName;
181 }
182
183 @Override
184 public void log(Marker marker, String fqcn, Level level, Message data,
185 Throwable t) {
186 Info info = threadlocalInfo.get();
187 if (info == null) {
188 info = new Info();
189 info.translator = new RingBufferLogEventTranslator();
190 info.cachedThreadName = Thread.currentThread().getName();
191 threadlocalInfo.set(info);
192 }
193
194 Boolean includeLocation = config.loggerConfig.isIncludeLocation();
195 info.translator.setValues(this, getName(), marker, fqcn, level, data,
196 t,
197
198
199
200
201
202 ThreadContext.getImmutableContext(),
203
204
205 ThreadContext.getImmutableStack(),
206
207
208 info.cachedThreadName,
209
210
211
212
213 includeLocation != null && includeLocation ? location(fqcn)
214 : null,
215
216
217
218
219 clock.currentTimeMillis());
220
221 disruptor.publishEvent(info.translator);
222 }
223
224 private StackTraceElement location(String fqcnOfLogger) {
225 return Log4jLogEvent.calcLocation(fqcnOfLogger);
226 }
227
228
229
230
231
232
233
234 public void actualAsyncLog(RingBufferLogEvent event) {
235 Map<Property, Boolean> properties = config.loggerConfig.getProperties();
236 event.mergePropertiesIntoContextMap(properties,
237 config.config.getSubst());
238 config.logEvent(event);
239 }
240
241 public static void stop() {
242 Disruptor<RingBufferLogEvent> temp = disruptor;
243
244
245
246 disruptor = null;
247 temp.shutdown();
248
249
250 RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
251 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
252 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
253 break;
254 }
255 try {
256
257 Thread.sleep(HALF_A_SECOND);
258 } catch (InterruptedException e) {
259
260 }
261 }
262 executor.shutdown();
263 }
264 }