001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.async; 018 019import java.util.Map; 020import java.util.Objects; 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.Executors; 023 024import org.apache.logging.log4j.Level; 025import org.apache.logging.log4j.Marker; 026import org.apache.logging.log4j.ThreadContext; 027import org.apache.logging.log4j.core.Logger; 028import org.apache.logging.log4j.core.LoggerContext; 029import org.apache.logging.log4j.core.config.Property; 030import org.apache.logging.log4j.core.config.ReliabilityStrategy; 031import org.apache.logging.log4j.core.impl.Log4jLogEvent; 032import org.apache.logging.log4j.core.jmx.RingBufferAdmin; 033import org.apache.logging.log4j.core.util.Clock; 034import org.apache.logging.log4j.core.util.ClockFactory; 035import org.apache.logging.log4j.core.util.DummyNanoClock; 036import org.apache.logging.log4j.core.util.Integers; 037import org.apache.logging.log4j.core.util.Loader; 038import org.apache.logging.log4j.core.util.NanoClock; 039import org.apache.logging.log4j.message.Message; 040import org.apache.logging.log4j.message.MessageFactory; 041import org.apache.logging.log4j.message.TimestampMessage; 042import org.apache.logging.log4j.status.StatusLogger; 043import org.apache.logging.log4j.util.PropertiesUtil; 044 045import com.lmax.disruptor.BlockingWaitStrategy; 046import com.lmax.disruptor.ExceptionHandler; 047import com.lmax.disruptor.RingBuffer; 048import com.lmax.disruptor.SleepingWaitStrategy; 049import com.lmax.disruptor.WaitStrategy; 050import com.lmax.disruptor.YieldingWaitStrategy; 051import com.lmax.disruptor.dsl.Disruptor; 052import com.lmax.disruptor.dsl.ProducerType; 053 054/** 055 * AsyncLogger is a logger designed for high throughput and low latency logging. 056 * It does not perform any I/O in the calling (application) thread, but instead 057 * hands off the work to another thread as soon as possible. The actual logging 058 * is performed in the background thread. It uses the LMAX Disruptor library for 059 * inter-thread communication. (<a 060 * href="http://lmax-exchange.github.com/disruptor/" 061 * >http://lmax-exchange.github.com/disruptor/</a>) 062 * <p> 063 * To use AsyncLogger, specify the System property 064 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector} 065 * before you obtain a Logger, and all Loggers returned by LogManager.getLogger 066 * will be AsyncLoggers. 067 * <p> 068 * Note that for performance reasons, this logger does not include source 069 * location by default. You need to specify {@code includeLocation="true"} in 070 * the configuration or any %class, %location or %line conversion patterns in 071 * your log4j.xml configuration will produce either a "?" character or no output 072 * at all. 073 * <p> 074 * For best performance, use AsyncLogger with the RandomAccessFileAppender or 075 * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders 076 * have built-in support for the batching mechanism used by the Disruptor 077 * library, and they will flush to disk at the end of each batch. This means 078 * that even with immediateFlush=false, there will never be any items left in 079 * the buffer; all log events will all be written to disk in a very efficient 080 * manner. 081 */ 082public class AsyncLogger extends Logger { 083 private static final long serialVersionUID = 1L; 084 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50; 085 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200; 086 private static final int RINGBUFFER_MIN_SIZE = 128; 087 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024; 088 private static final StatusLogger LOGGER = StatusLogger.getLogger(); 089 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create(); 090 091 static enum ThreadNameStrategy { // LOG4J2-467 092 CACHED { 093 @Override 094 public String getThreadName(final Info info) { 095 return info.cachedThreadName; 096 } 097 }, 098 UNCACHED { 099 @Override 100 public String getThreadName(final Info info) { 101 return Thread.currentThread().getName(); 102 } 103 }; 104 abstract String getThreadName(Info info); 105 106 static ThreadNameStrategy create() { 107 final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", CACHED.name()); 108 try { 109 return ThreadNameStrategy.valueOf(name); 110 } catch (final Exception ex) { 111 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString()); 112 return CACHED; 113 } 114 } 115 } 116 private static volatile Disruptor<RingBufferLogEvent> disruptor; 117 private static final Clock CLOCK = ClockFactory.getClock(); 118 private static volatile NanoClock nanoClock = new DummyNanoClock(); 119 120 private static final ExecutorService executor = Executors 121 .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-")); 122 123 static { 124 initInfoForExecutorThread(); 125 LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY); 126 final int ringBufferSize = calculateRingBufferSize(); 127 128 final WaitStrategy waitStrategy = createWaitStrategy(); 129 disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI, 130 waitStrategy); 131 disruptor.handleExceptionsWith(getExceptionHandler()); 132 disruptor.handleEventsWith(new RingBufferLogEventHandler()); 133 134 LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer() 135 .getBufferSize()); 136 disruptor.start(); 137 } 138 139 private static int calculateRingBufferSize() { 140 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE; 141 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.RingBufferSize", 142 String.valueOf(ringBufferSize)); 143 try { 144 int size = Integer.parseInt(userPreferredRBSize); 145 if (size < RINGBUFFER_MIN_SIZE) { 146 size = RINGBUFFER_MIN_SIZE; 147 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize, 148 RINGBUFFER_MIN_SIZE); 149 } 150 ringBufferSize = size; 151 } catch (final Exception ex) { 152 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize); 153 } 154 return Integers.ceilingNextPowerOfTwo(ringBufferSize); 155 } 156 157 /** 158 * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread. 159 * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}. 160 * All other Info objects will have this attribute set to {@code false}. 161 * This allows us to detect Logger.log() calls initiated from the appender thread, 162 * which may cause deadlock when the RingBuffer is full. (LOG4J2-471) 163 */ 164 private static void initInfoForExecutorThread() { 165 executor.submit(new Runnable(){ 166 @Override 167 public void run() { 168 final boolean isAppenderThread = true; 169 final Info info = new Info(new RingBufferLogEventTranslator(), // 170 Thread.currentThread().getName(), isAppenderThread); 171 Info.threadlocalInfo.set(info); 172 } 173 }); 174 } 175 176 private static WaitStrategy createWaitStrategy() { 177 final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy"); 178 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy); 179 if ("Sleep".equals(strategy)) { 180 return new SleepingWaitStrategy(); 181 } else if ("Yield".equals(strategy)) { 182 return new YieldingWaitStrategy(); 183 } else if ("Block".equals(strategy)) { 184 return new BlockingWaitStrategy(); 185 } 186 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy"); 187 return new BlockingWaitStrategy(); 188 } 189 190 private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() { 191 final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler"); 192 if (cls == null) { 193 LOGGER.debug("No AsyncLogger.ExceptionHandler specified"); 194 return null; 195 } 196 try { 197 @SuppressWarnings("unchecked") 198 final ExceptionHandler<RingBufferLogEvent> result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class); 199 LOGGER.debug("AsyncLogger.ExceptionHandler={}", result); 200 return result; 201 } catch (final Exception ignored) { 202 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored); 203 return null; 204 } 205 } 206 207 /** 208 * Constructs an {@code AsyncLogger} with the specified context, name and 209 * message factory. 210 * 211 * @param context context of this logger 212 * @param name name of this logger 213 * @param messageFactory message factory of this logger 214 */ 215 public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) { 216 super(context, name, messageFactory); 217 } 218 219 /** 220 * Tuple with the event translator and thread name for a thread. 221 */ 222 static class Info { 223 private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>() { 224 @Override 225 protected Info initialValue() { 226 // by default, set isAppenderThread to false 227 return new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false); 228 } 229 }; 230 private final RingBufferLogEventTranslator translator; 231 private final String cachedThreadName; 232 private final boolean isAppenderThread; 233 234 public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) { 235 this.translator = translator; 236 this.cachedThreadName = threadName; 237 this.isAppenderThread = appenderThread; 238 } 239 240 // LOG4J2-467 241 private String threadName() { 242 return THREAD_NAME_STRATEGY.getThreadName(this); 243 } 244 } 245 246 @Override 247 public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, 248 final Throwable thrown) { 249 250 final Disruptor<RingBufferLogEvent> temp = disruptor; 251 if (temp == null) { // LOG4J2-639 252 LOGGER.fatal("Ignoring log event after log4j was shut down"); 253 } else { 254 logMessage0(temp, fqcn, level, marker, message, thrown); 255 } 256 } 257 258 private void logMessage0(final Disruptor<RingBufferLogEvent> theDisruptor, final String fqcn, final Level level, 259 final Marker marker, final Message message, final Throwable thrown) { 260 final Info info = Info.threadlocalInfo.get(); 261 logMessageInAppropriateThread(info, theDisruptor, fqcn, level, marker, message, thrown); 262 } 263 264 private void logMessageInAppropriateThread(final Info info, final Disruptor<RingBufferLogEvent> theDisruptor, 265 final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) { 266 if (!logMessageInCurrentThread(info, theDisruptor, fqcn, level, marker, message, thrown)) { 267 logMessageInBackgroundThread(info, fqcn, level, marker, message, thrown); 268 } 269 } 270 271 /** 272 * LOG4J2-471: prevent deadlock when RingBuffer is full and object 273 * being logged calls Logger.log() from its toString() method 274 * 275 * @param info threadlocal information - used to determine if the current thread is the background appender thread 276 * @param theDisruptor used to check if the buffer is full 277 * @param fqcn fully qualified caller name 278 * @param level log level 279 * @param marker optional marker 280 * @param message log message 281 * @param thrown optional exception 282 * @return {@code true} if the event has been logged in the current thread, {@code false} if it should be passed to 283 * the background thread 284 */ 285 private boolean logMessageInCurrentThread(Info info, final Disruptor<RingBufferLogEvent> theDisruptor, 286 final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) { 287 if (info.isAppenderThread && theDisruptor.getRingBuffer().remainingCapacity() == 0) { 288 // bypass RingBuffer and invoke Appender directly 289 final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy(); 290 strategy.log(this, getName(), fqcn, marker, level, message, thrown); 291 return true; 292 } 293 return false; 294 } 295 296 /** 297 * Enqueues the specified message to be logged in the background thread. 298 * 299 * @param info holds some cached information 300 * @param fqcn fully qualified caller name 301 * @param level log level 302 * @param marker optional marker 303 * @param message log message 304 * @param thrown optional exception 305 */ 306 private void logMessageInBackgroundThread(Info info, final String fqcn, final Level level, final Marker marker, 307 final Message message, final Throwable thrown) { 308 309 message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters 310 311 initLogMessageInfo(info, fqcn, level, marker, message, thrown); 312 enqueueLogMessageInfo(info); 313 } 314 315 private void initLogMessageInfo(Info info, final String fqcn, final Level level, final Marker marker, 316 final Message message, final Throwable thrown) { 317 info.translator.setValues(this, getName(), marker, fqcn, level, message, // 318 // don't construct ThrowableProxy until required 319 thrown, // 320 321 // config properties are taken care of in the EventHandler 322 // thread in the #actualAsyncLog method 323 324 // needs shallow copy to be fast (LOG4J2-154) 325 ThreadContext.getImmutableContext(), // 326 327 // needs shallow copy to be fast (LOG4J2-154) 328 ThreadContext.getImmutableStack(), // 329 330 // Thread.currentThread().getName(), // 331 // info.cachedThreadName, // 332 info.threadName(), // 333 334 // location: very expensive operation. LOG4J2-153: 335 // Only include if "includeLocation=true" is specified, 336 // exclude if not specified or if "false" was specified. 337 calcLocationIfRequested(fqcn), 338 339 // System.currentTimeMillis()); 340 // CoarseCachedClock: 20% faster than system clock, 16ms gaps 341 // CachedClock: 10% faster than system clock, smaller gaps 342 // LOG4J2-744 avoid calling clock altogether if message has the timestamp 343 eventTimeMillis(message), // 344 nanoClock.nanoTime() // 345 ); 346 } 347 348 private long eventTimeMillis(final Message message) { 349 return message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() : 350 CLOCK.currentTimeMillis(); 351 } 352 353 /** 354 * Returns the caller location if requested, {@code null} otherwise. 355 * @param fqcn fully qualified caller name. 356 * @return the caller location if requested, {@code null} otherwise. 357 */ 358 private StackTraceElement calcLocationIfRequested(String fqcn) { 359 final boolean includeLocation = privateConfig.loggerConfig.isIncludeLocation(); 360 return includeLocation ? location(fqcn) : null; 361 } 362 363 private void enqueueLogMessageInfo(Info info) { 364 // LOG4J2-639: catch NPE if disruptor field was set to null after our check above 365 try { 366 // Note: do NOT use the temp variable above! 367 // That could result in adding a log event to the disruptor after it was shut down, 368 // which could cause the publishEvent method to hang and never return. 369 disruptor.publishEvent(info.translator); 370 } catch (final NullPointerException npe) { 371 LOGGER.fatal("Ignoring log event after log4j was shut down."); 372 } 373 } 374 375 private static StackTraceElement location(final String fqcnOfLogger) { 376 return Log4jLogEvent.calcLocation(fqcnOfLogger); 377 } 378 379 /** 380 * This method is called by the EventHandler that processes the 381 * RingBufferLogEvent in a separate thread. 382 * 383 * @param event the event to log 384 */ 385 public void actualAsyncLog(final RingBufferLogEvent event) { 386 final Map<Property, Boolean> properties = privateConfig.loggerConfig.getProperties(); 387 event.mergePropertiesIntoContextMap(properties, privateConfig.config.getStrSubstitutor()); 388 final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy(); 389 strategy.log(this, event); 390 } 391 392 public static void stop() { 393 final Disruptor<RingBufferLogEvent> temp = disruptor; 394 395 // Must guarantee that publishing to the RingBuffer has stopped 396 // before we call disruptor.shutdown() 397 disruptor = null; // client code fails with NPE if log after stop = OK 398 if (temp == null) { 399 return; // stop() has already been called 400 } 401 402 // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed, 403 // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU, 404 // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain. 405 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { 406 try { 407 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while 408 } catch (final InterruptedException e) { // ignored 409 } 410 } 411 temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed 412 executor.shutdown(); // finally, kill the processor thread 413 Info.threadlocalInfo.remove(); // LOG4J2-323 414 } 415 416 /** 417 * Returns {@code true} if the specified disruptor still has unprocessed events. 418 */ 419 private static boolean hasBacklog(final Disruptor<?> disruptor) { 420 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer(); 421 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize()); 422 } 423 424 /** 425 * Creates and returns a new {@code RingBufferAdmin} that instruments the 426 * ringbuffer of the {@code AsyncLogger}. 427 * 428 * @param contextName name of the global {@code AsyncLoggerContext} 429 */ 430 public static RingBufferAdmin createRingBufferAdmin(final String contextName) { 431 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName); 432 } 433 434 /** 435 * Returns the {@code NanoClock} to use for creating the nanoTime timestamp of log events. 436 * @return the {@code NanoClock} to use for creating the nanoTime timestamp of log events 437 */ 438 public static NanoClock getNanoClock() { 439 return nanoClock; 440 } 441 442 /** 443 * Sets the {@code NanoClock} to use for creating the nanoTime timestamp of log events. 444 * <p> 445 * FOR INTERNAL USE. This method may be called with a different {@code NanoClock} implementation when the 446 * configuration changes. 447 * 448 * @param nanoClock the {@code NanoClock} to use for creating the nanoTime timestamp of log events 449 */ 450 public static void setNanoClock(NanoClock nanoClock) { 451 AsyncLogger.nanoClock = Objects.requireNonNull(nanoClock, "NanoClock must be non-null"); 452 } 453}