001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.Map;
020import java.util.Objects;
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Executors;
023
024import org.apache.logging.log4j.Level;
025import org.apache.logging.log4j.Marker;
026import org.apache.logging.log4j.ThreadContext;
027import org.apache.logging.log4j.core.Logger;
028import org.apache.logging.log4j.core.LoggerContext;
029import org.apache.logging.log4j.core.config.Property;
030import org.apache.logging.log4j.core.config.ReliabilityStrategy;
031import org.apache.logging.log4j.core.impl.Log4jLogEvent;
032import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
033import org.apache.logging.log4j.core.util.Clock;
034import org.apache.logging.log4j.core.util.ClockFactory;
035import org.apache.logging.log4j.core.util.DummyNanoClock;
036import org.apache.logging.log4j.core.util.Integers;
037import org.apache.logging.log4j.core.util.Loader;
038import org.apache.logging.log4j.core.util.NanoClock;
039import org.apache.logging.log4j.message.Message;
040import org.apache.logging.log4j.message.MessageFactory;
041import org.apache.logging.log4j.message.TimestampMessage;
042import org.apache.logging.log4j.status.StatusLogger;
043import org.apache.logging.log4j.util.PropertiesUtil;
044
045import com.lmax.disruptor.BlockingWaitStrategy;
046import com.lmax.disruptor.ExceptionHandler;
047import com.lmax.disruptor.RingBuffer;
048import com.lmax.disruptor.SleepingWaitStrategy;
049import com.lmax.disruptor.WaitStrategy;
050import com.lmax.disruptor.YieldingWaitStrategy;
051import com.lmax.disruptor.dsl.Disruptor;
052import com.lmax.disruptor.dsl.ProducerType;
053
054/**
055 * AsyncLogger is a logger designed for high throughput and low latency logging.
056 * It does not perform any I/O in the calling (application) thread, but instead
057 * hands off the work to another thread as soon as possible. The actual logging
058 * is performed in the background thread. It uses the LMAX Disruptor library for
059 * inter-thread communication. (<a
060 * href="http://lmax-exchange.github.com/disruptor/"
061 * >http://lmax-exchange.github.com/disruptor/</a>)
062 * <p>
063 * To use AsyncLogger, specify the System property
064 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
065 * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
066 * will be AsyncLoggers.
067 * <p>
068 * Note that for performance reasons, this logger does not include source
069 * location by default. You need to specify {@code includeLocation="true"} in
070 * the configuration or any %class, %location or %line conversion patterns in
071 * your log4j.xml configuration will produce either a "?" character or no output
072 * at all.
073 * <p>
074 * For best performance, use AsyncLogger with the RandomAccessFileAppender or
075 * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
076 * have built-in support for the batching mechanism used by the Disruptor
077 * library, and they will flush to disk at the end of each batch. This means
078 * that even with immediateFlush=false, there will never be any items left in
079 * the buffer; all log events will all be written to disk in a very efficient
080 * manner.
081 */
082public class AsyncLogger extends Logger {
083    private static final long serialVersionUID = 1L;
084    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
085    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
086    private static final int RINGBUFFER_MIN_SIZE = 128;
087    private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
088    private static final StatusLogger LOGGER = StatusLogger.getLogger();
089    private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
090
091    static enum ThreadNameStrategy { // LOG4J2-467
092        CACHED {
093            @Override
094            public String getThreadName(final Info info) {
095                return info.cachedThreadName;
096            }
097        },
098        UNCACHED {
099            @Override
100            public String getThreadName(final Info info) {
101                return Thread.currentThread().getName();
102            }
103        };
104        abstract String getThreadName(Info info);
105
106        static ThreadNameStrategy create() {
107            final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
108            try {
109                return ThreadNameStrategy.valueOf(name);
110            } catch (final Exception ex) {
111                LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
112                return CACHED;
113            }
114        }
115    }
116    private static volatile Disruptor<RingBufferLogEvent> disruptor;
117    private static final Clock CLOCK = ClockFactory.getClock();
118    private static volatile NanoClock nanoClock = new DummyNanoClock();
119
120    private static final ExecutorService executor = Executors
121            .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
122
123    static {
124        initInfoForExecutorThread();
125        LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
126        final int ringBufferSize = calculateRingBufferSize();
127
128        final WaitStrategy waitStrategy = createWaitStrategy();
129        disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
130                waitStrategy);
131        disruptor.handleExceptionsWith(getExceptionHandler());
132        disruptor.handleEventsWith(new RingBufferLogEventHandler());
133
134        LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
135                .getBufferSize());
136        disruptor.start();
137    }
138
139    private static int calculateRingBufferSize() {
140        int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
141        final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.RingBufferSize",
142                String.valueOf(ringBufferSize));
143        try {
144            int size = Integer.parseInt(userPreferredRBSize);
145            if (size < RINGBUFFER_MIN_SIZE) {
146                size = RINGBUFFER_MIN_SIZE;
147                LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
148                        RINGBUFFER_MIN_SIZE);
149            }
150            ringBufferSize = size;
151        } catch (final Exception ex) {
152            LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
153        }
154        return Integers.ceilingNextPowerOfTwo(ringBufferSize);
155    }
156
157    /**
158     * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
159     * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
160     * All other Info objects will have this attribute set to {@code false}.
161     * This allows us to detect Logger.log() calls initiated from the appender thread,
162     * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
163     */
164    private static void initInfoForExecutorThread() {
165        executor.submit(new Runnable(){
166            @Override
167            public void run() {
168                final boolean isAppenderThread = true;
169                final Info info = new Info(new RingBufferLogEventTranslator(), //
170                        Thread.currentThread().getName(), isAppenderThread);
171                Info.threadlocalInfo.set(info);
172            }
173        });
174    }
175
176    private static WaitStrategy createWaitStrategy() {
177        final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
178        LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
179        if ("Sleep".equals(strategy)) {
180            return new SleepingWaitStrategy();
181        } else if ("Yield".equals(strategy)) {
182            return new YieldingWaitStrategy();
183        } else if ("Block".equals(strategy)) {
184            return new BlockingWaitStrategy();
185        }
186        LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
187        return new BlockingWaitStrategy();
188    }
189
190    private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
191        final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
192        if (cls == null) {
193            LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
194            return null;
195        }
196        try {
197            @SuppressWarnings("unchecked")
198            final ExceptionHandler<RingBufferLogEvent> result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
199            LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
200            return result;
201        } catch (final Exception ignored) {
202            LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
203            return null;
204        }
205    }
206
207    /**
208     * Constructs an {@code AsyncLogger} with the specified context, name and
209     * message factory.
210     *
211     * @param context context of this logger
212     * @param name name of this logger
213     * @param messageFactory message factory of this logger
214     */
215    public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
216        super(context, name, messageFactory);
217    }
218
219    /**
220     * Tuple with the event translator and thread name for a thread.
221     */
222    static class Info {
223        private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>() {
224            @Override
225            protected Info initialValue() {
226                // by default, set isAppenderThread to false
227                return new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
228            }
229        };
230        private final RingBufferLogEventTranslator translator;
231        private final String cachedThreadName;
232        private final boolean isAppenderThread;
233        
234        public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
235            this.translator = translator;
236            this.cachedThreadName = threadName;
237            this.isAppenderThread = appenderThread;
238        }
239
240        // LOG4J2-467
241        private String threadName() {
242            return THREAD_NAME_STRATEGY.getThreadName(this);
243        }
244    }
245
246    @Override
247    public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
248            final Throwable thrown) {
249        
250        final Disruptor<RingBufferLogEvent> temp = disruptor;
251        if (temp == null) { // LOG4J2-639
252            LOGGER.fatal("Ignoring log event after log4j was shut down");
253        } else {
254            logMessage0(temp, fqcn, level, marker, message, thrown);
255        }
256    }
257
258    private void logMessage0(final Disruptor<RingBufferLogEvent> theDisruptor, final String fqcn, final Level level,
259            final Marker marker, final Message message, final Throwable thrown) {
260        final Info info = Info.threadlocalInfo.get();
261        logMessageInAppropriateThread(info, theDisruptor, fqcn, level, marker, message, thrown);
262    }
263
264    private void logMessageInAppropriateThread(final Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
265            final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
266        if (!logMessageInCurrentThread(info, theDisruptor, fqcn, level, marker, message, thrown)) {
267            logMessageInBackgroundThread(info, fqcn, level, marker, message, thrown);
268        }
269    }
270
271    /**
272     * LOG4J2-471: prevent deadlock when RingBuffer is full and object
273     * being logged calls Logger.log() from its toString() method
274     *
275     * @param info threadlocal information - used to determine if the current thread is the background appender thread
276     * @param theDisruptor used to check if the buffer is full
277     * @param fqcn fully qualified caller name
278     * @param level log level
279     * @param marker optional marker
280     * @param message log message
281     * @param thrown optional exception
282     * @return {@code true} if the event has been logged in the current thread, {@code false} if it should be passed to
283     *          the background thread
284     */
285    private boolean logMessageInCurrentThread(Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
286            final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
287        if (info.isAppenderThread && theDisruptor.getRingBuffer().remainingCapacity() == 0) {
288            // bypass RingBuffer and invoke Appender directly
289            final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
290            strategy.log(this, getName(), fqcn, marker, level, message, thrown);
291            return true;
292        }
293        return false;
294    }
295
296    /**
297     * Enqueues the specified message to be logged in the background thread.
298     * 
299     * @param info holds some cached information
300     * @param fqcn fully qualified caller name
301     * @param level log level
302     * @param marker optional marker
303     * @param message log message
304     * @param thrown optional exception
305     */
306    private void logMessageInBackgroundThread(Info info, final String fqcn, final Level level, final Marker marker,
307            final Message message, final Throwable thrown) {
308        
309        message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
310
311        initLogMessageInfo(info, fqcn, level, marker, message, thrown);
312        enqueueLogMessageInfo(info);
313    }
314
315    private void initLogMessageInfo(Info info, final String fqcn, final Level level, final Marker marker,
316            final Message message, final Throwable thrown) {
317        info.translator.setValues(this, getName(), marker, fqcn, level, message, //
318                // don't construct ThrowableProxy until required
319                thrown, //
320
321                // config properties are taken care of in the EventHandler
322                // thread in the #actualAsyncLog method
323
324                // needs shallow copy to be fast (LOG4J2-154)
325                ThreadContext.getImmutableContext(), //
326
327                // needs shallow copy to be fast (LOG4J2-154)
328                ThreadContext.getImmutableStack(), //
329
330                // Thread.currentThread().getName(), //
331                // info.cachedThreadName, //
332                info.threadName(), //
333
334                // location: very expensive operation. LOG4J2-153:
335                // Only include if "includeLocation=true" is specified,
336                // exclude if not specified or if "false" was specified.
337                calcLocationIfRequested(fqcn),
338
339                // System.currentTimeMillis());
340                // CoarseCachedClock: 20% faster than system clock, 16ms gaps
341                // CachedClock: 10% faster than system clock, smaller gaps
342                // LOG4J2-744 avoid calling clock altogether if message has the timestamp
343                eventTimeMillis(message), //
344                nanoClock.nanoTime() //
345        );
346    }
347
348    private long eventTimeMillis(final Message message) {
349        return message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() :
350                CLOCK.currentTimeMillis();
351    }
352
353    /**
354     * Returns the caller location if requested, {@code null} otherwise.
355     * @param fqcn fully qualified caller name.
356     * @return the caller location if requested, {@code null} otherwise.
357     */
358    private StackTraceElement calcLocationIfRequested(String fqcn) {
359        final boolean includeLocation = privateConfig.loggerConfig.isIncludeLocation();
360        return includeLocation ? location(fqcn) : null;
361    }
362
363    private void enqueueLogMessageInfo(Info info) {
364        // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
365        try {
366            // Note: do NOT use the temp variable above!
367            // That could result in adding a log event to the disruptor after it was shut down,
368            // which could cause the publishEvent method to hang and never return.
369            disruptor.publishEvent(info.translator);
370        } catch (final NullPointerException npe) {
371            LOGGER.fatal("Ignoring log event after log4j was shut down.");
372        }
373    }
374
375    private static StackTraceElement location(final String fqcnOfLogger) {
376        return Log4jLogEvent.calcLocation(fqcnOfLogger);
377    }
378
379    /**
380     * This method is called by the EventHandler that processes the
381     * RingBufferLogEvent in a separate thread.
382     *
383     * @param event the event to log
384     */
385    public void actualAsyncLog(final RingBufferLogEvent event) {
386        final Map<Property, Boolean> properties = privateConfig.loggerConfig.getProperties();
387        event.mergePropertiesIntoContextMap(properties, privateConfig.config.getStrSubstitutor());
388        final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
389        strategy.log(this, event);
390    }
391
392    public static void stop() {
393        final Disruptor<RingBufferLogEvent> temp = disruptor;
394
395        // Must guarantee that publishing to the RingBuffer has stopped
396        // before we call disruptor.shutdown()
397        disruptor = null; // client code fails with NPE if log after stop = OK
398        if (temp == null) {
399            return; // stop() has already been called
400        }
401
402        // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
403        // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
404        // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
405        for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
406            try {
407                Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
408            } catch (final InterruptedException e) { // ignored
409            }
410        }
411        temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
412        executor.shutdown(); // finally, kill the processor thread
413        Info.threadlocalInfo.remove(); // LOG4J2-323
414    }
415
416    /**
417     * Returns {@code true} if the specified disruptor still has unprocessed events.
418     */
419    private static boolean hasBacklog(final Disruptor<?> disruptor) {
420        final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
421        return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
422    }
423
424    /**
425     * Creates and returns a new {@code RingBufferAdmin} that instruments the
426     * ringbuffer of the {@code AsyncLogger}.
427     *
428     * @param contextName name of the global {@code AsyncLoggerContext}
429     */
430    public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
431        return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
432    }
433    
434    /**
435     * Returns the {@code NanoClock} to use for creating the nanoTime timestamp of log events.
436     * @return the {@code NanoClock} to use for creating the nanoTime timestamp of log events
437     */
438    public static NanoClock getNanoClock() {
439        return nanoClock;
440    }
441    
442    /**
443     * Sets the {@code NanoClock} to use for creating the nanoTime timestamp of log events.
444     * <p>
445     * FOR INTERNAL USE. This method may be called with a different {@code NanoClock} implementation when the
446     * configuration changes.
447     * 
448     * @param nanoClock the {@code NanoClock} to use for creating the nanoTime timestamp of log events
449     */
450    public static void setNanoClock(NanoClock nanoClock) {
451        AsyncLogger.nanoClock = Objects.requireNonNull(nanoClock, "NanoClock must be non-null");
452    }
453}