View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.Map;
20  import java.util.Objects;
21  import java.util.concurrent.ExecutorService;
22  import java.util.concurrent.Executors;
23  
24  import org.apache.logging.log4j.Level;
25  import org.apache.logging.log4j.Marker;
26  import org.apache.logging.log4j.ThreadContext;
27  import org.apache.logging.log4j.core.Logger;
28  import org.apache.logging.log4j.core.LoggerContext;
29  import org.apache.logging.log4j.core.config.Property;
30  import org.apache.logging.log4j.core.config.ReliabilityStrategy;
31  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
33  import org.apache.logging.log4j.core.util.Clock;
34  import org.apache.logging.log4j.core.util.ClockFactory;
35  import org.apache.logging.log4j.core.util.DummyNanoClock;
36  import org.apache.logging.log4j.core.util.Integers;
37  import org.apache.logging.log4j.core.util.Loader;
38  import org.apache.logging.log4j.core.util.NanoClock;
39  import org.apache.logging.log4j.message.Message;
40  import org.apache.logging.log4j.message.MessageFactory;
41  import org.apache.logging.log4j.message.TimestampMessage;
42  import org.apache.logging.log4j.status.StatusLogger;
43  import org.apache.logging.log4j.util.PropertiesUtil;
44  
45  import com.lmax.disruptor.BlockingWaitStrategy;
46  import com.lmax.disruptor.ExceptionHandler;
47  import com.lmax.disruptor.RingBuffer;
48  import com.lmax.disruptor.SleepingWaitStrategy;
49  import com.lmax.disruptor.WaitStrategy;
50  import com.lmax.disruptor.YieldingWaitStrategy;
51  import com.lmax.disruptor.dsl.Disruptor;
52  import com.lmax.disruptor.dsl.ProducerType;
53  
54  /**
55   * AsyncLogger is a logger designed for high throughput and low latency logging. It does not perform any I/O in the
56   * calling (application) thread, but instead hands off the work to another thread as soon as possible. The actual
57   * logging is performed in the background thread. It uses the LMAX Disruptor library for inter-thread communication. (<a
58   * href="http://lmax-exchange.github.com/disruptor/" >http://lmax-exchange.github.com/disruptor/</a>)
59   * <p>
60   * To use AsyncLogger, specify the System property
61   * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector} before you obtain a
62   * Logger, and all Loggers returned by LogManager.getLogger will be AsyncLoggers.
63   * <p>
64   * Note that for performance reasons, this logger does not include source location by default. You need to specify
65   * {@code includeLocation="true"} in the configuration or any %class, %location or %line conversion patterns in your
66   * log4j.xml configuration will produce either a "?" character or no output at all.
67   * <p>
68   * For best performance, use AsyncLogger with the RandomAccessFileAppender or RollingRandomAccessFileAppender, with
69   * immediateFlush=false. These appenders have built-in support for the batching mechanism used by the Disruptor library,
70   * and they will flush to disk at the end of each batch. This means that even with immediateFlush=false, there will
71   * never be any items left in the buffer; all log events will all be written to disk in a very efficient manner.
72   */
73  public class AsyncLogger extends Logger {
74      private static final long serialVersionUID = 1L;
75      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
76      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
77      private static final int RINGBUFFER_MIN_SIZE = 128;
78      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
79      private static final StatusLogger LOGGER = StatusLogger.getLogger();
80      private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
81  
82      /**
83       * Strategy for deciding whether thread name should be cached or not.
84       */
85      static enum ThreadNameStrategy { // LOG4J2-467
86          CACHED {
87              @Override
88              public String getThreadName(final Info info) {
89                  return info.cachedThreadName;
90              }
91          },
92          UNCACHED {
93              @Override
94              public String getThreadName(final Info info) {
95                  return Thread.currentThread().getName();
96              }
97          };
98          abstract String getThreadName(Info info);
99  
100         static ThreadNameStrategy create() {
101             final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy",
102                     CACHED.name());
103             try {
104                 return ThreadNameStrategy.valueOf(name);
105             } catch (final Exception ex) {
106                 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
107                 return CACHED;
108             }
109         }
110     }
111 
112     private static volatile Disruptor<RingBufferLogEvent> disruptor;
113     private static final Clock CLOCK = ClockFactory.getClock();
114     private static volatile NanoClock nanoClock = new DummyNanoClock();
115 
116     private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(new DaemonThreadFactory(
117             "AsyncLogger-"));
118 
119     static {
120         initInfoForExecutorThread();
121         LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
122         final int ringBufferSize = calculateRingBufferSize();
123 
124         final WaitStrategy waitStrategy = createWaitStrategy();
125         disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, EXECUTOR, ProducerType.MULTI,
126                 waitStrategy);
127         disruptor.handleExceptionsWith(getExceptionHandler());
128         disruptor.handleEventsWith(new RingBufferLogEventHandler());
129 
130         LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
131                 .getBufferSize());
132         disruptor.start();
133     }
134 
135     /**
136      * Constructs an {@code AsyncLogger} with the specified context, name and message factory.
137      *
138      * @param context context of this logger
139      * @param name name of this logger
140      * @param messageFactory message factory of this logger
141      */
142     public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
143         super(context, name, messageFactory);
144     }
145 
146     private static int calculateRingBufferSize() {
147         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
148         final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
149                 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
150         try {
151             int size = Integer.parseInt(userPreferredRBSize);
152             if (size < RINGBUFFER_MIN_SIZE) {
153                 size = RINGBUFFER_MIN_SIZE;
154                 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
155                         RINGBUFFER_MIN_SIZE);
156             }
157             ringBufferSize = size;
158         } catch (final Exception ex) {
159             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
160         }
161         return Integers.ceilingNextPowerOfTwo(ringBufferSize);
162     }
163 
164     /**
165      * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread. This Info object uniquely
166      * has attribute {@code isAppenderThread} set to {@code true}. All other Info objects will have this attribute set
167      * to {@code false}. This allows us to detect Logger.log() calls initiated from the appender thread, which may cause
168      * deadlock when the RingBuffer is full. (LOG4J2-471)
169      */
170     private static void initInfoForExecutorThread() {
171         EXECUTOR.submit(new Runnable() {
172             @Override
173             public void run() {
174                 final boolean isAppenderThread = true;
175                 final Info info = new Info(new RingBufferLogEventTranslator(), //
176                         Thread.currentThread().getName(), isAppenderThread);
177                 Info.THREADLOCAL.set(info);
178             }
179         });
180     }
181 
182     private static WaitStrategy createWaitStrategy() {
183         final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
184         LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
185         if ("Sleep".equals(strategy)) {
186             return new SleepingWaitStrategy();
187         } else if ("Yield".equals(strategy)) {
188             return new YieldingWaitStrategy();
189         } else if ("Block".equals(strategy)) {
190             return new BlockingWaitStrategy();
191         }
192         LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
193         return new BlockingWaitStrategy();
194     }
195 
196     private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
197         final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
198         if (cls == null) {
199             LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
200             return null;
201         }
202         try {
203             @SuppressWarnings("unchecked")
204             final ExceptionHandler<RingBufferLogEvent> result = Loader
205                     .newCheckedInstanceOf(cls, ExceptionHandler.class);
206             LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
207             return result;
208         } catch (final Exception ignored) {
209             LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
210             return null;
211         }
212     }
213 
214     /**
215      * Tuple with the event translator and thread name for a thread.
216      */
217     static class Info {
218         private static final ThreadLocal<Info> THREADLOCAL = new ThreadLocal<Info>() {
219             @Override
220             protected Info initialValue() {
221                 // by default, set isAppenderThread to false
222                 return new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
223             }
224         };
225         private final RingBufferLogEventTranslator translator;
226         private final String cachedThreadName;
227         private final boolean isAppenderThread;
228 
229         public Info(final RingBufferLogEventTranslator translator, final String threadName,
230                 final boolean appenderThread) {
231             this.translator = translator;
232             this.cachedThreadName = threadName;
233             this.isAppenderThread = appenderThread;
234         }
235 
236         // LOG4J2-467
237         private String threadName() {
238             return THREAD_NAME_STRATEGY.getThreadName(this);
239         }
240     }
241 
242     @Override
243     public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
244             final Throwable thrown) {
245 
246         final Disruptor<RingBufferLogEvent> temp = disruptor;
247         if (temp == null) { // LOG4J2-639
248             LOGGER.fatal("Ignoring log event after log4j was shut down");
249         } else {
250             logMessage0(temp, fqcn, level, marker, message, thrown);
251         }
252     }
253 
254     private void logMessage0(final Disruptor<RingBufferLogEvent> theDisruptor, final String fqcn, final Level level,
255             final Marker marker, final Message message, final Throwable thrown) {
256         final Info info = Info.THREADLOCAL.get();
257         logMessageInAppropriateThread(info, theDisruptor, fqcn, level, marker, message, thrown);
258     }
259 
260     private void logMessageInAppropriateThread(final Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
261             final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
262         if (!logMessageInCurrentThread(info, theDisruptor, fqcn, level, marker, message, thrown)) {
263             logMessageInBackgroundThread(info, fqcn, level, marker, message, thrown);
264         }
265     }
266 
267     /**
268      * LOG4J2-471: prevent deadlock when RingBuffer is full and object being logged calls Logger.log() from its
269      * toString() method
270      *
271      * @param info threadlocal information - used to determine if the current thread is the background appender thread
272      * @param theDisruptor used to check if the buffer is full
273      * @param fqcn fully qualified caller name
274      * @param level log level
275      * @param marker optional marker
276      * @param message log message
277      * @param thrown optional exception
278      * @return {@code true} if the event has been logged in the current thread, {@code false} if it should be passed to
279      *         the background thread
280      */
281     private boolean logMessageInCurrentThread(Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
282             final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
283         if (info.isAppenderThread && theDisruptor.getRingBuffer().remainingCapacity() == 0) {
284             // bypass RingBuffer and invoke Appender directly
285             final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
286             strategy.log(this, getName(), fqcn, marker, level, message, thrown);
287             return true;
288         }
289         return false;
290     }
291 
292     /**
293      * Enqueues the specified message to be logged in the background thread.
294      * 
295      * @param info holds some cached information
296      * @param fqcn fully qualified caller name
297      * @param level log level
298      * @param marker optional marker
299      * @param message log message
300      * @param thrown optional exception
301      */
302     private void logMessageInBackgroundThread(Info info, final String fqcn, final Level level, final Marker marker,
303             final Message message, final Throwable thrown) {
304 
305         message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
306 
307         initLogMessageInfo(info, fqcn, level, marker, message, thrown);
308         enqueueLogMessageInfo(info);
309     }
310 
311     private void initLogMessageInfo(Info info, final String fqcn, final Level level, final Marker marker,
312             final Message message, final Throwable thrown) {
313         info.translator.setValues(this, getName(), marker, fqcn, level, message, //
314                 // don't construct ThrowableProxy until required
315                 thrown, //
316 
317                 // config properties are taken care of in the EventHandler
318                 // thread in the #actualAsyncLog method
319 
320                 // needs shallow copy to be fast (LOG4J2-154)
321                 ThreadContext.getImmutableContext(), //
322 
323                 // needs shallow copy to be fast (LOG4J2-154)
324                 ThreadContext.getImmutableStack(), //
325 
326                 // Thread.currentThread().getName(), //
327                 // info.cachedThreadName, //
328                 info.threadName(), //
329 
330                 // location: very expensive operation. LOG4J2-153:
331                 // Only include if "includeLocation=true" is specified,
332                 // exclude if not specified or if "false" was specified.
333                 calcLocationIfRequested(fqcn),
334 
335                 // System.currentTimeMillis());
336                 // CoarseCachedClock: 20% faster than system clock, 16ms gaps
337                 // CachedClock: 10% faster than system clock, smaller gaps
338                 // LOG4J2-744 avoid calling clock altogether if message has the timestamp
339                 eventTimeMillis(message), //
340                 nanoClock.nanoTime() //
341                 );
342     }
343 
344     private long eventTimeMillis(final Message message) {
345         return message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() : CLOCK
346                 .currentTimeMillis();
347     }
348 
349     /**
350      * Returns the caller location if requested, {@code null} otherwise.
351      * 
352      * @param fqcn fully qualified caller name.
353      * @return the caller location if requested, {@code null} otherwise.
354      */
355     private StackTraceElement calcLocationIfRequested(String fqcn) {
356         final boolean includeLocation = privateConfig.loggerConfig.isIncludeLocation();
357         return includeLocation ? location(fqcn) : null;
358     }
359 
360     private void enqueueLogMessageInfo(Info info) {
361         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
362         try {
363             // Note: do NOT use the temp variable above!
364             // That could result in adding a log event to the disruptor after it was shut down,
365             // which could cause the publishEvent method to hang and never return.
366             disruptor.publishEvent(info.translator);
367         } catch (final NullPointerException npe) {
368             LOGGER.fatal("Ignoring log event after log4j was shut down.");
369         }
370     }
371 
372     private static StackTraceElement location(final String fqcnOfLogger) {
373         return Log4jLogEvent.calcLocation(fqcnOfLogger);
374     }
375 
376     /**
377      * This method is called by the EventHandler that processes the RingBufferLogEvent in a separate thread.
378      *
379      * @param event the event to log
380      */
381     public void actualAsyncLog(final RingBufferLogEvent event) {
382         final Map<Property, Boolean> properties = privateConfig.loggerConfig.getProperties();
383         event.mergePropertiesIntoContextMap(properties, privateConfig.config.getStrSubstitutor());
384         final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
385         strategy.log(this, event);
386     }
387 
388     public static void stop() {
389         final Disruptor<RingBufferLogEvent> temp = disruptor;
390 
391         // Must guarantee that publishing to the RingBuffer has stopped
392         // before we call disruptor.shutdown()
393         disruptor = null; // client code fails with NPE if log after stop = OK
394         if (temp == null) {
395             return; // stop() has already been called
396         }
397 
398         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
399         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
400         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
401         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
402             try {
403                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
404             } catch (final InterruptedException e) { // ignored
405             }
406         }
407         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
408         EXECUTOR.shutdown(); // finally, kill the processor thread
409         Info.THREADLOCAL.remove(); // LOG4J2-323
410     }
411 
412     /**
413      * Returns {@code true} if the specified disruptor still has unprocessed events.
414      */
415     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
416         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
417         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
418     }
419 
420     /**
421      * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}.
422      *
423      * @param contextName name of the global {@code AsyncLoggerContext}
424      * @return a new {@code RingBufferAdmin} that instruments the ringbuffer
425      */
426     public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
427         return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
428     }
429 
430     /**
431      * Returns the {@code NanoClock} to use for creating the nanoTime timestamp of log events.
432      * 
433      * @return the {@code NanoClock} to use for creating the nanoTime timestamp of log events
434      */
435     public static NanoClock getNanoClock() {
436         return nanoClock;
437     }
438 
439     /**
440      * Sets the {@code NanoClock} to use for creating the nanoTime timestamp of log events.
441      * <p>
442      * FOR INTERNAL USE. This method may be called with a different {@code NanoClock} implementation when the
443      * configuration changes.
444      * 
445      * @param nanoClock the {@code NanoClock} to use for creating the nanoTime timestamp of log events
446      */
447     public static void setNanoClock(NanoClock nanoClock) {
448         AsyncLogger.nanoClock = Objects.requireNonNull(nanoClock, "NanoClock must be non-null");
449     }
450 }