/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.logging.log4j.core.appender;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.logging.log4j.core.AbstractLogEvent;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
import org.apache.logging.log4j.core.async.EventRoute;
import org.apache.logging.log4j.core.config.AppenderControl;
import org.apache.logging.log4j.core.config.AppenderRef;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAliases;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.util.Constants;

/**
 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
 * references.
 */
@Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
public final class AsyncAppender extends AbstractAppender {

    private static final int DEFAULT_QUEUE_SIZE = 128;

    /** Sentinel placed on the queue by {@link AsyncThread#shutdown()} to wake up a worker blocked on an empty queue. */
    private static final LogEvent SHUTDOWN = new AbstractLogEvent() {
    };

    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);

    private final BlockingQueue<LogEvent> queue;
    private final int queueSize;
    private final boolean blocking;
    private final long shutdownTimeout;
    private final Configuration config;
    private final AppenderRef[] appenderRefs;
    private final String errorRef;
    private final boolean includeLocation;
    private AppenderControl errorAppender;
    private AsyncThread thread;
    private AsyncQueueFullPolicy asyncQueueFullPolicy;

    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
            final String errorRef, final int queueSize, final boolean blocking,
            final boolean ignoreExceptions,
            final long shutdownTimeout, final Configuration config, final boolean includeLocation) {
        super(name, filter, null, ignoreExceptions);
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.queueSize = queueSize;
        this.blocking = blocking;
        this.shutdownTimeout = shutdownTimeout;
        this.config = config;
        // createAppender only logs an error when no references were supplied and still builds the appender.
        // Normalize null to an empty array so start() and getAppenderRefStrings() never dereference null.
        this.appenderRefs = appenderRefs == null ? new AppenderRef[0] : appenderRefs;
        this.errorRef = errorRef;
        this.includeLocation = includeLocation;
    }

    /**
     * Resolves the referenced appenders (and the optional error appender) from the configuration, then starts the
     * background thread that drains the queue.
     *
     * @throws ConfigurationException if none of the referenced appenders could be resolved and no {@code errorRef}
     *         was configured
     */
    @Override
    public void start() {
        final Map<String, Appender> map = config.getAppenders();
        final List<AppenderControl> appenders = new ArrayList<>();
        for (final AppenderRef appenderRef : appenderRefs) {
            final Appender appender = map.get(appenderRef.getRef());
            if (appender != null) {
                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
            } else {
                LOGGER.error("No appender named {} was configured", appenderRef);
            }
        }
        if (errorRef != null) {
            final Appender appender = map.get(errorRef);
            if (appender != null) {
                errorAppender = new AppenderControl(appender, null, null);
            } else {
                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
            }
        }
        if (appenders.isEmpty() && errorRef == null) {
            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
        }
        // Always create the worker thread, even when only an error appender is configured. Previously the thread
        // was created only when a primary appender resolved, so an errorRef-only configuration hit a
        // NullPointerException on thread.start(). With an empty appender list callAppenders() reports failure and
        // every event is handed to the error appender.
        thread = new AsyncThread(appenders, queue);
        thread.setName("AsyncAppender-" + getName());
        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();

        thread.start();
        super.start();
    }

    /**
     * Stops this appender: asks the background thread to shut down and waits up to {@code shutdownTimeout}
     * milliseconds for it to finish.
     */
    @Override
    public void stop() {
        super.stop();
        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
        // thread is null when start() never completed (e.g. it threw a ConfigurationException).
        if (thread != null) {
            thread.shutdown();
            try {
                thread.join(shutdownTimeout);
            } catch (final InterruptedException ex) {
                // Restore the interrupted flag that was cleared when the exception was thrown.
                Thread.currentThread().interrupt();
                LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
            }
        }
        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());

        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
        }
    }

    /**
     * Actual writing occurs here.
     *
     * @param logEvent The LogEvent.
     * @throws IllegalStateException if this appender has not been started
     */
    @Override
    public void append(final LogEvent logEvent) {
        if (!isStarted()) {
            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
        }
        if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
            logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
        }
        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
        if (!queue.offer(memento)) {
            if (blocking) {
                // delegate to the event router (which may discard, enqueue and block, or log in current thread)
                final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
                route.logMessage(this, memento);
            } else {
                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
                logToErrorAppenderIfNecessary(false, memento);
            }
        }
    }

    /**
     * FOR INTERNAL USE ONLY.
     *
     * @param logEvent the event to log
     */
    public void logMessageInCurrentThread(final LogEvent logEvent) {
        logEvent.setEndOfBatch(queue.isEmpty());
        final boolean appendSuccessful = thread.callAppenders(logEvent);
        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
    }

    /**
     * FOR INTERNAL USE ONLY.
     *
     * @param logEvent the event to log
     */
    public void logMessageInBackgroundThread(final LogEvent logEvent) {
        try {
            // wait for free slots in the queue
            queue.put(logEvent);
        } catch (final InterruptedException e) {
            final boolean appendSuccessful = handleInterruptedException(logEvent);
            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
        }
    }

    // LOG4J2-1049: Some applications use Thread.interrupt() to send
    // messages between application threads. This does not necessarily
    // mean that the queue is full. To prevent dropping a log message,
    // quickly try to offer the event to the queue again.
    // (Yes, this means there is a possibility the same event is logged twice.)
    //
    // Finally, catching the InterruptedException means the
    // interrupted flag has been cleared on the current thread.
    // This may interfere with the application's expectation of
    // being interrupted, so when we are done, we set the interrupted
    // flag again.
    private boolean handleInterruptedException(final LogEvent memento) {
        final boolean appendSuccessful = queue.offer(memento);
        if (!appendSuccessful) {
            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
                getName());
        }
        // set the interrupted flag again.
        Thread.currentThread().interrupt();
        return appendSuccessful;
    }

    /**
     * Hands the event to the error appender when the primary append failed and an error appender was resolved.
     *
     * @param appendSuccessful whether the event was already handled successfully
     * @param logEvent the event to forward on failure
     */
    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
        if (!appendSuccessful && errorAppender != null) {
            errorAppender.callAppender(logEvent);
        }
    }

    /**
     * Create an AsyncAppender.
     *
     * @param appenderRefs The Appenders to reference.
     * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
     * @param blocking True if the Appender should wait when the queue is full. The default is true.
     * @param shutdownTimeout How many milliseconds the Appender should wait to flush outstanding log events
     *                        in the queue on shutdown. The default is zero which means to wait forever.
     * @param size The size of the event queue. The default is 128.
     * @param name The name of the Appender.
     * @param includeLocation whether to include location information. The default is false.
     * @param filter The Filter or null.
     * @param config The Configuration.
     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
     *                         otherwise they are propagated to the caller.
     * @return The AsyncAppender.
     */
    @PluginFactory
    public static AsyncAppender createAppender(
            // @formatter:off
            @PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
            @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
            @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
            @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout,
            @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
            @PluginAttribute("name") final String name,
            @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
            @PluginElement("Filter") final Filter filter,
            @PluginConfiguration final Configuration config,
            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
            // @formatter:on
        if (name == null) {
            LOGGER.error("No name provided for AsyncAppender");
            return null;
        }
        if (appenderRefs == null) {
            LOGGER.error("No appender references provided to AsyncAppender {}", name);
        }

        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
            shutdownTimeout, config, includeLocation);
    }

    /**
     * Thread that calls the Appenders.
     */
    private class AsyncThread extends Thread {

        private volatile boolean shutdown = false;
        private final List<AppenderControl> appenders;
        private final BlockingQueue<LogEvent> queue;

        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
            this.appenders = appenders;
            this.queue = queue;
            setDaemon(true);
            setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement());
        }

        /**
         * Takes events off the queue and dispatches them until shutdown is requested or the thread is interrupted,
         * then drains whatever is still queued.
         */
        @Override
        public void run() {
            while (!shutdown) {
                LogEvent event;
                try {
                    event = queue.take();
                    if (event == SHUTDOWN) {
                        shutdown = true;
                        continue;
                    }
                } catch (final InterruptedException ex) {
                    break; // LOG4J2-830
                }
                event.setEndOfBatch(queue.isEmpty());
                dispatch(event);
            }
            // Process any remaining items in the queue.
            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
                queue.size());
            int count = 0;
            int ignored = 0;
            while (!queue.isEmpty()) {
                try {
                    final LogEvent event = queue.take();
                    if (event instanceof Log4jLogEvent) {
                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
                        logEvent.setEndOfBatch(queue.isEmpty());
                        // Use the same dispatch path as the main loop so that events nobody accepted still
                        // reach the error appender instead of being dropped during shutdown.
                        dispatch(logEvent);
                        count++;
                    } else {
                        ignored++;
                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
                    }
                } catch (final InterruptedException ex) {
                    // May have been interrupted to shut down.
                    // Here we ignore interrupts and try to process all remaining events.
                }
            }
            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
        }

        /**
         * Sends the event to the registered appenders and, if none of them succeeded, to the error appender
         * when one is available.
         *
         * @param event the event to dispatch
         */
        private void dispatch(final LogEvent event) {
            final boolean success = callAppenders(event);
            if (!success && errorAppender != null) {
                try {
                    errorAppender.callAppender(event);
                } catch (final Exception ignored) {
                    // Silently accept the error.
                }
            }
        }

        /**
         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
         * exceptions are silently ignored.
         *
         * @param event the event to forward to the registered appenders
         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
         */
        boolean callAppenders(final LogEvent event) {
            boolean success = false;
            for (final AppenderControl control : appenders) {
                try {
                    control.callAppender(event);
                    success = true;
                } catch (final Exception ex) {
                    // If no appender is successful the error appender will get it.
                }
            }
            return success;
        }

        /**
         * Requests shutdown. If the queue is empty a sentinel event is offered so that a worker blocked in
         * {@code take()} wakes up and observes the flag.
         */
        public void shutdown() {
            shutdown = true;
            if (queue.isEmpty()) {
                queue.offer(SHUTDOWN);
            }
        }
    }

    /**
     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
     *
     * @return the names of the sink appenders
     */
    public String[] getAppenderRefStrings() {
        final String[] result = new String[appenderRefs.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = appenderRefs[i].getRef();
        }
        return result;
    }

    /**
     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
     * the class and method where the logging call was made.
     *
     * @return {@code true} if location is included with every event, {@code false} otherwise
     */
    public boolean isIncludeLocation() {
        return includeLocation;
    }

    /**
     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
     * dropped when the queue is full.
     *
     * @return whether this AsyncAppender will block or drop events when the queue is full.
     */
    public boolean isBlocking() {
        return blocking;
    }

    /**
     * Returns the name of the appender that any errors are logged to or {@code null}.
     *
     * @return the name of the appender that any errors are logged to or {@code null}
     */
    public String getErrorRef() {
        return errorRef;
    }

    /**
     * Returns the queue size this appender was constructed with.
     *
     * @return the configured capacity of the event queue
     */
    public int getQueueCapacity() {
        return queueSize;
    }

    /**
     * Returns the number of additional events the queue can currently accept without blocking.
     *
     * @return the remaining capacity of the event queue
     */
    public int getQueueRemainingCapacity() {
        return queue.remainingCapacity();
    }
}