1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.io.Serializable;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.ArrayBlockingQueue;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.logging.log4j.core.Appender;
28 import org.apache.logging.log4j.core.Filter;
29 import org.apache.logging.log4j.core.LogEvent;
30 import org.apache.logging.log4j.core.async.RingBufferLogEvent;
31 import org.apache.logging.log4j.core.config.AppenderControl;
32 import org.apache.logging.log4j.core.config.AppenderRef;
33 import org.apache.logging.log4j.core.config.Configuration;
34 import org.apache.logging.log4j.core.config.ConfigurationException;
35 import org.apache.logging.log4j.core.config.plugins.Plugin;
36 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
37 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
38 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
39 import org.apache.logging.log4j.core.config.plugins.PluginElement;
40 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
41 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
42
43
44
45
46
47
48 @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
49 public final class AsyncAppender extends AbstractAppender {
50
51 private static final long serialVersionUID = 1L;
52 private static final int DEFAULT_QUEUE_SIZE = 128;
53 private static final String SHUTDOWN = "Shutdown";
54
55 private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
56 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
57
58 private final BlockingQueue<Serializable> queue;
59 private final int queueSize;
60 private final boolean blocking;
61 private final long shutdownTimeout;
62 private final Configuration config;
63 private final AppenderRef[] appenderRefs;
64 private final String errorRef;
65 private final boolean includeLocation;
66 private AppenderControl errorAppender;
67 private AsyncThread thread;
68
69 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
70 final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
71 final long shutdownTimeout, final Configuration config, final boolean includeLocation) {
72 super(name, filter, null, ignoreExceptions);
73 this.queue = new ArrayBlockingQueue<>(queueSize);
74 this.queueSize = queueSize;
75 this.blocking = blocking;
76 this.shutdownTimeout = shutdownTimeout;
77 this.config = config;
78 this.appenderRefs = appenderRefs;
79 this.errorRef = errorRef;
80 this.includeLocation = includeLocation;
81 }
82
83 @Override
84 public void start() {
85 final Map<String, Appender> map = config.getAppenders();
86 final List<AppenderControl> appenders = new ArrayList<>();
87 for (final AppenderRef appenderRef : appenderRefs) {
88 final Appender appender = map.get(appenderRef.getRef());
89 if (appender != null) {
90 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
91 } else {
92 LOGGER.error("No appender named {} was configured", appenderRef);
93 }
94 }
95 if (errorRef != null) {
96 final Appender appender = map.get(errorRef);
97 if (appender != null) {
98 errorAppender = new AppenderControl(appender, null, null);
99 } else {
100 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
101 }
102 }
103 if (appenders.size() > 0) {
104 thread = new AsyncThread(appenders, queue);
105 thread.setName("AsyncAppender-" + getName());
106 } else if (errorRef == null) {
107 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
108 }
109
110 thread.start();
111 super.start();
112 }
113
114 @Override
115 public void stop() {
116 super.stop();
117 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
118 thread.shutdown();
119 try {
120 thread.join(shutdownTimeout);
121 } catch (final InterruptedException ex) {
122 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
123 }
124 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
125 }
126
127
128
129
130
131
132 @Override
133 public void append(LogEvent logEvent) {
134 if (!isStarted()) {
135 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
136 }
137 if (!(logEvent instanceof Log4jLogEvent)) {
138 if (!(logEvent instanceof RingBufferLogEvent)) {
139 return;
140 }
141 logEvent = ((RingBufferLogEvent) logEvent).createMemento();
142 }
143 logEvent.getMessage().getFormattedMessage();
144 final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
145 boolean appendSuccessful = false;
146 if (blocking) {
147 if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
148
149
150 coreEvent.setEndOfBatch(false);
151 appendSuccessful = thread.callAppenders(coreEvent);
152 } else {
153 final Serializable serialized = Log4jLogEvent.serialize(coreEvent, includeLocation);
154 try {
155
156 queue.put(serialized);
157 appendSuccessful = true;
158 } catch (final InterruptedException e) {
159
160
161
162
163
164
165
166
167
168
169
170 appendSuccessful = queue.offer(serialized);
171 if (!appendSuccessful) {
172 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
173 getName());
174 }
175
176 Thread.currentThread().interrupt();
177 }
178 }
179 } else {
180 appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
181 if (!appendSuccessful) {
182 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
183 }
184 }
185 if (!appendSuccessful && errorAppender != null) {
186 errorAppender.callAppender(coreEvent);
187 }
188 }
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207 @PluginFactory
208 public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
209 @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
210 @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
211 @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout,
212 @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
213 @PluginAttribute("name") final String name,
214 @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
215 @PluginElement("Filter") final Filter filter, @PluginConfiguration final Configuration config,
216 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
217 if (name == null) {
218 LOGGER.error("No name provided for AsyncAppender");
219 return null;
220 }
221 if (appenderRefs == null) {
222 LOGGER.error("No appender references provided to AsyncAppender {}", name);
223 }
224
225 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
226 shutdownTimeout, config, includeLocation);
227 }
228
229
230
231
232 private class AsyncThread extends Thread {
233
234 private volatile boolean shutdown = false;
235 private final List<AppenderControl> appenders;
236 private final BlockingQueue<Serializable> queue;
237
238 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
239 this.appenders = appenders;
240 this.queue = queue;
241 setDaemon(true);
242 setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement());
243 }
244
245 @Override
246 public void run() {
247 isAppenderThread.set(Boolean.TRUE);
248 while (!shutdown) {
249 Serializable s;
250 try {
251 s = queue.take();
252 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
253 shutdown = true;
254 continue;
255 }
256 } catch (final InterruptedException ex) {
257 break;
258 }
259 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
260 event.setEndOfBatch(queue.isEmpty());
261 final boolean success = callAppenders(event);
262 if (!success && errorAppender != null) {
263 try {
264 errorAppender.callAppender(event);
265 } catch (final Exception ex) {
266
267 }
268 }
269 }
270
271 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
272 queue.size());
273 int count = 0;
274 int ignored = 0;
275 while (!queue.isEmpty()) {
276 try {
277 final Serializable s = queue.take();
278 if (Log4jLogEvent.canDeserialize(s)) {
279 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
280 event.setEndOfBatch(queue.isEmpty());
281 callAppenders(event);
282 count++;
283 } else {
284 ignored++;
285 LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
286 }
287 } catch (final InterruptedException ex) {
288
289
290 }
291 }
292 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
293 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
294 }
295
296
297
298
299
300
301
302
303
304 boolean callAppenders(final Log4jLogEvent event) {
305 boolean success = false;
306 for (final AppenderControl control : appenders) {
307 try {
308 control.callAppender(event);
309 success = true;
310 } catch (final Exception ex) {
311
312 }
313 }
314 return success;
315 }
316
317 public void shutdown() {
318 shutdown = true;
319 if (queue.isEmpty()) {
320 queue.offer(SHUTDOWN);
321 }
322 }
323 }
324
325
326
327
328
329
330 public String[] getAppenderRefStrings() {
331 final String[] result = new String[appenderRefs.length];
332 for (int i = 0; i < result.length; i++) {
333 result[i] = appenderRefs[i].getRef();
334 }
335 return result;
336 }
337
338
339
340
341
342
343
344 public boolean isIncludeLocation() {
345 return includeLocation;
346 }
347
348
349
350
351
352
353
354 public boolean isBlocking() {
355 return blocking;
356 }
357
358
359
360
361
362
363 public String getErrorRef() {
364 return errorRef;
365 }
366
367 public int getQueueCapacity() {
368 return queueSize;
369 }
370
371 public int getQueueRemainingCapacity() {
372 return queue.remainingCapacity();
373 }
374 }