1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21
22 import org.apache.logging.log4j.Logger;
23 import org.apache.logging.log4j.core.LogEvent;
24 import org.apache.logging.log4j.status.StatusLogger;
25
26 import com.lmax.disruptor.BlockingWaitStrategy;
27 import com.lmax.disruptor.EventFactory;
28 import com.lmax.disruptor.EventHandler;
29 import com.lmax.disruptor.EventTranslator;
30 import com.lmax.disruptor.ExceptionHandler;
31 import com.lmax.disruptor.RingBuffer;
32 import com.lmax.disruptor.Sequence;
33 import com.lmax.disruptor.SequenceReportingEventHandler;
34 import com.lmax.disruptor.SleepingWaitStrategy;
35 import com.lmax.disruptor.WaitStrategy;
36 import com.lmax.disruptor.YieldingWaitStrategy;
37 import com.lmax.disruptor.dsl.Disruptor;
38 import com.lmax.disruptor.dsl.ProducerType;
39 import com.lmax.disruptor.util.Util;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 class AsyncLoggerConfigHelper {
58
59 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
60 private static final int HALF_A_SECOND = 500;
61 private static final int RINGBUFFER_MIN_SIZE = 128;
62 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
63 private static final Logger LOGGER = StatusLogger.getLogger();
64
65 private static volatile Disruptor<RingBufferLog4jEvent> disruptor;
66 private static ExecutorService executor = Executors
67 .newSingleThreadExecutor();
68
69
70
71
72
73 private static final EventFactory<RingBufferLog4jEvent> FACTORY = new EventFactory<RingBufferLog4jEvent>() {
74 @Override
75 public RingBufferLog4jEvent newInstance() {
76 return new RingBufferLog4jEvent();
77 }
78 };
79
80
81
82
83 private final EventTranslator<RingBufferLog4jEvent> translator = new EventTranslator<RingBufferLog4jEvent>() {
84 @Override
85 public void translateTo(RingBufferLog4jEvent event, long sequence) {
86 event.event = currentLogEvent.get();
87 event.loggerConfig = asyncLoggerConfig;
88 }
89 };
90
91 private ThreadLocal<LogEvent> currentLogEvent = new ThreadLocal<LogEvent>();
92 private AsyncLoggerConfig asyncLoggerConfig;
93
94 public AsyncLoggerConfigHelper(AsyncLoggerConfig asyncLoggerConfig) {
95 this.asyncLoggerConfig = asyncLoggerConfig;
96 initDisruptor();
97 }
98
99 private static synchronized void initDisruptor() {
100 if (disruptor != null) {
101 return;
102 }
103 int ringBufferSize = calculateRingBufferSize();
104 WaitStrategy waitStrategy = createWaitStrategy();
105 disruptor = new Disruptor<RingBufferLog4jEvent>(FACTORY,
106 ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
107 EventHandler<RingBufferLog4jEvent>[] handlers = new RingBufferLog4jEventHandler[] {
108 new RingBufferLog4jEventHandler() };
109 disruptor.handleExceptionsWith(getExceptionHandler());
110 disruptor.handleEventsWith(handlers);
111
112 LOGGER.debug(
113 "Starting AsyncLoggerConfig disruptor with ringbuffer size {}...",
114 disruptor.getRingBuffer().getBufferSize());
115 disruptor.start();
116 }
117
118 private static WaitStrategy createWaitStrategy() {
119 String strategy = System.getProperty("AsyncLoggerConfig.WaitStrategy");
120 LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
121 if ("Sleep".equals(strategy)) {
122 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
123 return new SleepingWaitStrategy();
124 } else if ("Yield".equals(strategy)) {
125 LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
126 return new YieldingWaitStrategy();
127 } else if ("Block".equals(strategy)) {
128 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
129 return new BlockingWaitStrategy();
130 }
131 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
132 return new SleepingWaitStrategy();
133 }
134
135 private static int calculateRingBufferSize() {
136 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
137 String userPreferredRBSize = System.getProperty(
138 "AsyncLoggerConfig.RingBufferSize",
139 String.valueOf(ringBufferSize));
140 try {
141 int size = Integer.parseInt(userPreferredRBSize);
142 if (size < RINGBUFFER_MIN_SIZE) {
143 size = RINGBUFFER_MIN_SIZE;
144 LOGGER.warn(
145 "Invalid RingBufferSize {}, using minimum size {}.",
146 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
147 }
148 ringBufferSize = size;
149 } catch (Exception ex) {
150 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
151 userPreferredRBSize, ringBufferSize);
152 }
153 return Util.ceilingNextPowerOfTwo(ringBufferSize);
154 }
155
156 private static ExceptionHandler getExceptionHandler() {
157 String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
158 if (cls == null) {
159 LOGGER.debug("No AsyncLoggerConfig.ExceptionHandler specified");
160 return null;
161 }
162 try {
163 @SuppressWarnings("unchecked")
164 Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
165 .forName(cls);
166 ExceptionHandler result = klass.newInstance();
167 LOGGER.debug("AsyncLoggerConfig.ExceptionHandler=" + result);
168 return result;
169 } catch (Exception ignored) {
170 LOGGER.debug(
171 "AsyncLoggerConfig.ExceptionHandler not set: error creating "
172 + cls + ": ", ignored);
173 return null;
174 }
175 }
176
177
178
179
180
181 private static class RingBufferLog4jEvent {
182 private AsyncLoggerConfig loggerConfig;
183 private LogEvent event;
184 }
185
186
187
188
189 private static class RingBufferLog4jEventHandler implements
190 SequenceReportingEventHandler<RingBufferLog4jEvent> {
191 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
192 private Sequence sequenceCallback;
193 private int counter;
194
195 @Override
196 public void setSequenceCallback(Sequence sequenceCallback) {
197 this.sequenceCallback = sequenceCallback;
198 }
199
200 @Override
201 public void onEvent(RingBufferLog4jEvent event, long sequence,
202 boolean endOfBatch) throws Exception {
203 event.event.setEndOfBatch(endOfBatch);
204 event.loggerConfig.asyncCallAppenders(event.event);
205
206
207
208
209 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
210 sequenceCallback.set(sequence);
211 counter = 0;
212 }
213 }
214 }
215
216 public void shutdown() {
217 Disruptor<RingBufferLog4jEvent> temp = disruptor;
218
219
220
221 disruptor = null;
222 temp.shutdown();
223
224
225 RingBuffer<RingBufferLog4jEvent> ringBuffer = temp.getRingBuffer();
226 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
227 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
228 break;
229 }
230 try {
231
232 Thread.sleep(HALF_A_SECOND);
233 } catch (InterruptedException e) {
234
235 }
236 }
237 executor.shutdown();
238 }
239
240 public void callAppendersFromAnotherThread(LogEvent event) {
241 currentLogEvent.set(event);
242 disruptor.publishEvent(translator);
243 }
244
245 }