001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.async; 018 019 import java.util.Map; 020 import java.util.concurrent.ExecutorService; 021 import java.util.concurrent.Executors; 022 023 import org.apache.logging.log4j.Level; 024 import org.apache.logging.log4j.Marker; 025 import org.apache.logging.log4j.ThreadContext; 026 import org.apache.logging.log4j.core.Logger; 027 import org.apache.logging.log4j.core.LoggerContext; 028 import org.apache.logging.log4j.core.config.Property; 029 import org.apache.logging.log4j.core.impl.Log4jLogEvent; 030 import org.apache.logging.log4j.message.Message; 031 import org.apache.logging.log4j.message.MessageFactory; 032 import org.apache.logging.log4j.status.StatusLogger; 033 034 import com.lmax.disruptor.BlockingWaitStrategy; 035 import com.lmax.disruptor.EventHandler; 036 import com.lmax.disruptor.ExceptionHandler; 037 import com.lmax.disruptor.RingBuffer; 038 import com.lmax.disruptor.SleepingWaitStrategy; 039 import com.lmax.disruptor.WaitStrategy; 040 import com.lmax.disruptor.YieldingWaitStrategy; 041 import com.lmax.disruptor.dsl.Disruptor; 042 import com.lmax.disruptor.dsl.ProducerType; 043 import com.lmax.disruptor.util.Util; 044 045 /** 046 * AsyncLogger is a logger designed for high throughput and low latency logging. 047 * It does not perform any I/O in the calling (application) thread, but instead 048 * hands off the work to another thread as soon as possible. The actual logging 049 * is performed in the background thread. It uses the LMAX Disruptor library for 050 * inter-thread communication. (<a 051 * href="http://lmax-exchange.github.com/disruptor/" 052 * >http://lmax-exchange.github.com/disruptor/</a>) 053 * <p> 054 * To use AsyncLogger, specify the System property 055 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector} 056 * before you obtain a Logger, and all Loggers returned by LogManager.getLogger 057 * will be AsyncLoggers. 058 * <p> 059 * Note that for performance reasons, this logger does not include source 060 * location by default. You need to specify {@code includeLocation="true"} in 061 * the configuration or any %class, %location or %line conversion patterns in 062 * your log4j.xml configuration will produce either a "?" character or no output 063 * at all. 064 * <p> 065 * For best performance, use AsyncLogger with the FastFileAppender or 066 * FastRollingFileAppender, with immediateFlush=false. These appenders have 067 * built-in support for the batching mechanism used by the Disruptor library, 068 * and they will flush to disk at the end of each batch. This means that even 069 * with immediateFlush=false, there will never be any items left in the buffer; 070 * all log events will all be written to disk in a very efficient manner. 071 */ 072 public class AsyncLogger extends Logger { 073 private static final int HALF_A_SECOND = 500; 074 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20; 075 private static final int RINGBUFFER_MIN_SIZE = 128; 076 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024; 077 private static final StatusLogger LOGGER = StatusLogger.getLogger(); 078 079 private static volatile Disruptor<RingBufferLogEvent> disruptor; 080 private static Clock clock = ClockFactory.getClock(); 081 082 private static ExecutorService executor = Executors 083 .newSingleThreadExecutor(); 084 private ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>(); 085 086 static { 087 int ringBufferSize = calculateRingBufferSize(); 088 089 WaitStrategy waitStrategy = createWaitStrategy(); 090 disruptor = new Disruptor<RingBufferLogEvent>( 091 RingBufferLogEvent.FACTORY, ringBufferSize, executor, 092 ProducerType.MULTI, waitStrategy); 093 EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {// 094 new RingBufferLogEventHandler() }; 095 disruptor.handleExceptionsWith(getExceptionHandler()); 096 disruptor.handleEventsWith(handlers); 097 098 LOGGER.debug( 099 "Starting AsyncLogger disruptor with ringbuffer size {}...", 100 disruptor.getRingBuffer().getBufferSize()); 101 disruptor.start(); 102 } 103 104 private static int calculateRingBufferSize() { 105 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE; 106 String userPreferredRBSize = System.getProperty( 107 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize)); 108 try { 109 int size = Integer.parseInt(userPreferredRBSize); 110 if (size < RINGBUFFER_MIN_SIZE) { 111 size = RINGBUFFER_MIN_SIZE; 112 LOGGER.warn( 113 "Invalid RingBufferSize {}, using minimum size {}.", 114 userPreferredRBSize, RINGBUFFER_MIN_SIZE); 115 } 116 ringBufferSize = size; 117 } catch (Exception ex) { 118 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", 119 userPreferredRBSize, ringBufferSize); 120 } 121 return Util.ceilingNextPowerOfTwo(ringBufferSize); 122 } 123 124 private static WaitStrategy createWaitStrategy() { 125 String strategy = System.getProperty("AsyncLogger.WaitStrategy"); 126 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy); 127 if ("Sleep".equals(strategy)) { 128 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy"); 129 return new SleepingWaitStrategy(); 130 } else if ("Yield".equals(strategy)) { 131 LOGGER.debug("disruptor event handler uses YieldingWaitStrategy"); 132 return new YieldingWaitStrategy(); 133 } else if ("Block".equals(strategy)) { 134 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy"); 135 return new BlockingWaitStrategy(); 136 } 137 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy"); 138 return new SleepingWaitStrategy(); 139 } 140 141 private static ExceptionHandler getExceptionHandler() { 142 String cls = System.getProperty("AsyncLogger.ExceptionHandler"); 143 if (cls == null) { 144 LOGGER.debug("No AsyncLogger.ExceptionHandler specified"); 145 return null; 146 } 147 try { 148 @SuppressWarnings("unchecked") 149 Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class 150 .forName(cls); 151 ExceptionHandler result = klass.newInstance(); 152 LOGGER.debug("AsyncLogger.ExceptionHandler=" + result); 153 return result; 154 } catch (Exception ignored) { 155 LOGGER.debug( 156 "AsyncLogger.ExceptionHandler not set: error creating " 157 + cls + ": ", ignored); 158 return null; 159 } 160 } 161 162 /** 163 * Constructs an {@code AsyncLogger} with the specified context, name and 164 * message factory. 165 * 166 * @param context context of this logger 167 * @param name name of this logger 168 * @param messageFactory message factory of this logger 169 */ 170 public AsyncLogger(LoggerContext context, String name, 171 MessageFactory messageFactory) { 172 super(context, name, messageFactory); 173 } 174 175 /** 176 * Tuple with the event translator and thread name for a thread. 177 */ 178 private static class Info { 179 private RingBufferLogEventTranslator translator; 180 private String cachedThreadName; 181 } 182 183 @Override 184 public void log(Marker marker, String fqcn, Level level, Message data, 185 Throwable t) { 186 Info info = threadlocalInfo.get(); 187 if (info == null) { 188 info = new Info(); 189 info.translator = new RingBufferLogEventTranslator(); 190 info.cachedThreadName = Thread.currentThread().getName(); 191 threadlocalInfo.set(info); 192 } 193 194 Boolean includeLocation = config.loggerConfig.isIncludeLocation(); 195 info.translator.setValues(this, getName(), marker, fqcn, level, data, 196 t, // 197 198 // config properties are taken care of in the EventHandler 199 // thread in the #actualAsyncLog method 200 201 // needs shallow copy to be fast (LOG4J2-154) 202 ThreadContext.getImmutableContext(), // 203 204 // needs shallow copy to be fast (LOG4J2-154) 205 ThreadContext.getImmutableStack(), // 206 207 // Thread.currentThread().getName(), // 208 info.cachedThreadName, // 209 210 // location: very expensive operation. LOG4J2-153: 211 // Only include if "includeLocation=true" is specified, 212 // exclude if not specified or if "false" was specified. 213 includeLocation != null && includeLocation ? location(fqcn) 214 : null, 215 216 // System.currentTimeMillis()); 217 // CoarseCachedClock: 20% faster than system clock, 16ms gaps 218 // CachedClock: 10% faster than system clock, smaller gaps 219 clock.currentTimeMillis()); 220 221 disruptor.publishEvent(info.translator); 222 } 223 224 private StackTraceElement location(String fqcnOfLogger) { 225 return Log4jLogEvent.calcLocation(fqcnOfLogger); 226 } 227 228 /** 229 * This method is called by the EventHandler that processes the 230 * RingBufferLogEvent in a separate thread. 231 * 232 * @param event the event to log 233 */ 234 public void actualAsyncLog(RingBufferLogEvent event) { 235 Map<Property, Boolean> properties = config.loggerConfig.getProperties(); 236 event.mergePropertiesIntoContextMap(properties, 237 config.config.getSubst()); 238 config.logEvent(event); 239 } 240 241 public static void stop() { 242 Disruptor<RingBufferLogEvent> temp = disruptor; 243 244 // Must guarantee that publishing to the RingBuffer has stopped 245 // before we call disruptor.shutdown() 246 disruptor = null; // client code fails with NPE if log after stop = OK 247 temp.shutdown(); 248 249 // wait up to 10 seconds for the ringbuffer to drain 250 RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer(); 251 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { 252 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) { 253 break; 254 } 255 try { 256 // give ringbuffer some time to drain... 257 Thread.sleep(HALF_A_SECOND); 258 } catch (InterruptedException e) { 259 // ignored 260 } 261 } 262 executor.shutdown(); // finally, kill the processor thread 263 } 264 }