1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21
22 import org.apache.logging.log4j.Logger;
23 import org.apache.logging.log4j.core.LogEvent;
24 import org.apache.logging.log4j.status.StatusLogger;
25
26 import com.lmax.disruptor.BlockingWaitStrategy;
27 import com.lmax.disruptor.EventFactory;
28 import com.lmax.disruptor.EventHandler;
29 import com.lmax.disruptor.EventTranslator;
30 import com.lmax.disruptor.ExceptionHandler;
31 import com.lmax.disruptor.RingBuffer;
32 import com.lmax.disruptor.Sequence;
33 import com.lmax.disruptor.SequenceReportingEventHandler;
34 import com.lmax.disruptor.SleepingWaitStrategy;
35 import com.lmax.disruptor.WaitStrategy;
36 import com.lmax.disruptor.YieldingWaitStrategy;
37 import com.lmax.disruptor.dsl.Disruptor;
38 import com.lmax.disruptor.dsl.ProducerType;
39 import com.lmax.disruptor.util.Util;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 class AsyncLoggerConfigHelper {
58
59 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
60 private static final int HALF_A_SECOND = 500;
61 private static final int RINGBUFFER_MIN_SIZE = 128;
62 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
63 private static final Logger LOGGER = StatusLogger.getLogger();
64
65 private static volatile Disruptor<Log4jEventWrapper> disruptor;
66 private static ExecutorService executor = Executors.newSingleThreadExecutor();
67
68 private static volatile int count = 0;
69
70
71
72
73
74 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
75 @Override
76 public Log4jEventWrapper newInstance() {
77 return new Log4jEventWrapper();
78 }
79 };
80
81
82
83
84 private final EventTranslator<Log4jEventWrapper> translator = new EventTranslator<Log4jEventWrapper>() {
85 @Override
86 public void translateTo(Log4jEventWrapper event, long sequence) {
87 event.event = currentLogEvent.get();
88 event.loggerConfig = asyncLoggerConfig;
89 }
90 };
91
92 private ThreadLocal<LogEvent> currentLogEvent = new ThreadLocal<LogEvent>();
93 private AsyncLoggerConfig asyncLoggerConfig;
94
95 public AsyncLoggerConfigHelper(AsyncLoggerConfig asyncLoggerConfig) {
96 this.asyncLoggerConfig = asyncLoggerConfig;
97 initDisruptor();
98 }
99
100 private static synchronized void initDisruptor() {
101 ++count;
102 if (disruptor != null) {
103 return;
104 }
105 int ringBufferSize = calculateRingBufferSize();
106 WaitStrategy waitStrategy = createWaitStrategy();
107 disruptor = new Disruptor<Log4jEventWrapper>(FACTORY,
108 ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
109 EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {
110 new Log4jEventWrapperHandler() };
111 disruptor.handleExceptionsWith(getExceptionHandler());
112 disruptor.handleEventsWith(handlers);
113
114 LOGGER.debug(
115 "Starting AsyncLoggerConfig disruptor with ringbuffer size {}...",
116 disruptor.getRingBuffer().getBufferSize());
117 disruptor.start();
118 }
119
120 private static WaitStrategy createWaitStrategy() {
121 String strategy = System.getProperty("AsyncLoggerConfig.WaitStrategy");
122 LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
123 if ("Sleep".equals(strategy)) {
124 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
125 return new SleepingWaitStrategy();
126 } else if ("Yield".equals(strategy)) {
127 LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
128 return new YieldingWaitStrategy();
129 } else if ("Block".equals(strategy)) {
130 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
131 return new BlockingWaitStrategy();
132 }
133 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
134 return new SleepingWaitStrategy();
135 }
136
137 private static int calculateRingBufferSize() {
138 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
139 String userPreferredRBSize = System.getProperty(
140 "AsyncLoggerConfig.RingBufferSize",
141 String.valueOf(ringBufferSize));
142 try {
143 int size = Integer.parseInt(userPreferredRBSize);
144 if (size < RINGBUFFER_MIN_SIZE) {
145 size = RINGBUFFER_MIN_SIZE;
146 LOGGER.warn(
147 "Invalid RingBufferSize {}, using minimum size {}.",
148 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
149 }
150 ringBufferSize = size;
151 } catch (Exception ex) {
152 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
153 userPreferredRBSize, ringBufferSize);
154 }
155 return Util.ceilingNextPowerOfTwo(ringBufferSize);
156 }
157
158 private static ExceptionHandler getExceptionHandler() {
159 String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
160 if (cls == null) {
161 LOGGER.debug("No AsyncLoggerConfig.ExceptionHandler specified");
162 return null;
163 }
164 try {
165 @SuppressWarnings("unchecked")
166 Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
167 .forName(cls);
168 ExceptionHandler result = klass.newInstance();
169 LOGGER.debug("AsyncLoggerConfig.ExceptionHandler=" + result);
170 return result;
171 } catch (Exception ignored) {
172 LOGGER.debug(
173 "AsyncLoggerConfig.ExceptionHandler not set: error creating "
174 + cls + ": ", ignored);
175 return null;
176 }
177 }
178
179
180
181
182
183 private static class Log4jEventWrapper {
184 private AsyncLoggerConfig loggerConfig;
185 private LogEvent event;
186 }
187
188
189
190
191 private static class Log4jEventWrapperHandler implements
192 SequenceReportingEventHandler<Log4jEventWrapper> {
193 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
194 private Sequence sequenceCallback;
195 private int counter;
196
197 @Override
198 public void setSequenceCallback(Sequence sequenceCallback) {
199 this.sequenceCallback = sequenceCallback;
200 }
201
202 @Override
203 public void onEvent(Log4jEventWrapper event, long sequence,
204 boolean endOfBatch) throws Exception {
205 event.event.setEndOfBatch(endOfBatch);
206 event.loggerConfig.asyncCallAppenders(event.event);
207
208
209
210
211 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
212 sequenceCallback.set(sequence);
213 counter = 0;
214 }
215 }
216 }
217
218 public synchronized void shutdown() {
219 if (--count > 0) {
220 return;
221 }
222 Disruptor<Log4jEventWrapper> temp = disruptor;
223 if (temp == null) {
224 return;
225 }
226
227
228
229 disruptor = null;
230 temp.shutdown();
231
232
233 RingBuffer<Log4jEventWrapper> ringBuffer = temp.getRingBuffer();
234 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
235 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
236 break;
237 }
238 try {
239
240 Thread.sleep(HALF_A_SECOND);
241 } catch (InterruptedException e) {
242
243 }
244 }
245 executor.shutdown();
246 }
247
248 public void callAppendersFromAnotherThread(LogEvent event) {
249 currentLogEvent.set(event);
250 disruptor.publishEvent(translator);
251 }
252
253 }