/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.core.async;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.helpers.Clock;
import org.apache.logging.log4j.core.helpers.ClockFactory;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.MessageFactory;
import org.apache.logging.log4j.status.StatusLogger;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.Util;

/**
 * AsyncLogger is a logger designed for high throughput and low latency logging.
 * It does not perform any I/O in the calling (application) thread, but instead
 * hands off the work to another thread as soon as possible. The actual logging
 * is performed in the background thread. It uses the LMAX Disruptor library for
 * inter-thread communication. (<a
 * href="http://lmax-exchange.github.com/disruptor/"
 * >http://lmax-exchange.github.com/disruptor/</a>)
 * <p>
 * To use AsyncLogger, specify the System property
 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
 * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
 * will be AsyncLoggers.
 * <p>
 * Note that for performance reasons, this logger does not include source
 * location by default. You need to specify {@code includeLocation="true"} in
 * the configuration or any %class, %location or %line conversion patterns in
 * your log4j.xml configuration will produce either a "?" character or no output
 * at all.
 * <p>
 * For best performance, use AsyncLogger with the RandomAccessFileAppender or
 * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
 * have built-in support for the batching mechanism used by the Disruptor
 * library, and they will flush to disk at the end of each batch. This means
 * that even with immediateFlush=false, there will never be any items left in
 * the buffer; all log events will be written to disk in a very efficient
 * manner.
 */
public class AsyncLogger extends Logger {
    private static final long serialVersionUID = 1L;

    /** Sleep time in milliseconds between two checks of the ringbuffer while draining in {@link #stop()}. */
    private static final int HALF_A_SECOND = 500;

    /** Maximum number of drain checks in {@link #stop()}: 20 checks of 500 ms is roughly 10 seconds. */
    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;

    /** Smallest ringbuffer size accepted from the {@code AsyncLogger.RingBufferSize} system property. */
    private static final int RINGBUFFER_MIN_SIZE = 128;

    /** Ringbuffer size used when {@code AsyncLogger.RingBufferSize} is absent or not a number. */
    private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;

    private static final StatusLogger LOGGER = StatusLogger.getLogger();
    private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();

    /** Per-thread translator and cached thread name; lazily created on the first {@code log} call of a thread. */
    private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();

    /**
     * Strategy for obtaining the thread name that is stored in a log event (LOG4J2-467).
     */
    enum ThreadNameStrategy {
        /** Uses the thread name captured when the thread's {@code Info} object was created. */
        CACHED {
            @Override
            public String getThreadName(Info info) {
                return info.cachedThreadName;
            }
        },
        /** Asks the current thread for its name on every call. */
        UNCACHED {
            @Override
            public String getThreadName(Info info) {
                return Thread.currentThread().getName();
            }
        };

        /**
         * Returns the thread name to record for an event logged by the calling thread.
         *
         * @param info the calling thread's {@code Info} object
         * @return the thread name
         */
        abstract String getThreadName(Info info);

        /**
         * Returns the strategy named by system property
         * {@code AsyncLogger.ThreadNameStrategy}, or {@link #CACHED} if the
         * property is not set or does not name one of the constants.
         *
         * @return the strategy to use, never {@code null}
         */
        static ThreadNameStrategy create() {
            final String name = System.getProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
            try {
                return ThreadNameStrategy.valueOf(name);
            } catch (final Exception ex) {
                // Fall back to the default, but leave a trace so that a typo in
                // the property value can be diagnosed.
                LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
                return CACHED;
            }
        }
    }

    /** The shared disruptor; set to {@code null} by {@link #stop()}. */
    private static volatile Disruptor<RingBufferLogEvent> disruptor;
    private static Clock clock = ClockFactory.getClock();

    private static ExecutorService executor = Executors
            .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));

    static {
        // Must be submitted before the disruptor is started so that the
        // executor's thread gets its Info object first.
        initInfoForExecutorThread();
        LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
        final int ringBufferSize = calculateRingBufferSize();

        final WaitStrategy waitStrategy = createWaitStrategy();
        disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
                ProducerType.MULTI, waitStrategy);
        final EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {//
        new RingBufferLogEventHandler() };
        disruptor.handleExceptionsWith(getExceptionHandler());
        disruptor.handleEventsWith(handlers);

        LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
                .getBufferSize());
        disruptor.start();
    }

    /**
     * Determines the ringbuffer size from system property
     * {@code AsyncLogger.RingBufferSize}. Values below
     * {@link #RINGBUFFER_MIN_SIZE} are raised to that minimum, values that are
     * not an integer are replaced by {@link #RINGBUFFER_DEFAULT_SIZE}; a
     * warning is logged in both cases.
     *
     * @return the size, passed through {@code Util.ceilingNextPowerOfTwo}
     */
    private static int calculateRingBufferSize() {
        int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
        final String userPreferredRBSize = System.getProperty("AsyncLogger.RingBufferSize",
                String.valueOf(ringBufferSize));
        try {
            int size = Integer.parseInt(userPreferredRBSize);
            if (size < RINGBUFFER_MIN_SIZE) {
                size = RINGBUFFER_MIN_SIZE;
                LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
                        RINGBUFFER_MIN_SIZE);
            }
            ringBufferSize = size;
        } catch (final NumberFormatException ex) {
            LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
        }
        return Util.ceilingNextPowerOfTwo(ringBufferSize);
    }

    /**
     * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
     * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
     * All other Info objects will have this attribute set to {@code false}.
     * This allows us to detect Logger.log() calls initiated from the appender thread,
     * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
     */
    private static void initInfoForExecutorThread() {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                final boolean isAppenderThread = true;
                final Info info = new Info(new RingBufferLogEventTranslator(), //
                        Thread.currentThread().getName(), isAppenderThread);
                threadlocalInfo.set(info);
            }
        });
    }

    /**
     * Creates the wait strategy named by system property
     * {@code AsyncLogger.WaitStrategy}. Recognized values are {@code Sleep},
     * {@code Yield} and {@code Block}; any other value, or no value, results
     * in a {@code SleepingWaitStrategy}.
     *
     * @return a new wait strategy, never {@code null}
     */
    private static WaitStrategy createWaitStrategy() {
        final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
        LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
        if ("Sleep".equals(strategy)) {
            LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
            return new SleepingWaitStrategy();
        } else if ("Yield".equals(strategy)) {
            LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
            return new YieldingWaitStrategy();
        } else if ("Block".equals(strategy)) {
            LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
            return new BlockingWaitStrategy();
        }
        LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
        return new SleepingWaitStrategy();
    }

    /**
     * Instantiates the {@code ExceptionHandler} class named by system property
     * {@code AsyncLogger.ExceptionHandler} using its no-argument constructor.
     *
     * @return the handler, or {@code null} if the property is not set or the
     *         class could not be loaded or instantiated
     */
    private static ExceptionHandler getExceptionHandler() {
        final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
        if (cls == null) {
            LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
            return null;
        }
        try {
            @SuppressWarnings("unchecked")
            final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class.forName(cls);
            final ExceptionHandler result = klass.newInstance();
            LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
            return result;
        } catch (final Exception ex) {
            // Best effort: a broken handler configuration must not prevent logging from starting.
            LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ex);
            return null;
        }
    }

    /**
     * Constructs an {@code AsyncLogger} with the specified context, name and
     * message factory.
     *
     * @param context context of this logger
     * @param name name of this logger
     * @param messageFactory message factory of this logger
     */
    public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
        super(context, name, messageFactory);
    }

    /**
     * Tuple with the event translator and thread name for a thread.
     */
    static class Info {
        private final RingBufferLogEventTranslator translator;
        private final String cachedThreadName;
        private final boolean isAppenderThread;

        /**
         * @param translator translator reused for every event published by the owning thread
         * @param threadName name of the owning thread at the time this object is created
         * @param appenderThread {@code true} only for the thread of the executor that runs the event handler
         */
        public Info(RingBufferLogEventTranslator translator, String threadName, boolean appenderThread) {
            this.translator = translator;
            this.cachedThreadName = threadName;
            this.isAppenderThread = appenderThread;
        }
    }

    /**
     * Captures the event data in the calling thread's translator and publishes
     * it to the ringbuffer. When called from the appender thread while the
     * ringbuffer is full, the event is instead passed directly to the logger
     * configuration to avoid a deadlock (LOG4J2-471).
     * <p>
     * Once {@link #stop()} has been called the disruptor reference is
     * {@code null}, so calling this method fails with a
     * {@code NullPointerException}.
     *
     * @param marker marker of the event, may be {@code null}
     * @param fqcn fully qualified class name of the logger entry point, used to compute the location
     * @param level level of the event
     * @param data message of the event
     * @param t throwable of the event, may be {@code null}
     */
    @Override
    public void log(final Marker marker, final String fqcn, final Level level, final Message data, final Throwable t) {
        Info info = threadlocalInfo.get();
        if (info == null) {
            info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
            threadlocalInfo.set(info);
        }

        // LOG4J2-471: prevent deadlock when RingBuffer is full and object
        // being logged calls Logger.log() from its toString() method
        if (info.isAppenderThread && disruptor.getRingBuffer().remainingCapacity() == 0) {
            // bypass RingBuffer and invoke Appender directly
            config.loggerConfig.log(getName(), marker, fqcn, level, data, t);
            return;
        }
        final boolean includeLocation = config.loggerConfig.isIncludeLocation();
        info.translator.setValues(this, getName(), marker, fqcn, level, data, t, //

                // config properties are taken care of in the EventHandler
                // thread in the #actualAsyncLog method

                // needs shallow copy to be fast (LOG4J2-154)
                ThreadContext.getImmutableContext(), //

                // needs shallow copy to be fast (LOG4J2-154)
                ThreadContext.getImmutableStack(), //

                // cached or live thread name, depending on configuration (LOG4J2-467)
                THREAD_NAME_STRATEGY.getThreadName(info), //

                // location: very expensive operation. LOG4J2-153:
                // Only include if "includeLocation=true" is specified,
                // exclude if not specified or if "false" was specified.
                includeLocation ? location(fqcn) : null,

                // timestamp comes from the configured Clock rather than
                // directly from System.currentTimeMillis()
                clock.currentTimeMillis());

        disruptor.publishEvent(info.translator);
    }

    /**
     * Computes the caller location for the given logger entry point.
     *
     * @param fqcnOfLogger fully qualified class name of the logger entry point
     * @return the result of {@code Log4jLogEvent.calcLocation}
     */
    private StackTraceElement location(final String fqcnOfLogger) {
        return Log4jLogEvent.calcLocation(fqcnOfLogger);
    }

    /**
     * This method is called by the EventHandler that processes the
     * RingBufferLogEvent in a separate thread.
     *
     * @param event the event to log
     */
    public void actualAsyncLog(final RingBufferLogEvent event) {
        final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
        event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
        config.logEvent(event);
    }

    /**
     * Stops the shared disruptor and the executor that runs its event handler.
     * <p>
     * The method first gives the ringbuffer up to roughly 10 seconds to drain,
     * sleeping between checks, and only then calls
     * {@code Disruptor.shutdown()}. Calling this method again after the first
     * call has no effect. If the calling thread is interrupted while waiting,
     * the shutdown still completes and the thread's interrupt status is
     * restored before this method returns.
     */
    public static void stop() {
        final Disruptor<RingBufferLogEvent> temp = disruptor;

        // Must guarantee that publishing to the RingBuffer has stopped
        // before we call disruptor.shutdown()
        disruptor = null; // client code fails with NPE if log after stop = OK
        if (temp == null) {
            return; // stop() has already been called
        }

        // Disruptor.shutdown() itself waits until all enqueued events have
        // been processed, so the sleep-based wait must come first to be of
        // any use: wait up to 10 seconds for the ringbuffer to drain.
        final RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
        boolean interrupted = false;
        for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
            if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
                break; // the whole buffer is free again: nothing left to process
            }
            try {
                // give ringbuffer some time to drain...
                Thread.sleep(HALF_A_SECOND);
            } catch (final InterruptedException e) {
                // keep draining, but remember the interrupt for the caller
                interrupted = true;
            }
        }
        temp.shutdown(); // waits for any remaining backlog, then halts the event processor
        executor.shutdown(); // finally, kill the processor thread
        threadlocalInfo.remove(); // LOG4J2-323

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates and returns a new {@code RingBufferAdmin} that instruments the
     * ringbuffer of the {@code AsyncLogger}.
     * <p>
     * Must not be called after {@link #stop()}: the disruptor reference is
     * {@code null} from then on.
     *
     * @param contextName name of the global {@code AsyncLoggerContext}
     * @return a new {@code RingBufferAdmin} for the shared ringbuffer
     */
    public static RingBufferAdmin createRingBufferAdmin(String contextName) {
        return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
    }
}