View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.Map;
20  import java.util.concurrent.ExecutorService;
21  import java.util.concurrent.Executors;
22  
23  import org.apache.logging.log4j.Level;
24  import org.apache.logging.log4j.Marker;
25  import org.apache.logging.log4j.ThreadContext;
26  import org.apache.logging.log4j.core.Logger;
27  import org.apache.logging.log4j.core.LoggerContext;
28  import org.apache.logging.log4j.core.config.Property;
29  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
30  import org.apache.logging.log4j.message.Message;
31  import org.apache.logging.log4j.message.MessageFactory;
32  import org.apache.logging.log4j.status.StatusLogger;
33  
34  import com.lmax.disruptor.BlockingWaitStrategy;
35  import com.lmax.disruptor.EventHandler;
36  import com.lmax.disruptor.ExceptionHandler;
37  import com.lmax.disruptor.RingBuffer;
38  import com.lmax.disruptor.SleepingWaitStrategy;
39  import com.lmax.disruptor.WaitStrategy;
40  import com.lmax.disruptor.YieldingWaitStrategy;
41  import com.lmax.disruptor.dsl.Disruptor;
42  import com.lmax.disruptor.dsl.ProducerType;
43  import com.lmax.disruptor.util.Util;
44  
45  /**
46   * AsyncLogger is a logger designed for high throughput and low latency logging.
47   * It does not perform any I/O in the calling (application) thread, but instead
48   * hands off the work to another thread as soon as possible. The actual logging
49   * is performed in the background thread. It uses the LMAX Disruptor library for
50   * inter-thread communication. (<a
51   * href="http://lmax-exchange.github.com/disruptor/"
52   * >http://lmax-exchange.github.com/disruptor/</a>)
53   * <p>
54   * To use AsyncLogger, specify the System property
55   * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
56   * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
57   * will be AsyncLoggers.
58   * <p>
59   * Note that for performance reasons, this logger does not include source
60   * location by default. You need to specify {@code includeLocation="true"} in
61   * the configuration or any %class, %location or %line conversion patterns in
62   * your log4j.xml configuration will produce either a "?" character or no output
63   * at all.
64   * <p>
65   * For best performance, use AsyncLogger with the FastFileAppender or
66   * FastRollingFileAppender, with immediateFlush=false. These appenders have
67   * built-in support for the batching mechanism used by the Disruptor library,
68   * and they will flush to disk at the end of each batch. This means that even
69   * with immediateFlush=false, there will never be any items left in the buffer;
70   * all log events will all be written to disk in a very efficient manner.
71   */
72  public class AsyncLogger extends Logger {
73      private static final int HALF_A_SECOND = 500;
74      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
75      private static final int RINGBUFFER_MIN_SIZE = 128;
76      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
77      private static final StatusLogger LOGGER = StatusLogger.getLogger();
78  
79      private static volatile Disruptor<RingBufferLogEvent> disruptor;
80      private static Clock clock = ClockFactory.getClock();
81  
82      private static ExecutorService executor = Executors
83              .newSingleThreadExecutor();
84      private ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
85  
86      static {
87          int ringBufferSize = calculateRingBufferSize();
88  
89          WaitStrategy waitStrategy = createWaitStrategy();
90          disruptor = new Disruptor<RingBufferLogEvent>(
91                  RingBufferLogEvent.FACTORY, ringBufferSize, executor,
92                  ProducerType.MULTI, waitStrategy);
93          EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {//
94          new RingBufferLogEventHandler() };
95          disruptor.handleExceptionsWith(getExceptionHandler());
96          disruptor.handleEventsWith(handlers);
97  
98          LOGGER.debug(
99                  "Starting AsyncLogger disruptor with ringbuffer size {}...",
100                 disruptor.getRingBuffer().getBufferSize());
101         disruptor.start();
102     }
103 
104     private static int calculateRingBufferSize() {
105         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
106         String userPreferredRBSize = System.getProperty(
107                 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
108         try {
109             int size = Integer.parseInt(userPreferredRBSize);
110             if (size < RINGBUFFER_MIN_SIZE) {
111                 size = RINGBUFFER_MIN_SIZE;
112                 LOGGER.warn(
113                         "Invalid RingBufferSize {}, using minimum size {}.",
114                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
115             }
116             ringBufferSize = size;
117         } catch (Exception ex) {
118             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
119                     userPreferredRBSize, ringBufferSize);
120         }
121         return Util.ceilingNextPowerOfTwo(ringBufferSize);
122     }
123 
124     private static WaitStrategy createWaitStrategy() {
125         String strategy = System.getProperty("AsyncLogger.WaitStrategy");
126         LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
127         if ("Sleep".equals(strategy)) {
128             LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
129             return new SleepingWaitStrategy();
130         } else if ("Yield".equals(strategy)) {
131             LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
132             return new YieldingWaitStrategy();
133         } else if ("Block".equals(strategy)) {
134             LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
135             return new BlockingWaitStrategy();
136         }
137         LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
138         return new SleepingWaitStrategy();
139     }
140 
141     private static ExceptionHandler getExceptionHandler() {
142         String cls = System.getProperty("AsyncLogger.ExceptionHandler");
143         if (cls == null) {
144             LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
145             return null;
146         }
147         try {
148             @SuppressWarnings("unchecked")
149             Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
150                     .forName(cls);
151             ExceptionHandler result = klass.newInstance();
152             LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
153             return result;
154         } catch (Exception ignored) {
155             LOGGER.debug(
156                     "AsyncLogger.ExceptionHandler not set: error creating "
157                             + cls + ": ", ignored);
158             return null;
159         }
160     }
161 
162     /**
163      * Constructs an {@code AsyncLogger} with the specified context, name and
164      * message factory.
165      * 
166      * @param context context of this logger
167      * @param name name of this logger
168      * @param messageFactory message factory of this logger
169      */
170     public AsyncLogger(LoggerContext context, String name,
171             MessageFactory messageFactory) {
172         super(context, name, messageFactory);
173     }
174 
175     /**
176      * Tuple with the event translator and thread name for a thread.
177      */
178     private static class Info {
179         private RingBufferLogEventTranslator translator;
180         private String cachedThreadName;
181     }
182 
183     @Override
184     public void log(Marker marker, String fqcn, Level level, Message data,
185             Throwable t) {
186         Info info = threadlocalInfo.get();
187         if (info == null) {
188             info = new Info();
189             info.translator = new RingBufferLogEventTranslator();
190             info.cachedThreadName = Thread.currentThread().getName();
191             threadlocalInfo.set(info);
192         }
193 
194         Boolean includeLocation = config.loggerConfig.isIncludeLocation();
195         info.translator.setValues(this, getName(), marker, fqcn, level, data,
196                 t, //
197 
198                 // config properties are taken care of in the EventHandler
199                 // thread in the #actualAsyncLog method
200 
201                 // needs shallow copy to be fast (LOG4J2-154)
202                 ThreadContext.getImmutableContext(), //
203 
204                 // needs shallow copy to be fast (LOG4J2-154)
205                 ThreadContext.getImmutableStack(), //
206 
207                 // Thread.currentThread().getName(), //
208                 info.cachedThreadName, //
209 
210                 // location: very expensive operation. LOG4J2-153:
211                 // Only include if "includeLocation=true" is specified,
212                 // exclude if not specified or if "false" was specified.
213                 includeLocation != null && includeLocation ? location(fqcn)
214                         : null,
215 
216                 // System.currentTimeMillis());
217                 // CoarseCachedClock: 20% faster than system clock, 16ms gaps
218                 // CachedClock: 10% faster than system clock, smaller gaps
219                 clock.currentTimeMillis());
220 
221         disruptor.publishEvent(info.translator);
222     }
223 
224     private StackTraceElement location(String fqcnOfLogger) {
225         return Log4jLogEvent.calcLocation(fqcnOfLogger);
226     }
227 
228     /**
229      * This method is called by the EventHandler that processes the
230      * RingBufferLogEvent in a separate thread.
231      * 
232      * @param event the event to log
233      */
234     public void actualAsyncLog(RingBufferLogEvent event) {
235         Map<Property, Boolean> properties = config.loggerConfig.getProperties();
236         event.mergePropertiesIntoContextMap(properties,
237                 config.config.getSubst());
238         config.logEvent(event);
239     }
240 
241     public static void stop() {
242         Disruptor<RingBufferLogEvent> temp = disruptor;
243 
244         // Must guarantee that publishing to the RingBuffer has stopped
245         // before we call disruptor.shutdown()
246         disruptor = null; // client code fails with NPE if log after stop = OK
247         temp.shutdown();
248 
249         // wait up to 10 seconds for the ringbuffer to drain
250         RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
251         for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
252             if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
253                 break;
254             }
255             try {
256                 // give ringbuffer some time to drain...
257                 Thread.sleep(HALF_A_SECOND);
258             } catch (InterruptedException e) {
259                 // ignored
260             }
261         }
262         executor.shutdown(); // finally, kill the processor thread
263     }
264 }