001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.appender; 018 019import java.io.Serializable; 020import java.util.ArrayList; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ArrayBlockingQueue; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.logging.log4j.core.Appender; 028import org.apache.logging.log4j.core.Filter; 029import org.apache.logging.log4j.core.LogEvent; 030import org.apache.logging.log4j.core.async.RingBufferLogEvent; 031import org.apache.logging.log4j.core.config.AppenderControl; 032import org.apache.logging.log4j.core.config.AppenderRef; 033import org.apache.logging.log4j.core.config.Configuration; 034import org.apache.logging.log4j.core.config.ConfigurationException; 035import org.apache.logging.log4j.core.config.plugins.Plugin; 036import org.apache.logging.log4j.core.config.plugins.PluginAliases; 037import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 038import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; 039import org.apache.logging.log4j.core.config.plugins.PluginElement; 040import org.apache.logging.log4j.core.config.plugins.PluginFactory; 041import org.apache.logging.log4j.core.impl.Log4jLogEvent; 042import org.apache.logging.log4j.core.util.Constants; 043 044/** 045 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an 046 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender 047 * references. 048 */ 049@Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true) 050public final class AsyncAppender extends AbstractAppender { 051 052 private static final long serialVersionUID = 1L; 053 private static final int DEFAULT_QUEUE_SIZE = 128; 054 private static final String SHUTDOWN = "Shutdown"; 055 056 private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1); 057 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>(); 058 059 private final BlockingQueue<Serializable> queue; 060 private final int queueSize; 061 private final boolean blocking; 062 private final long shutdownTimeout; 063 private final Configuration config; 064 private final AppenderRef[] appenderRefs; 065 private final String errorRef; 066 private final boolean includeLocation; 067 private AppenderControl errorAppender; 068 private AsyncThread thread; 069 070 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, 071 final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions, 072 final long shutdownTimeout, final Configuration config, final boolean includeLocation) { 073 super(name, filter, null, ignoreExceptions); 074 this.queue = new ArrayBlockingQueue<>(queueSize); 075 this.queueSize = queueSize; 076 this.blocking = blocking; 077 this.shutdownTimeout = shutdownTimeout; 078 this.config = config; 079 this.appenderRefs = appenderRefs; 080 this.errorRef = errorRef; 081 this.includeLocation = includeLocation; 082 } 083 084 @Override 085 public void start() { 086 final Map<String, Appender> map = config.getAppenders(); 087 final List<AppenderControl> appenders = new ArrayList<>(); 088 for (final AppenderRef appenderRef : appenderRefs) { 089 final Appender appender = map.get(appenderRef.getRef()); 090 if (appender != null) { 091 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter())); 092 } else { 093 LOGGER.error("No appender named {} was configured", appenderRef); 094 } 095 } 096 if (errorRef != null) { 097 final Appender appender = map.get(errorRef); 098 if (appender != null) { 099 errorAppender = new AppenderControl(appender, null, null); 100 } else { 101 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); 102 } 103 } 104 if (appenders.size() > 0) { 105 thread = new AsyncThread(appenders, queue); 106 thread.setName("AsyncAppender-" + getName()); 107 } else if (errorRef == null) { 108 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName()); 109 } 110 111 thread.start(); 112 super.start(); 113 } 114 115 @Override 116 public void stop() { 117 super.stop(); 118 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size()); 119 thread.shutdown(); 120 try { 121 thread.join(shutdownTimeout); 122 } catch (final InterruptedException ex) { 123 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName()); 124 } 125 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size()); 126 } 127 128 /** 129 * Actual writing occurs here. 130 * 131 * @param logEvent The LogEvent. 132 */ 133 @Override 134 public void append(LogEvent logEvent) { 135 if (!isStarted()) { 136 throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); 137 } 138 if (!(logEvent instanceof Log4jLogEvent)) { 139 if (!(logEvent instanceof RingBufferLogEvent)) { 140 return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents 141 } 142 logEvent = ((RingBufferLogEvent) logEvent).createMemento(); 143 } 144 if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose 145 logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters 146 } 147 final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent; 148 boolean appendSuccessful = false; 149 if (blocking) { 150 if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) { 151 // LOG4J2-485: avoid deadlock that would result from trying 152 // to add to a full queue from appender thread 153 coreEvent.setEndOfBatch(false); // queue is definitely not empty! 154 appendSuccessful = thread.callAppenders(coreEvent); 155 } else { 156 final Serializable serialized = Log4jLogEvent.serialize(coreEvent, includeLocation); 157 try { 158 // wait for free slots in the queue 159 queue.put(serialized); 160 appendSuccessful = true; 161 } catch (final InterruptedException e) { 162 // LOG4J2-1049: Some applications use Thread.interrupt() to send 163 // messages between application threads. This does not necessarily 164 // mean that the queue is full. To prevent dropping a log message, 165 // quickly try to offer the event to the queue again. 166 // (Yes, this means there is a possibility the same event is logged twice.) 167 // 168 // Finally, catching the InterruptedException means the 169 // interrupted flag has been cleared on the current thread. 170 // This may interfere with the application's expectation of 171 // being interrupted, so when we are done, we set the interrupted 172 // flag again. 173 appendSuccessful = queue.offer(serialized); 174 if (!appendSuccessful) { 175 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}", 176 getName()); 177 } 178 // set the interrupted flag again. 179 Thread.currentThread().interrupt(); 180 } 181 } 182 } else { 183 appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation)); 184 if (!appendSuccessful) { 185 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 186 } 187 } 188 if (!appendSuccessful && errorAppender != null) { 189 errorAppender.callAppender(coreEvent); 190 } 191 } 192 193 /** 194 * Create an AsyncAppender. 195 * 196 * @param appenderRefs The Appenders to reference. 197 * @param errorRef An optional Appender to write to if the queue is full or other errors occur. 198 * @param blocking True if the Appender should wait when the queue is full. The default is true. 199 * @param shutdownTimeout How many milliseconds the Appender should wait to flush outstanding log events 200 * in the queue on shutdown. The default is zero which means to wait forever. 201 * @param size The size of the event queue. The default is 128. 202 * @param name The name of the Appender. 203 * @param includeLocation whether to include location information. The default is false. 204 * @param filter The Filter or null. 205 * @param config The Configuration. 206 * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged; 207 * otherwise they are propagated to the caller. 208 * @return The AsyncAppender. 209 */ 210 @PluginFactory 211 public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs, 212 @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef, 213 @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking, 214 @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout, 215 @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size, 216 @PluginAttribute("name") final String name, 217 @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation, 218 @PluginElement("Filter") final Filter filter, @PluginConfiguration final Configuration config, 219 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) { 220 if (name == null) { 221 LOGGER.error("No name provided for AsyncAppender"); 222 return null; 223 } 224 if (appenderRefs == null) { 225 LOGGER.error("No appender references provided to AsyncAppender {}", name); 226 } 227 228 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions, 229 shutdownTimeout, config, includeLocation); 230 } 231 232 /** 233 * Thread that calls the Appenders. 234 */ 235 private class AsyncThread extends Thread { 236 237 private volatile boolean shutdown = false; 238 private final List<AppenderControl> appenders; 239 private final BlockingQueue<Serializable> queue; 240 241 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) { 242 this.appenders = appenders; 243 this.queue = queue; 244 setDaemon(true); 245 setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement()); 246 } 247 248 @Override 249 public void run() { 250 isAppenderThread.set(Boolean.TRUE); // LOG4J2-485 251 while (!shutdown) { 252 Serializable s; 253 try { 254 s = queue.take(); 255 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) { 256 shutdown = true; 257 continue; 258 } 259 } catch (final InterruptedException ex) { 260 break; // LOG4J2-830 261 } 262 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 263 event.setEndOfBatch(queue.isEmpty()); 264 final boolean success = callAppenders(event); 265 if (!success && errorAppender != null) { 266 try { 267 errorAppender.callAppender(event); 268 } catch (final Exception ex) { 269 // Silently accept the error. 270 } 271 } 272 } 273 // Process any remaining items in the queue. 274 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", 275 queue.size()); 276 int count = 0; 277 int ignored = 0; 278 while (!queue.isEmpty()) { 279 try { 280 final Serializable s = queue.take(); 281 if (Log4jLogEvent.canDeserialize(s)) { 282 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 283 event.setEndOfBatch(queue.isEmpty()); 284 callAppenders(event); 285 count++; 286 } else { 287 ignored++; 288 LOGGER.trace("Ignoring event of class {}", s.getClass().getName()); 289 } 290 } catch (final InterruptedException ex) { 291 // May have been interrupted to shut down. 292 // Here we ignore interrupts and try to process all remaining events. 293 } 294 } 295 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " 296 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored); 297 } 298 299 /** 300 * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl} 301 * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any 302 * exceptions are silently ignored. 303 * 304 * @param event the event to forward to the registered appenders 305 * @return {@code true} if at least one appender call succeeded, {@code false} otherwise 306 */ 307 boolean callAppenders(final Log4jLogEvent event) { 308 boolean success = false; 309 for (final AppenderControl control : appenders) { 310 try { 311 control.callAppender(event); 312 success = true; 313 } catch (final Exception ex) { 314 // If no appender is successful the error appender will get it. 315 } 316 } 317 return success; 318 } 319 320 public void shutdown() { 321 shutdown = true; 322 if (queue.isEmpty()) { 323 queue.offer(SHUTDOWN); 324 } 325 } 326 } 327 328 /** 329 * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings. 330 * 331 * @return the names of the sink appenders 332 */ 333 public String[] getAppenderRefStrings() { 334 final String[] result = new String[appenderRefs.length]; 335 for (int i = 0; i < result.length; i++) { 336 result[i] = appenderRefs[i].getRef(); 337 } 338 return result; 339 } 340 341 /** 342 * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine 343 * the class and method where the logging call was made. 344 * 345 * @return {@code true} if location is included with every event, {@code false} otherwise 346 */ 347 public boolean isIncludeLocation() { 348 return includeLocation; 349 } 350 351 /** 352 * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are 353 * dropped when the queue is full. 354 * 355 * @return whether this AsyncAppender will block or drop events when the queue is full. 356 */ 357 public boolean isBlocking() { 358 return blocking; 359 } 360 361 /** 362 * Returns the name of the appender that any errors are logged to or {@code null}. 363 * 364 * @return the name of the appender that any errors are logged to or {@code null} 365 */ 366 public String getErrorRef() { 367 return errorRef; 368 } 369 370 public int getQueueCapacity() { 371 return queueSize; 372 } 373 374 public int getQueueRemainingCapacity() { 375 return queue.remainingCapacity(); 376 } 377}