001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.async; 018 019 import java.util.Map; 020 import java.util.concurrent.ExecutorService; 021 import java.util.concurrent.Executors; 022 023 import org.apache.logging.log4j.Level; 024 import org.apache.logging.log4j.Marker; 025 import org.apache.logging.log4j.ThreadContext; 026 import org.apache.logging.log4j.core.Logger; 027 import org.apache.logging.log4j.core.LoggerContext; 028 import org.apache.logging.log4j.core.config.Property; 029 import org.apache.logging.log4j.core.helpers.Clock; 030 import org.apache.logging.log4j.core.helpers.ClockFactory; 031 import org.apache.logging.log4j.core.impl.Log4jLogEvent; 032 import org.apache.logging.log4j.message.Message; 033 import org.apache.logging.log4j.message.MessageFactory; 034 import org.apache.logging.log4j.status.StatusLogger; 035 036 import com.lmax.disruptor.BlockingWaitStrategy; 037 import com.lmax.disruptor.EventHandler; 038 import com.lmax.disruptor.ExceptionHandler; 039 import com.lmax.disruptor.RingBuffer; 040 import com.lmax.disruptor.SleepingWaitStrategy; 041 import com.lmax.disruptor.WaitStrategy; 042 import com.lmax.disruptor.YieldingWaitStrategy; 043 import com.lmax.disruptor.dsl.Disruptor; 044 import com.lmax.disruptor.dsl.ProducerType; 045 import com.lmax.disruptor.util.Util; 046 047 /** 048 * AsyncLogger is a logger designed for high throughput and low latency logging. 049 * It does not perform any I/O in the calling (application) thread, but instead 050 * hands off the work to another thread as soon as possible. The actual logging 051 * is performed in the background thread. It uses the LMAX Disruptor library for 052 * inter-thread communication. (<a 053 * href="http://lmax-exchange.github.com/disruptor/" 054 * >http://lmax-exchange.github.com/disruptor/</a>) 055 * <p> 056 * To use AsyncLogger, specify the System property 057 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector} 058 * before you obtain a Logger, and all Loggers returned by LogManager.getLogger 059 * will be AsyncLoggers. 060 * <p> 061 * Note that for performance reasons, this logger does not include source 062 * location by default. You need to specify {@code includeLocation="true"} in 063 * the configuration or any %class, %location or %line conversion patterns in 064 * your log4j.xml configuration will produce either a "?" character or no output 065 * at all. 066 * <p> 067 * For best performance, use AsyncLogger with the FastFileAppender or 068 * FastRollingFileAppender, with immediateFlush=false. These appenders have 069 * built-in support for the batching mechanism used by the Disruptor library, 070 * and they will flush to disk at the end of each batch. This means that even 071 * with immediateFlush=false, there will never be any items left in the buffer; 072 * all log events will all be written to disk in a very efficient manner. 073 */ 074 public class AsyncLogger extends Logger { 075 private static final int HALF_A_SECOND = 500; 076 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20; 077 private static final int RINGBUFFER_MIN_SIZE = 128; 078 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024; 079 private static final StatusLogger LOGGER = StatusLogger.getLogger(); 080 081 private static volatile Disruptor<RingBufferLogEvent> disruptor; 082 private static Clock clock = ClockFactory.getClock(); 083 084 private static ExecutorService executor = Executors 085 .newSingleThreadExecutor(); 086 private ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>(); 087 088 static { 089 int ringBufferSize = calculateRingBufferSize(); 090 091 WaitStrategy waitStrategy = createWaitStrategy(); 092 disruptor = new Disruptor<RingBufferLogEvent>( 093 RingBufferLogEvent.FACTORY, ringBufferSize, executor, 094 ProducerType.MULTI, waitStrategy); 095 EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {// 096 new RingBufferLogEventHandler() }; 097 disruptor.handleExceptionsWith(getExceptionHandler()); 098 disruptor.handleEventsWith(handlers); 099 100 LOGGER.debug( 101 "Starting AsyncLogger disruptor with ringbuffer size {}...", 102 disruptor.getRingBuffer().getBufferSize()); 103 disruptor.start(); 104 } 105 106 private static int calculateRingBufferSize() { 107 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE; 108 String userPreferredRBSize = System.getProperty( 109 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize)); 110 try { 111 int size = Integer.parseInt(userPreferredRBSize); 112 if (size < RINGBUFFER_MIN_SIZE) { 113 size = RINGBUFFER_MIN_SIZE; 114 LOGGER.warn( 115 "Invalid RingBufferSize {}, using minimum size {}.", 116 userPreferredRBSize, RINGBUFFER_MIN_SIZE); 117 } 118 ringBufferSize = size; 119 } catch (Exception ex) { 120 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", 121 userPreferredRBSize, ringBufferSize); 122 } 123 return Util.ceilingNextPowerOfTwo(ringBufferSize); 124 } 125 126 private static WaitStrategy createWaitStrategy() { 127 String strategy = System.getProperty("AsyncLogger.WaitStrategy"); 128 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy); 129 if ("Sleep".equals(strategy)) { 130 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy"); 131 return new SleepingWaitStrategy(); 132 } else if ("Yield".equals(strategy)) { 133 LOGGER.debug("disruptor event handler uses YieldingWaitStrategy"); 134 return new YieldingWaitStrategy(); 135 } else if ("Block".equals(strategy)) { 136 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy"); 137 return new BlockingWaitStrategy(); 138 } 139 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy"); 140 return new SleepingWaitStrategy(); 141 } 142 143 private static ExceptionHandler getExceptionHandler() { 144 String cls = System.getProperty("AsyncLogger.ExceptionHandler"); 145 if (cls == null) { 146 LOGGER.debug("No AsyncLogger.ExceptionHandler specified"); 147 return null; 148 } 149 try { 150 @SuppressWarnings("unchecked") 151 Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class 152 .forName(cls); 153 ExceptionHandler result = klass.newInstance(); 154 LOGGER.debug("AsyncLogger.ExceptionHandler=" + result); 155 return result; 156 } catch (Exception ignored) { 157 LOGGER.debug( 158 "AsyncLogger.ExceptionHandler not set: error creating " 159 + cls + ": ", ignored); 160 return null; 161 } 162 } 163 164 /** 165 * Constructs an {@code AsyncLogger} with the specified context, name and 166 * message factory. 167 * 168 * @param context context of this logger 169 * @param name name of this logger 170 * @param messageFactory message factory of this logger 171 */ 172 public AsyncLogger(LoggerContext context, String name, 173 MessageFactory messageFactory) { 174 super(context, name, messageFactory); 175 } 176 177 /** 178 * Tuple with the event translator and thread name for a thread. 179 */ 180 private static class Info { 181 private RingBufferLogEventTranslator translator; 182 private String cachedThreadName; 183 } 184 185 @Override 186 public void log(Marker marker, String fqcn, Level level, Message data, 187 Throwable t) { 188 Info info = threadlocalInfo.get(); 189 if (info == null) { 190 info = new Info(); 191 info.translator = new RingBufferLogEventTranslator(); 192 info.cachedThreadName = Thread.currentThread().getName(); 193 threadlocalInfo.set(info); 194 } 195 196 boolean includeLocation = config.loggerConfig.isIncludeLocation(); 197 info.translator.setValues(this, getName(), marker, fqcn, level, data, 198 t, // 199 200 // config properties are taken care of in the EventHandler 201 // thread in the #actualAsyncLog method 202 203 // needs shallow copy to be fast (LOG4J2-154) 204 ThreadContext.getImmutableContext(), // 205 206 // needs shallow copy to be fast (LOG4J2-154) 207 ThreadContext.getImmutableStack(), // 208 209 // Thread.currentThread().getName(), // 210 info.cachedThreadName, // 211 212 // location: very expensive operation. LOG4J2-153: 213 // Only include if "includeLocation=true" is specified, 214 // exclude if not specified or if "false" was specified. 215 includeLocation ? location(fqcn) : null, 216 217 // System.currentTimeMillis()); 218 // CoarseCachedClock: 20% faster than system clock, 16ms gaps 219 // CachedClock: 10% faster than system clock, smaller gaps 220 clock.currentTimeMillis()); 221 222 disruptor.publishEvent(info.translator); 223 } 224 225 private StackTraceElement location(String fqcnOfLogger) { 226 return Log4jLogEvent.calcLocation(fqcnOfLogger); 227 } 228 229 /** 230 * This method is called by the EventHandler that processes the 231 * RingBufferLogEvent in a separate thread. 232 * 233 * @param event the event to log 234 */ 235 public void actualAsyncLog(RingBufferLogEvent event) { 236 Map<Property, Boolean> properties = config.loggerConfig.getProperties(); 237 event.mergePropertiesIntoContextMap(properties, 238 config.config.getSubst()); 239 config.logEvent(event); 240 } 241 242 public static void stop() { 243 Disruptor<RingBufferLogEvent> temp = disruptor; 244 245 // Must guarantee that publishing to the RingBuffer has stopped 246 // before we call disruptor.shutdown() 247 disruptor = null; // client code fails with NPE if log after stop = OK 248 temp.shutdown(); 249 250 // wait up to 10 seconds for the ringbuffer to drain 251 RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer(); 252 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { 253 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) { 254 break; 255 } 256 try { 257 // give ringbuffer some time to drain... 258 Thread.sleep(HALF_A_SECOND); 259 } catch (InterruptedException e) { 260 // ignored 261 } 262 } 263 executor.shutdown(); // finally, kill the processor thread 264 } 265 }