1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.async;
19
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
24 import org.apache.logging.log4j.status.StatusLogger;
25
26 import com.lmax.disruptor.ExceptionHandler;
27 import com.lmax.disruptor.RingBuffer;
28 import com.lmax.disruptor.WaitStrategy;
29 import com.lmax.disruptor.dsl.Disruptor;
30 import com.lmax.disruptor.dsl.ProducerType;
31
32
33
34
35
36
37
38 class AsyncLoggerDisruptor {
39 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
40 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
41 private static final StatusLogger LOGGER = StatusLogger.getLogger();
42
43 private volatile Disruptor<RingBufferLogEvent> disruptor;
44 private ExecutorService executor;
45 private String contextName;
46
47 private boolean useThreadLocalTranslator;
48 private long backgroundThreadId;
49
50 AsyncLoggerDisruptor(String contextName) {
51 this.contextName = contextName;
52 }
53
54 public String getContextName() {
55 return contextName;
56 }
57
58 public void setContextName(String name) {
59 contextName = name;
60 }
61
62 Disruptor<RingBufferLogEvent> getDisruptor() {
63 return disruptor;
64 }
65
66
67
68
69
70
71 synchronized void start() {
72 if (disruptor != null) {
73 LOGGER.trace(
74 "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
75 contextName);
76 return;
77 }
78 LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
79 final int ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
80 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
81 executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger[" + contextName + "]"));
82 backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
83
84 disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
85 waitStrategy);
86
87 final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getExceptionHandler(
88 "AsyncLogger.ExceptionHandler", RingBufferLogEvent.class);
89 disruptor.handleExceptionsWith(errorHandler);
90
91 final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
92 disruptor.handleEventsWith(handlers);
93
94 LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
95 + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
96 .getClass().getSimpleName(), errorHandler);
97 disruptor.start();
98
99 LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
100 : "vararg");
101 }
102
103
104
105
106
107 synchronized void stop() {
108 final Disruptor<RingBufferLogEvent> temp = getDisruptor();
109 if (temp == null) {
110 LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
111 return;
112 }
113 LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
114
115
116 disruptor = null;
117
118
119
120
121 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
122 try {
123 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
124 } catch (final InterruptedException e) {
125 }
126 }
127 temp.shutdown();
128
129 LOGGER.trace("[{}] AsyncLoggerDisruptor: shutting down disruptor executor.", contextName);
130 executor.shutdown();
131 executor = null;
132 }
133
134
135
136
137 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
138 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
139 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
140 }
141
142
143
144
145
146
147
148 public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
149 final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
150 return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
151 }
152
153
154
155
156
157
158
159 boolean shouldLogInCurrentThread() {
160 return currentThreadIsAppenderThread() && isRingBufferFull();
161 }
162
163
164
165
166
167
168 private boolean currentThreadIsAppenderThread() {
169 return Thread.currentThread().getId() == backgroundThreadId;
170 }
171
172
173
174
175
176
177 private boolean isRingBufferFull() {
178 final Disruptor<RingBufferLogEvent> theDisruptor = this.disruptor;
179 return theDisruptor == null || theDisruptor.getRingBuffer().remainingCapacity() == 0;
180 }
181
182 void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
183
184 try {
185
186
187
188 disruptor.publishEvent(translator);
189 } catch (final NullPointerException npe) {
190 LOGGER.fatal("[{}] Ignoring log event after log4j was shut down.", contextName);
191 }
192 }
193
194
195
196
197
198
199
200
201 public boolean isUseThreadLocals() {
202 return useThreadLocalTranslator;
203 }
204
205
206
207
208
209
210
211
212
213 public void setUseThreadLocals(final boolean allow) {
214 useThreadLocalTranslator = allow;
215 }
216 }