1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.Marker;
25 import org.apache.logging.log4j.ThreadContext;
26 import org.apache.logging.log4j.core.Logger;
27 import org.apache.logging.log4j.core.LoggerContext;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.helpers.Clock;
30 import org.apache.logging.log4j.core.helpers.ClockFactory;
31 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32 import org.apache.logging.log4j.message.Message;
33 import org.apache.logging.log4j.message.MessageFactory;
34 import org.apache.logging.log4j.status.StatusLogger;
35
36 import com.lmax.disruptor.BlockingWaitStrategy;
37 import com.lmax.disruptor.EventHandler;
38 import com.lmax.disruptor.ExceptionHandler;
39 import com.lmax.disruptor.RingBuffer;
40 import com.lmax.disruptor.SleepingWaitStrategy;
41 import com.lmax.disruptor.WaitStrategy;
42 import com.lmax.disruptor.YieldingWaitStrategy;
43 import com.lmax.disruptor.dsl.Disruptor;
44 import com.lmax.disruptor.dsl.ProducerType;
45 import com.lmax.disruptor.util.Util;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class AsyncLogger extends Logger {
75 private static final int HALF_A_SECOND = 500;
76 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
77 private static final int RINGBUFFER_MIN_SIZE = 128;
78 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
79 private static final StatusLogger LOGGER = StatusLogger.getLogger();
80
81 private static volatile Disruptor<RingBufferLogEvent> disruptor;
82 private static Clock clock = ClockFactory.getClock();
83
84 private static ExecutorService executor = Executors
85 .newSingleThreadExecutor();
86 private ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
87
88 static {
89 int ringBufferSize = calculateRingBufferSize();
90
91 WaitStrategy waitStrategy = createWaitStrategy();
92 disruptor = new Disruptor<RingBufferLogEvent>(
93 RingBufferLogEvent.FACTORY, ringBufferSize, executor,
94 ProducerType.MULTI, waitStrategy);
95 EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {
96 new RingBufferLogEventHandler() };
97 disruptor.handleExceptionsWith(getExceptionHandler());
98 disruptor.handleEventsWith(handlers);
99
100 LOGGER.debug(
101 "Starting AsyncLogger disruptor with ringbuffer size {}...",
102 disruptor.getRingBuffer().getBufferSize());
103 disruptor.start();
104 }
105
106 private static int calculateRingBufferSize() {
107 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
108 String userPreferredRBSize = System.getProperty(
109 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
110 try {
111 int size = Integer.parseInt(userPreferredRBSize);
112 if (size < RINGBUFFER_MIN_SIZE) {
113 size = RINGBUFFER_MIN_SIZE;
114 LOGGER.warn(
115 "Invalid RingBufferSize {}, using minimum size {}.",
116 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
117 }
118 ringBufferSize = size;
119 } catch (Exception ex) {
120 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
121 userPreferredRBSize, ringBufferSize);
122 }
123 return Util.ceilingNextPowerOfTwo(ringBufferSize);
124 }
125
126 private static WaitStrategy createWaitStrategy() {
127 String strategy = System.getProperty("AsyncLogger.WaitStrategy");
128 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
129 if ("Sleep".equals(strategy)) {
130 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
131 return new SleepingWaitStrategy();
132 } else if ("Yield".equals(strategy)) {
133 LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
134 return new YieldingWaitStrategy();
135 } else if ("Block".equals(strategy)) {
136 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
137 return new BlockingWaitStrategy();
138 }
139 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
140 return new SleepingWaitStrategy();
141 }
142
143 private static ExceptionHandler getExceptionHandler() {
144 String cls = System.getProperty("AsyncLogger.ExceptionHandler");
145 if (cls == null) {
146 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
147 return null;
148 }
149 try {
150 @SuppressWarnings("unchecked")
151 Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
152 .forName(cls);
153 ExceptionHandler result = klass.newInstance();
154 LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
155 return result;
156 } catch (Exception ignored) {
157 LOGGER.debug(
158 "AsyncLogger.ExceptionHandler not set: error creating "
159 + cls + ": ", ignored);
160 return null;
161 }
162 }
163
164
165
166
167
168
169
170
171
172 public AsyncLogger(LoggerContext context, String name,
173 MessageFactory messageFactory) {
174 super(context, name, messageFactory);
175 }
176
177
178
179
180 private static class Info {
181 private RingBufferLogEventTranslator translator;
182 private String cachedThreadName;
183 }
184
185 @Override
186 public void log(Marker marker, String fqcn, Level level, Message data,
187 Throwable t) {
188 Info info = threadlocalInfo.get();
189 if (info == null) {
190 info = new Info();
191 info.translator = new RingBufferLogEventTranslator();
192 info.cachedThreadName = Thread.currentThread().getName();
193 threadlocalInfo.set(info);
194 }
195
196 boolean includeLocation = config.loggerConfig.isIncludeLocation();
197 info.translator.setValues(this, getName(), marker, fqcn, level, data,
198 t,
199
200
201
202
203
204 ThreadContext.getImmutableContext(),
205
206
207 ThreadContext.getImmutableStack(),
208
209
210 info.cachedThreadName,
211
212
213
214
215 includeLocation ? location(fqcn) : null,
216
217
218
219
220 clock.currentTimeMillis());
221
222 disruptor.publishEvent(info.translator);
223 }
224
225 private StackTraceElement location(String fqcnOfLogger) {
226 return Log4jLogEvent.calcLocation(fqcnOfLogger);
227 }
228
229
230
231
232
233
234
235 public void actualAsyncLog(RingBufferLogEvent event) {
236 Map<Property, Boolean> properties = config.loggerConfig.getProperties();
237 event.mergePropertiesIntoContextMap(properties,
238 config.config.getSubst());
239 config.logEvent(event);
240 }
241
242 public static void stop() {
243 Disruptor<RingBufferLogEvent> temp = disruptor;
244
245
246
247 disruptor = null;
248 temp.shutdown();
249
250
251 RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
252 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
253 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
254 break;
255 }
256 try {
257
258 Thread.sleep(HALF_A_SECOND);
259 } catch (InterruptedException e) {
260
261 }
262 }
263 executor.shutdown();
264 }
265 }