001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.appender; 018 019 import java.io.Serializable; 020 import java.util.ArrayList; 021 import java.util.List; 022 import java.util.Map; 023 import java.util.concurrent.ArrayBlockingQueue; 024 import java.util.concurrent.BlockingQueue; 025 import java.util.concurrent.atomic.AtomicLong; 026 027 import org.apache.logging.log4j.core.Appender; 028 import org.apache.logging.log4j.core.Filter; 029 import org.apache.logging.log4j.core.LogEvent; 030 import org.apache.logging.log4j.core.async.RingBufferLogEvent; 031 import org.apache.logging.log4j.core.config.AppenderControl; 032 import org.apache.logging.log4j.core.config.AppenderRef; 033 import org.apache.logging.log4j.core.config.Configuration; 034 import org.apache.logging.log4j.core.config.ConfigurationException; 035 import org.apache.logging.log4j.core.config.plugins.Plugin; 036 import org.apache.logging.log4j.core.config.plugins.PluginAliases; 037 import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 038 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; 039 import org.apache.logging.log4j.core.config.plugins.PluginElement; 040 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 041 import org.apache.logging.log4j.core.impl.Log4jLogEvent; 042 043 /** 044 * Appends to one or more Appenders asynchronously. You can configure an 045 * AsyncAppender with one or more Appenders and an Appender to append to if the 046 * queue is full. The AsyncAppender does not allow a filter to be specified on 047 * the Appender references. 048 */ 049 @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true) 050 public final class AsyncAppender extends AbstractAppender { 051 052 private static final int DEFAULT_QUEUE_SIZE = 128; 053 private static final String SHUTDOWN = "Shutdown"; 054 055 private final BlockingQueue<Serializable> queue; 056 private final int queueSize; 057 private final boolean blocking; 058 private final Configuration config; 059 private final AppenderRef[] appenderRefs; 060 private final String errorRef; 061 private final boolean includeLocation; 062 private AppenderControl errorAppender; 063 private AsyncThread thread; 064 private static final AtomicLong threadSequence = new AtomicLong(1); 065 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>(); 066 067 068 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, 069 final String errorRef, final int queueSize, final boolean blocking, 070 final boolean ignoreExceptions, final Configuration config, 071 final boolean includeLocation) { 072 super(name, filter, null, ignoreExceptions); 073 this.queue = new ArrayBlockingQueue<Serializable>(queueSize); 074 this.queueSize = queueSize; 075 this.blocking = blocking; 076 this.config = config; 077 this.appenderRefs = appenderRefs; 078 this.errorRef = errorRef; 079 this.includeLocation = includeLocation; 080 } 081 082 @Override 083 public void start() { 084 final Map<String, Appender> map = config.getAppenders(); 085 final List<AppenderControl> appenders = new ArrayList<AppenderControl>(); 086 for (final AppenderRef appenderRef : appenderRefs) { 087 if (map.containsKey(appenderRef.getRef())) { 088 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(), 089 appenderRef.getFilter())); 090 } else { 091 LOGGER.error("No appender named {} was configured", appenderRef); 092 } 093 } 094 if (errorRef != null) { 095 if (map.containsKey(errorRef)) { 096 errorAppender = new AppenderControl(map.get(errorRef), null, null); 097 } else { 098 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); 099 } 100 } 101 if (appenders.size() > 0) { 102 thread = new AsyncThread(appenders, queue); 103 thread.setName("AsyncAppender-" + getName()); 104 } else if (errorRef == null) { 105 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName()); 106 } 107 108 thread.start(); 109 super.start(); 110 } 111 112 @Override 113 public void stop() { 114 super.stop(); 115 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size()); 116 thread.shutdown(); 117 try { 118 thread.join(); 119 } catch (final InterruptedException ex) { 120 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName()); 121 } 122 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size()); 123 } 124 125 /** 126 * Actual writing occurs here. 127 * <p/> 128 * @param logEvent The LogEvent. 129 */ 130 @Override 131 public void append(LogEvent logEvent) { 132 if (!isStarted()) { 133 throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); 134 } 135 if (!(logEvent instanceof Log4jLogEvent)) { 136 if (!(logEvent instanceof RingBufferLogEvent)) { 137 return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents 138 } 139 logEvent = ((RingBufferLogEvent) logEvent).createMemento(); 140 } 141 logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters 142 final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent; 143 boolean appendSuccessful = false; 144 if (blocking) { 145 if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) { 146 // LOG4J2-485: avoid deadlock that would result from trying 147 // to add to a full queue from appender thread 148 coreEvent.setEndOfBatch(false); // queue is definitely not empty! 149 appendSuccessful = thread.callAppenders(coreEvent); 150 } else { 151 try { 152 // wait for free slots in the queue 153 queue.put(Log4jLogEvent.serialize(coreEvent, includeLocation)); 154 appendSuccessful = true; 155 } catch (final InterruptedException e) { 156 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}", 157 getName()); 158 } 159 } 160 } else { 161 appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation)); 162 if (!appendSuccessful) { 163 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 164 } 165 } 166 if (!appendSuccessful && errorAppender != null) { 167 errorAppender.callAppender(coreEvent); 168 } 169 } 170 171 /** 172 * Create an AsyncAppender. 173 * @param appenderRefs The Appenders to reference. 174 * @param errorRef An optional Appender to write to if the queue is full or other errors occur. 175 * @param blocking True if the Appender should wait when the queue is full. The default is true. 176 * @param size The size of the event queue. The default is 128. 177 * @param name The name of the Appender. 178 * @param includeLocation whether to include location information. The default is false. 179 * @param filter The Filter or null. 180 * @param config The Configuration. 181 * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged; 182 * otherwise they are propagated to the caller. 183 * @return The AsyncAppender. 184 */ 185 @PluginFactory 186 public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs, 187 @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef, 188 @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking, 189 @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size, 190 @PluginAttribute("name") final String name, 191 @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation, 192 @PluginElement("Filter") final Filter filter, 193 @PluginConfiguration final Configuration config, 194 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) { 195 if (name == null) { 196 LOGGER.error("No name provided for AsyncAppender"); 197 return null; 198 } 199 if (appenderRefs == null) { 200 LOGGER.error("No appender references provided to AsyncAppender {}", name); 201 } 202 203 return new AsyncAppender(name, filter, appenderRefs, errorRef, 204 size, blocking, ignoreExceptions, config, includeLocation); 205 } 206 207 /** 208 * Thread that calls the Appenders. 209 */ 210 private class AsyncThread extends Thread { 211 212 private volatile boolean shutdown = false; 213 private final List<AppenderControl> appenders; 214 private final BlockingQueue<Serializable> queue; 215 216 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) { 217 this.appenders = appenders; 218 this.queue = queue; 219 setDaemon(true); 220 setName("AsyncAppenderThread" + threadSequence.getAndIncrement()); 221 } 222 223 @Override 224 public void run() { 225 isAppenderThread.set(Boolean.TRUE); // LOG4J2-485 226 while (!shutdown) { 227 Serializable s; 228 try { 229 s = queue.take(); 230 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) { 231 shutdown = true; 232 continue; 233 } 234 } catch (final InterruptedException ex) { 235 // No good reason for this. 236 continue; 237 } 238 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 239 event.setEndOfBatch(queue.isEmpty()); 240 final boolean success = callAppenders(event); 241 if (!success && errorAppender != null) { 242 try { 243 errorAppender.callAppender(event); 244 } catch (final Exception ex) { 245 // Silently accept the error. 246 } 247 } 248 } 249 // Process any remaining items in the queue. 250 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", 251 queue.size()); 252 int count= 0; 253 int ignored = 0; 254 while (!queue.isEmpty()) { 255 try { 256 final Serializable s = queue.take(); 257 if (Log4jLogEvent.canDeserialize(s)) { 258 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 259 event.setEndOfBatch(queue.isEmpty()); 260 callAppenders(event); 261 count++; 262 } else { 263 ignored++; 264 LOGGER.trace("Ignoring event of class {}", s.getClass().getName()); 265 } 266 } catch (final InterruptedException ex) { 267 // May have been interrupted to shut down. 268 } 269 } 270 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " + 271 "Processed {} and ignored {} events since shutdown started.", 272 queue.size(), count, ignored); 273 } 274 275 /** 276 * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on 277 * all registered {@code AppenderControl} objects, and returns {@code true} 278 * if at least one appender call was successful, {@code false} otherwise. 279 * Any exceptions are silently ignored. 280 * 281 * @param event the event to forward to the registered appenders 282 * @return {@code true} if at least one appender call succeeded, {@code false} otherwise 283 */ 284 boolean callAppenders(final Log4jLogEvent event) { 285 boolean success = false; 286 for (final AppenderControl control : appenders) { 287 try { 288 control.callAppender(event); 289 success = true; 290 } catch (final Exception ex) { 291 // If no appender is successful the error appender will get it. 292 } 293 } 294 return success; 295 } 296 297 public void shutdown() { 298 shutdown = true; 299 if (queue.isEmpty()) { 300 queue.offer(SHUTDOWN); 301 } 302 } 303 } 304 305 /** 306 * Returns the names of the appenders that this asyncAppender delegates to 307 * as an array of Strings. 308 * @return the names of the sink appenders 309 */ 310 public String[] getAppenderRefStrings() { 311 final String[] result = new String[appenderRefs.length]; 312 for (int i = 0; i < result.length; i++) { 313 result[i] = appenderRefs[i].getRef(); 314 } 315 return result; 316 } 317 318 /** 319 * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with 320 * every log event to determine the class and method where the logging call 321 * was made. 322 * @return {@code true} if location is included with every event, {@code false} otherwise 323 */ 324 public boolean isIncludeLocation() { 325 return includeLocation; 326 } 327 328 /** 329 * Returns {@code true} if this AsyncAppender will block when the queue is full, 330 * or {@code false} if events are dropped when the queue is full. 331 * @return whether this AsyncAppender will block or drop events when the queue is full. 332 */ 333 public boolean isBlocking() { 334 return blocking; 335 } 336 337 /** 338 * Returns the name of the appender that any errors are logged to or {@code null}. 339 * @return the name of the appender that any errors are logged to or {@code null} 340 */ 341 public String getErrorRef() { 342 return errorRef; 343 } 344 345 public int getQueueCapacity() { 346 return queueSize; 347 } 348 349 public int getQueueRemainingCapacity() { 350 return queue.remainingCapacity(); 351 } 352 }