1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.io.Serializable;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.ArrayBlockingQueue;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.logging.log4j.core.Appender;
28 import org.apache.logging.log4j.core.Filter;
29 import org.apache.logging.log4j.core.LogEvent;
30 import org.apache.logging.log4j.core.async.RingBufferLogEvent;
31 import org.apache.logging.log4j.core.config.AppenderControl;
32 import org.apache.logging.log4j.core.config.AppenderRef;
33 import org.apache.logging.log4j.core.config.Configuration;
34 import org.apache.logging.log4j.core.config.ConfigurationException;
35 import org.apache.logging.log4j.core.config.plugins.Plugin;
36 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
37 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
38 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
39 import org.apache.logging.log4j.core.config.plugins.PluginElement;
40 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
41 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
42
43
44
45
46
47
48
49 @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
50 public final class AsyncAppender extends AbstractAppender {
51
52 private static final int DEFAULT_QUEUE_SIZE = 128;
53 private static final String SHUTDOWN = "Shutdown";
54
55 private final BlockingQueue<Serializable> queue;
56 private final int queueSize;
57 private final boolean blocking;
58 private final Configuration config;
59 private final AppenderRef[] appenderRefs;
60 private final String errorRef;
61 private final boolean includeLocation;
62 private AppenderControl errorAppender;
63 private AsyncThread thread;
64 private static final AtomicLong threadSequence = new AtomicLong(1);
65 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
66
67
68 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
69 final String errorRef, final int queueSize, final boolean blocking,
70 final boolean ignoreExceptions, final Configuration config,
71 final boolean includeLocation) {
72 super(name, filter, null, ignoreExceptions);
73 this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
74 this.queueSize = queueSize;
75 this.blocking = blocking;
76 this.config = config;
77 this.appenderRefs = appenderRefs;
78 this.errorRef = errorRef;
79 this.includeLocation = includeLocation;
80 }
81
82 @Override
83 public void start() {
84 final Map<String, Appender> map = config.getAppenders();
85 final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
86 for (final AppenderRef appenderRef : appenderRefs) {
87 if (map.containsKey(appenderRef.getRef())) {
88 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
89 appenderRef.getFilter()));
90 } else {
91 LOGGER.error("No appender named {} was configured", appenderRef);
92 }
93 }
94 if (errorRef != null) {
95 if (map.containsKey(errorRef)) {
96 errorAppender = new AppenderControl(map.get(errorRef), null, null);
97 } else {
98 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
99 }
100 }
101 if (appenders.size() > 0) {
102 thread = new AsyncThread(appenders, queue);
103 thread.setName("AsyncAppender-" + getName());
104 } else if (errorRef == null) {
105 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
106 }
107
108 thread.start();
109 super.start();
110 }
111
112 @Override
113 public void stop() {
114 super.stop();
115 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
116 thread.shutdown();
117 try {
118 thread.join();
119 } catch (final InterruptedException ex) {
120 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
121 }
122 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
123 }
124
125
126
127
128
129
130 @Override
131 public void append(LogEvent logEvent) {
132 if (!isStarted()) {
133 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
134 }
135 if (!(logEvent instanceof Log4jLogEvent)) {
136 if (!(logEvent instanceof RingBufferLogEvent)) {
137 return;
138 }
139 logEvent = ((RingBufferLogEvent) logEvent).createMemento();
140 }
141 final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
142 boolean appendSuccessful = false;
143 if (blocking) {
144 if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
145
146
147 coreEvent.setEndOfBatch(false);
148 appendSuccessful = thread.callAppenders(coreEvent);
149 } else {
150 try {
151
152 queue.put(Log4jLogEvent.serialize(coreEvent, includeLocation));
153 appendSuccessful = true;
154 } catch (final InterruptedException e) {
155 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
156 getName());
157 }
158 }
159 } else {
160 appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
161 if (!appendSuccessful) {
162 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
163 }
164 }
165 if (!appendSuccessful && errorAppender != null) {
166 errorAppender.callAppender(coreEvent);
167 }
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184 @PluginFactory
185 public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
186 @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
187 @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
188 @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
189 @PluginAttribute("name") final String name,
190 @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
191 @PluginElement("Filter") final Filter filter,
192 @PluginConfiguration final Configuration config,
193 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
194 if (name == null) {
195 LOGGER.error("No name provided for AsyncAppender");
196 return null;
197 }
198 if (appenderRefs == null) {
199 LOGGER.error("No appender references provided to AsyncAppender {}", name);
200 }
201
202 return new AsyncAppender(name, filter, appenderRefs, errorRef,
203 size, blocking, ignoreExceptions, config, includeLocation);
204 }
205
206
207
208
209 private class AsyncThread extends Thread {
210
211 private volatile boolean shutdown = false;
212 private final List<AppenderControl> appenders;
213 private final BlockingQueue<Serializable> queue;
214
215 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
216 this.appenders = appenders;
217 this.queue = queue;
218 setDaemon(true);
219 setName("AsyncAppenderThread" + threadSequence.getAndIncrement());
220 }
221
222 @Override
223 public void run() {
224 isAppenderThread.set(Boolean.TRUE);
225 while (!shutdown) {
226 Serializable s;
227 try {
228 s = queue.take();
229 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
230 shutdown = true;
231 continue;
232 }
233 } catch (final InterruptedException ex) {
234
235 continue;
236 }
237 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
238 event.setEndOfBatch(queue.isEmpty());
239 final boolean success = callAppenders(event);
240 if (!success && errorAppender != null) {
241 try {
242 errorAppender.callAppender(event);
243 } catch (final Exception ex) {
244
245 }
246 }
247 }
248
249 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
250 queue.size());
251 int count= 0;
252 int ignored = 0;
253 while (!queue.isEmpty()) {
254 try {
255 final Serializable s = queue.take();
256 if (Log4jLogEvent.canDeserialize(s)) {
257 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
258 event.setEndOfBatch(queue.isEmpty());
259 callAppenders(event);
260 count++;
261 } else {
262 ignored++;
263 LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
264 }
265 } catch (final InterruptedException ex) {
266
267 }
268 }
269 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " +
270 "Processed {} and ignored {} events since shutdown started.",
271 queue.size(), count, ignored);
272 }
273
274
275
276
277
278
279
280
281
282
283 boolean callAppenders(final Log4jLogEvent event) {
284 boolean success = false;
285 for (final AppenderControl control : appenders) {
286 try {
287 control.callAppender(event);
288 success = true;
289 } catch (final Exception ex) {
290
291 }
292 }
293 return success;
294 }
295
296 public void shutdown() {
297 shutdown = true;
298 if (queue.isEmpty()) {
299 queue.offer(SHUTDOWN);
300 }
301 }
302 }
303
304
305
306
307
308
309 public String[] getAppenderRefStrings() {
310 final String[] result = new String[appenderRefs.length];
311 for (int i = 0; i < result.length; i++) {
312 result[i] = appenderRefs[i].getRef();
313 }
314 return result;
315 }
316
317
318
319
320
321
322
323 public boolean isIncludeLocation() {
324 return includeLocation;
325 }
326
327
328
329
330
331
332 public boolean isBlocking() {
333 return blocking;
334 }
335
336
337
338
339
340 public String getErrorRef() {
341 return errorRef;
342 }
343
344 public int getQueueCapacity() {
345 return queueSize;
346 }
347
348 public int getQueueRemainingCapacity() {
349 return queue.remainingCapacity();
350 }
351 }