1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import org.apache.logging.log4j.core.Appender;
20 import org.apache.logging.log4j.core.Filter;
21 import org.apache.logging.log4j.core.LogEvent;
22 import org.apache.logging.log4j.core.config.AppenderControl;
23 import org.apache.logging.log4j.core.config.AppenderRef;
24 import org.apache.logging.log4j.core.config.Configuration;
25 import org.apache.logging.log4j.core.config.ConfigurationException;
26 import org.apache.logging.log4j.core.config.plugins.Plugin;
27 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
28 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
29 import org.apache.logging.log4j.core.config.plugins.PluginElement;
30 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
31 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32
33 import java.io.Serializable;
34 import java.util.ArrayList;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.ArrayBlockingQueue;
38 import java.util.concurrent.BlockingQueue;
39
40
41
42
43
44
45 @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true)
46 public final class AsynchAppender extends AbstractAppender {
47
48 private static final int DEFAULT_QUEUE_SIZE = 128;
49 private static final String SHUTDOWN = "Shutdown";
50
51 private final BlockingQueue<Serializable> queue;
52 private final boolean blocking;
53 private final Configuration config;
54 private final AppenderRef[] appenderRefs;
55 private final String errorRef;
56 private AppenderControl errorAppender = null;
57 private AsynchThread thread = null;
58
59 private AsynchAppender(String name, Filter filter, AppenderRef[] appenderRefs, String errorRef,
60 int queueSize, boolean blocking,
61 boolean handleExceptions, Configuration config) {
62 super(name, filter, null, handleExceptions);
63 this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
64 this.blocking = blocking;
65 this.config = config;
66 this.appenderRefs = appenderRefs;
67 this.errorRef = errorRef;
68 }
69
70 @Override
71 public void start() {
72 Map<String, Appender> map = config.getAppenders();
73 List<AppenderControl> appenders = new ArrayList<AppenderControl>();
74 for (AppenderRef appenderRef : appenderRefs) {
75 if (map.containsKey(appenderRef.getRef())) {
76 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), null, null));
77 } else {
78 LOGGER.error("No appender named {} was configured", appenderRef);
79 }
80 }
81 if (errorRef != null) {
82 if (map.containsKey(errorRef)) {
83 errorAppender = new AppenderControl(map.get(errorRef), null, null);
84 } else {
85 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
86 }
87 }
88 if (appenders.size() > 0) {
89 thread = new AsynchThread(appenders, queue);
90 } else if (errorRef == null) {
91 throw new ConfigurationException("No appenders are available for AsynchAppender " + getName());
92 }
93
94 thread.start();
95 super.start();
96 }
97
98 @Override
99 public void stop() {
100 super.stop();
101 thread.shutdown();
102 try {
103 thread.join();
104 } catch (InterruptedException ex) {
105 LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName());
106 }
107 }
108
109
110
111
112
113
114 public void append(LogEvent event) {
115 if (!isStarted()) {
116 throw new IllegalStateException("AsynchAppender " + getName() + " is not active");
117 }
118 if (event instanceof Log4jLogEvent) {
119 if (blocking && queue.remainingCapacity() > 0) {
120 try {
121 queue.add(Log4jLogEvent.serialize((Log4jLogEvent) event));
122 return;
123 } catch (IllegalStateException ex) {
124 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
125 }
126 }
127 if (errorAppender != null) {
128 if (!blocking) {
129 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
130 }
131 errorAppender.callAppender(event);
132 }
133 }
134 }
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 @PluginFactory
150 public static AsynchAppender createAppender(@PluginElement("appender-ref") AppenderRef[] appenderRefs,
151 @PluginAttr("error-ref") String errorRef,
152 @PluginAttr("blocking") String blocking,
153 @PluginAttr("bufferSize") String size,
154 @PluginAttr("name") String name,
155 @PluginElement("filter") Filter filter,
156 @PluginConfiguration Configuration config,
157 @PluginAttr("suppressExceptions") String suppress) {
158 if (name == null) {
159 LOGGER.error("No name provided for AsynchAppender");
160 return null;
161 }
162 if (appenderRefs == null) {
163 LOGGER.error("No appender references provided to AsynchAppender {}", name);
164 }
165
166 boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
167 int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
168
169 boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
170
171 return new AsynchAppender(name, filter, appenderRefs, errorRef, queueSize, isBlocking, handleExceptions,
172 config);
173 }
174
175
176
177
178 private class AsynchThread extends Thread {
179
180 private volatile boolean shutdown = false;
181 private final List<AppenderControl> appenders;
182 private final BlockingQueue<Serializable> queue;
183
184 public AsynchThread(List<AppenderControl> appenders, BlockingQueue<Serializable> queue) {
185 this.appenders = appenders;
186 this.queue = queue;
187 }
188
189 @Override
190 public void run() {
191 while (!shutdown) {
192 Serializable s;
193 try {
194 s = queue.take();
195 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
196 shutdown = true;
197 continue;
198 }
199 } catch (InterruptedException ex) {
200
201 continue;
202 }
203 Log4jLogEvent event = Log4jLogEvent.deserialize(s);
204 boolean success = false;
205 for (AppenderControl control : appenders) {
206 try {
207 control.callAppender(event);
208 success = true;
209 } catch (Exception ex) {
210
211 }
212 }
213 if (!success && errorAppender != null) {
214 try {
215 errorAppender.callAppender(event);
216 } catch (Exception ex) {
217
218 }
219 }
220 }
221
222 while (!queue.isEmpty()) {
223 try {
224 Log4jLogEvent event = Log4jLogEvent.deserialize(queue.take());
225 for (AppenderControl control : appenders) {
226 control.callAppender(event);
227 }
228 } catch (InterruptedException ex) {
229
230 }
231 }
232 }
233
234 public void shutdown() {
235 shutdown = true;
236 if (queue.isEmpty()) {
237 queue.offer(SHUTDOWN);
238 }
239 }
240 }
241 }