001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.async; 018 019import java.util.Map; 020import java.util.Objects; 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.Executors; 023 024import org.apache.logging.log4j.Level; 025import org.apache.logging.log4j.Marker; 026import org.apache.logging.log4j.ThreadContext; 027import org.apache.logging.log4j.core.Logger; 028import org.apache.logging.log4j.core.LoggerContext; 029import org.apache.logging.log4j.core.config.Property; 030import org.apache.logging.log4j.core.config.ReliabilityStrategy; 031import org.apache.logging.log4j.core.impl.Log4jLogEvent; 032import org.apache.logging.log4j.core.jmx.RingBufferAdmin; 033import org.apache.logging.log4j.core.util.Clock; 034import org.apache.logging.log4j.core.util.ClockFactory; 035import org.apache.logging.log4j.core.util.DummyNanoClock; 036import org.apache.logging.log4j.core.util.Integers; 037import org.apache.logging.log4j.core.util.Loader; 038import org.apache.logging.log4j.core.util.NanoClock; 039import org.apache.logging.log4j.message.Message; 040import org.apache.logging.log4j.message.MessageFactory; 041import org.apache.logging.log4j.message.TimestampMessage; 042import org.apache.logging.log4j.status.StatusLogger; 043import org.apache.logging.log4j.util.PropertiesUtil; 044 045import com.lmax.disruptor.BlockingWaitStrategy; 046import com.lmax.disruptor.ExceptionHandler; 047import com.lmax.disruptor.RingBuffer; 048import com.lmax.disruptor.SleepingWaitStrategy; 049import com.lmax.disruptor.WaitStrategy; 050import com.lmax.disruptor.YieldingWaitStrategy; 051import com.lmax.disruptor.dsl.Disruptor; 052import com.lmax.disruptor.dsl.ProducerType; 053 054/** 055 * AsyncLogger is a logger designed for high throughput and low latency logging. It does not perform any I/O in the 056 * calling (application) thread, but instead hands off the work to another thread as soon as possible. The actual 057 * logging is performed in the background thread. It uses the LMAX Disruptor library for inter-thread communication. (<a 058 * href="http://lmax-exchange.github.com/disruptor/" >http://lmax-exchange.github.com/disruptor/</a>) 059 * <p> 060 * To use AsyncLogger, specify the System property 061 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector} before you obtain a 062 * Logger, and all Loggers returned by LogManager.getLogger will be AsyncLoggers. 063 * <p> 064 * Note that for performance reasons, this logger does not include source location by default. You need to specify 065 * {@code includeLocation="true"} in the configuration or any %class, %location or %line conversion patterns in your 066 * log4j.xml configuration will produce either a "?" character or no output at all. 067 * <p> 068 * For best performance, use AsyncLogger with the RandomAccessFileAppender or RollingRandomAccessFileAppender, with 069 * immediateFlush=false. These appenders have built-in support for the batching mechanism used by the Disruptor library, 070 * and they will flush to disk at the end of each batch. This means that even with immediateFlush=false, there will 071 * never be any items left in the buffer; all log events will all be written to disk in a very efficient manner. 072 */ 073public class AsyncLogger extends Logger { 074 private static final long serialVersionUID = 1L; 075 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50; 076 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200; 077 private static final int RINGBUFFER_MIN_SIZE = 128; 078 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024; 079 private static final StatusLogger LOGGER = StatusLogger.getLogger(); 080 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create(); 081 082 /** 083 * Strategy for deciding whether thread name should be cached or not. 084 */ 085 static enum ThreadNameStrategy { // LOG4J2-467 086 CACHED { 087 @Override 088 public String getThreadName(final Info info) { 089 return info.cachedThreadName; 090 } 091 }, 092 UNCACHED { 093 @Override 094 public String getThreadName(final Info info) { 095 return Thread.currentThread().getName(); 096 } 097 }; 098 abstract String getThreadName(Info info); 099 100 static ThreadNameStrategy create() { 101 final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", 102 CACHED.name()); 103 try { 104 return ThreadNameStrategy.valueOf(name); 105 } catch (final Exception ex) { 106 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString()); 107 return CACHED; 108 } 109 } 110 } 111 112 private static volatile Disruptor<RingBufferLogEvent> disruptor; 113 private static final Clock CLOCK = ClockFactory.getClock(); 114 private static volatile NanoClock nanoClock = new DummyNanoClock(); 115 116 private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(new DaemonThreadFactory( 117 "AsyncLogger-")); 118 119 static { 120 initInfoForExecutorThread(); 121 LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY); 122 final int ringBufferSize = calculateRingBufferSize(); 123 124 final WaitStrategy waitStrategy = createWaitStrategy(); 125 disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, EXECUTOR, ProducerType.MULTI, 126 waitStrategy); 127 disruptor.handleExceptionsWith(getExceptionHandler()); 128 disruptor.handleEventsWith(new RingBufferLogEventHandler()); 129 130 LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer() 131 .getBufferSize()); 132 disruptor.start(); 133 } 134 135 /** 136 * Constructs an {@code AsyncLogger} with the specified context, name and message factory. 137 * 138 * @param context context of this logger 139 * @param name name of this logger 140 * @param messageFactory message factory of this logger 141 */ 142 public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) { 143 super(context, name, messageFactory); 144 } 145 146 private static int calculateRingBufferSize() { 147 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE; 148 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty( 149 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize)); 150 try { 151 int size = Integer.parseInt(userPreferredRBSize); 152 if (size < RINGBUFFER_MIN_SIZE) { 153 size = RINGBUFFER_MIN_SIZE; 154 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize, 155 RINGBUFFER_MIN_SIZE); 156 } 157 ringBufferSize = size; 158 } catch (final Exception ex) { 159 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize); 160 } 161 return Integers.ceilingNextPowerOfTwo(ringBufferSize); 162 } 163 164 /** 165 * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread. This Info object uniquely 166 * has attribute {@code isAppenderThread} set to {@code true}. All other Info objects will have this attribute set 167 * to {@code false}. This allows us to detect Logger.log() calls initiated from the appender thread, which may cause 168 * deadlock when the RingBuffer is full. (LOG4J2-471) 169 */ 170 private static void initInfoForExecutorThread() { 171 EXECUTOR.submit(new Runnable() { 172 @Override 173 public void run() { 174 final boolean isAppenderThread = true; 175 final Info info = new Info(new RingBufferLogEventTranslator(), // 176 Thread.currentThread().getName(), isAppenderThread); 177 Info.THREADLOCAL.set(info); 178 } 179 }); 180 } 181 182 private static WaitStrategy createWaitStrategy() { 183 final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy"); 184 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy); 185 if ("Sleep".equals(strategy)) { 186 return new SleepingWaitStrategy(); 187 } else if ("Yield".equals(strategy)) { 188 return new YieldingWaitStrategy(); 189 } else if ("Block".equals(strategy)) { 190 return new BlockingWaitStrategy(); 191 } 192 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy"); 193 return new BlockingWaitStrategy(); 194 } 195 196 private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() { 197 final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler"); 198 if (cls == null) { 199 LOGGER.debug("No AsyncLogger.ExceptionHandler specified"); 200 return null; 201 } 202 try { 203 @SuppressWarnings("unchecked") 204 final ExceptionHandler<RingBufferLogEvent> result = Loader 205 .newCheckedInstanceOf(cls, ExceptionHandler.class); 206 LOGGER.debug("AsyncLogger.ExceptionHandler={}", result); 207 return result; 208 } catch (final Exception ignored) { 209 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored); 210 return null; 211 } 212 } 213 214 /** 215 * Tuple with the event translator and thread name for a thread. 216 */ 217 static class Info { 218 private static final ThreadLocal<Info> THREADLOCAL = new ThreadLocal<Info>() { 219 @Override 220 protected Info initialValue() { 221 // by default, set isAppenderThread to false 222 return new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false); 223 } 224 }; 225 private final RingBufferLogEventTranslator translator; 226 private final String cachedThreadName; 227 private final boolean isAppenderThread; 228 229 public Info(final RingBufferLogEventTranslator translator, final String threadName, 230 final boolean appenderThread) { 231 this.translator = translator; 232 this.cachedThreadName = threadName; 233 this.isAppenderThread = appenderThread; 234 } 235 236 // LOG4J2-467 237 private String threadName() { 238 return THREAD_NAME_STRATEGY.getThreadName(this); 239 } 240 } 241 242 @Override 243 public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, 244 final Throwable thrown) { 245 246 final Disruptor<RingBufferLogEvent> temp = disruptor; 247 if (temp == null) { // LOG4J2-639 248 LOGGER.fatal("Ignoring log event after log4j was shut down"); 249 } else { 250 logMessage0(temp, fqcn, level, marker, message, thrown); 251 } 252 } 253 254 private void logMessage0(final Disruptor<RingBufferLogEvent> theDisruptor, final String fqcn, final Level level, 255 final Marker marker, final Message message, final Throwable thrown) { 256 final Info info = Info.THREADLOCAL.get(); 257 logMessageInAppropriateThread(info, theDisruptor, fqcn, level, marker, message, thrown); 258 } 259 260 private void logMessageInAppropriateThread(final Info info, final Disruptor<RingBufferLogEvent> theDisruptor, 261 final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) { 262 if (!logMessageInCurrentThread(info, theDisruptor, fqcn, level, marker, message, thrown)) { 263 logMessageInBackgroundThread(info, fqcn, level, marker, message, thrown); 264 } 265 } 266 267 /** 268 * LOG4J2-471: prevent deadlock when RingBuffer is full and object being logged calls Logger.log() from its 269 * toString() method 270 * 271 * @param info threadlocal information - used to determine if the current thread is the background appender thread 272 * @param theDisruptor used to check if the buffer is full 273 * @param fqcn fully qualified caller name 274 * @param level log level 275 * @param marker optional marker 276 * @param message log message 277 * @param thrown optional exception 278 * @return {@code true} if the event has been logged in the current thread, {@code false} if it should be passed to 279 * the background thread 280 */ 281 private boolean logMessageInCurrentThread(Info info, final Disruptor<RingBufferLogEvent> theDisruptor, 282 final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) { 283 if (info.isAppenderThread && theDisruptor.getRingBuffer().remainingCapacity() == 0) { 284 // bypass RingBuffer and invoke Appender directly 285 final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy(); 286 strategy.log(this, getName(), fqcn, marker, level, message, thrown); 287 return true; 288 } 289 return false; 290 } 291 292 /** 293 * Enqueues the specified message to be logged in the background thread. 294 * 295 * @param info holds some cached information 296 * @param fqcn fully qualified caller name 297 * @param level log level 298 * @param marker optional marker 299 * @param message log message 300 * @param thrown optional exception 301 */ 302 private void logMessageInBackgroundThread(Info info, final String fqcn, final Level level, final Marker marker, 303 final Message message, final Throwable thrown) { 304 305 message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters 306 307 initLogMessageInfo(info, fqcn, level, marker, message, thrown); 308 enqueueLogMessageInfo(info); 309 } 310 311 private void initLogMessageInfo(Info info, final String fqcn, final Level level, final Marker marker, 312 final Message message, final Throwable thrown) { 313 info.translator.setValues(this, getName(), marker, fqcn, level, message, // 314 // don't construct ThrowableProxy until required 315 thrown, // 316 317 // config properties are taken care of in the EventHandler 318 // thread in the #actualAsyncLog method 319 320 // needs shallow copy to be fast (LOG4J2-154) 321 ThreadContext.getImmutableContext(), // 322 323 // needs shallow copy to be fast (LOG4J2-154) 324 ThreadContext.getImmutableStack(), // 325 326 // Thread.currentThread().getName(), // 327 // info.cachedThreadName, // 328 info.threadName(), // 329 330 // location: very expensive operation. LOG4J2-153: 331 // Only include if "includeLocation=true" is specified, 332 // exclude if not specified or if "false" was specified. 333 calcLocationIfRequested(fqcn), 334 335 // System.currentTimeMillis()); 336 // CoarseCachedClock: 20% faster than system clock, 16ms gaps 337 // CachedClock: 10% faster than system clock, smaller gaps 338 // LOG4J2-744 avoid calling clock altogether if message has the timestamp 339 eventTimeMillis(message), // 340 nanoClock.nanoTime() // 341 ); 342 } 343 344 private long eventTimeMillis(final Message message) { 345 return message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() : CLOCK 346 .currentTimeMillis(); 347 } 348 349 /** 350 * Returns the caller location if requested, {@code null} otherwise. 351 * 352 * @param fqcn fully qualified caller name. 353 * @return the caller location if requested, {@code null} otherwise. 354 */ 355 private StackTraceElement calcLocationIfRequested(String fqcn) { 356 final boolean includeLocation = privateConfig.loggerConfig.isIncludeLocation(); 357 return includeLocation ? location(fqcn) : null; 358 } 359 360 private void enqueueLogMessageInfo(Info info) { 361 // LOG4J2-639: catch NPE if disruptor field was set to null after our check above 362 try { 363 // Note: do NOT use the temp variable above! 364 // That could result in adding a log event to the disruptor after it was shut down, 365 // which could cause the publishEvent method to hang and never return. 366 disruptor.publishEvent(info.translator); 367 } catch (final NullPointerException npe) { 368 LOGGER.fatal("Ignoring log event after log4j was shut down."); 369 } 370 } 371 372 private static StackTraceElement location(final String fqcnOfLogger) { 373 return Log4jLogEvent.calcLocation(fqcnOfLogger); 374 } 375 376 /** 377 * This method is called by the EventHandler that processes the RingBufferLogEvent in a separate thread. 378 * 379 * @param event the event to log 380 */ 381 public void actualAsyncLog(final RingBufferLogEvent event) { 382 final Map<Property, Boolean> properties = privateConfig.loggerConfig.getProperties(); 383 event.mergePropertiesIntoContextMap(properties, privateConfig.config.getStrSubstitutor()); 384 final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy(); 385 strategy.log(this, event); 386 } 387 388 public static void stop() { 389 final Disruptor<RingBufferLogEvent> temp = disruptor; 390 391 // Must guarantee that publishing to the RingBuffer has stopped 392 // before we call disruptor.shutdown() 393 disruptor = null; // client code fails with NPE if log after stop = OK 394 if (temp == null) { 395 return; // stop() has already been called 396 } 397 398 // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed, 399 // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU, 400 // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain. 401 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { 402 try { 403 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while 404 } catch (final InterruptedException e) { // ignored 405 } 406 } 407 temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed 408 EXECUTOR.shutdown(); // finally, kill the processor thread 409 Info.THREADLOCAL.remove(); // LOG4J2-323 410 } 411 412 /** 413 * Returns {@code true} if the specified disruptor still has unprocessed events. 414 */ 415 private static boolean hasBacklog(final Disruptor<?> theDisruptor) { 416 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer(); 417 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize()); 418 } 419 420 /** 421 * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}. 422 * 423 * @param contextName name of the global {@code AsyncLoggerContext} 424 * @return a new {@code RingBufferAdmin} that instruments the ringbuffer 425 */ 426 public static RingBufferAdmin createRingBufferAdmin(final String contextName) { 427 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName); 428 } 429 430 /** 431 * Returns the {@code NanoClock} to use for creating the nanoTime timestamp of log events. 432 * 433 * @return the {@code NanoClock} to use for creating the nanoTime timestamp of log events 434 */ 435 public static NanoClock getNanoClock() { 436 return nanoClock; 437 } 438 439 /** 440 * Sets the {@code NanoClock} to use for creating the nanoTime timestamp of log events. 441 * <p> 442 * FOR INTERNAL USE. This method may be called with a different {@code NanoClock} implementation when the 443 * configuration changes. 444 * 445 * @param nanoClock the {@code NanoClock} to use for creating the nanoTime timestamp of log events 446 */ 447 public static void setNanoClock(NanoClock nanoClock) { 448 AsyncLogger.nanoClock = Objects.requireNonNull(nanoClock, "NanoClock must be non-null"); 449 } 450}