1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.async;
19
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
25 import org.apache.logging.log4j.status.StatusLogger;
26
27 import com.lmax.disruptor.ExceptionHandler;
28 import com.lmax.disruptor.RingBuffer;
29 import com.lmax.disruptor.WaitStrategy;
30 import com.lmax.disruptor.dsl.Disruptor;
31 import com.lmax.disruptor.dsl.ProducerType;
32
33
34
35
36
37
38
39 class AsyncLoggerDisruptor {
40 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
41 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
42 private static final StatusLogger LOGGER = StatusLogger.getLogger();
43
44 private volatile Disruptor<RingBufferLogEvent> disruptor;
45 private ExecutorService executor;
46 private String contextName;
47
48 private boolean useThreadLocalTranslator = true;
49 private long backgroundThreadId;
50 private AsyncQueueFullPolicy asyncQueueFullPolicy;
51 private int ringBufferSize;
52
53 AsyncLoggerDisruptor(String contextName) {
54 this.contextName = contextName;
55 }
56
57 public String getContextName() {
58 return contextName;
59 }
60
61 public void setContextName(String name) {
62 contextName = name;
63 }
64
65 Disruptor<RingBufferLogEvent> getDisruptor() {
66 return disruptor;
67 }
68
69
70
71
72
73
74 synchronized void start() {
75 if (disruptor != null) {
76 LOGGER.trace(
77 "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
78 contextName);
79 return;
80 }
81 LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
82 ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
83 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
84 executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger[" + contextName + "]"));
85 backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
86 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
87
88 disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
89 waitStrategy);
90
91 final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
92 disruptor.handleExceptionsWith(errorHandler);
93
94 final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
95 disruptor.handleEventsWith(handlers);
96
97 LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
98 + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
99 .getClass().getSimpleName(), errorHandler);
100 disruptor.start();
101
102 LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
103 : "vararg");
104 }
105
106
107
108
109
110 synchronized void stop() {
111 final Disruptor<RingBufferLogEvent> temp = getDisruptor();
112 if (temp == null) {
113 LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
114 return;
115 }
116 LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
117
118
119 disruptor = null;
120
121
122
123
124 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
125 try {
126 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
127 } catch (final InterruptedException e) {
128 }
129 }
130 temp.shutdown();
131
132 LOGGER.trace("[{}] AsyncLoggerDisruptor: shutting down disruptor executor.", contextName);
133 executor.shutdown();
134 executor = null;
135
136 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
137 LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
138 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
139 }
140 }
141
142
143
144
145 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
146 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
147 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
148 }
149
150
151
152
153
154
155
156 public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
157 final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
158 return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
159 }
160
161 EventRoute getEventRoute(final Level logLevel) {
162 final int remainingCapacity = remainingDisruptorCapacity();
163 if (remainingCapacity < 0) {
164 return EventRoute.DISCARD;
165 }
166 return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
167 }
168
169 private int remainingDisruptorCapacity() {
170 final Disruptor<RingBufferLogEvent> temp = disruptor;
171 if (hasLog4jBeenShutDown(temp)) {
172 return -1;
173 }
174 return (int) temp.getRingBuffer().remainingCapacity();
175 }
176
177
178
179 private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) {
180 if (aDisruptor == null) {
181 LOGGER.warn("Ignoring log event after log4j was shut down");
182 return true;
183 }
184 return false;
185 }
186
187 public boolean tryPublish(final RingBufferLogEventTranslator translator) {
188
189 try {
190 return disruptor.getRingBuffer().tryPublishEvent(translator);
191 } catch (final NullPointerException npe) {
192 LOGGER.warn("[{}] Ignoring log event after log4j was shut down.", contextName);
193 return false;
194 }
195 }
196
197 void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
198
199 try {
200
201
202
203 disruptor.publishEvent(translator);
204 } catch (final NullPointerException npe) {
205 LOGGER.warn("[{}] Ignoring log event after log4j was shut down.", contextName);
206 }
207 }
208
209
210
211
212
213
214
215
216 public boolean isUseThreadLocals() {
217 return useThreadLocalTranslator;
218 }
219
220
221
222
223
224
225
226
227
228
229
230
231 public void setUseThreadLocals(final boolean allow) {
232 useThreadLocalTranslator = allow;
233 LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName,
234 useThreadLocalTranslator ? "threadlocal" : "vararg");
235 }
236 }