001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.Map;
020import java.util.Objects;
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Executors;
023
024import org.apache.logging.log4j.Level;
025import org.apache.logging.log4j.Marker;
026import org.apache.logging.log4j.ThreadContext;
027import org.apache.logging.log4j.core.Logger;
028import org.apache.logging.log4j.core.LoggerContext;
029import org.apache.logging.log4j.core.config.Property;
030import org.apache.logging.log4j.core.config.ReliabilityStrategy;
031import org.apache.logging.log4j.core.impl.Log4jLogEvent;
032import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
033import org.apache.logging.log4j.core.util.Clock;
034import org.apache.logging.log4j.core.util.ClockFactory;
035import org.apache.logging.log4j.core.util.DummyNanoClock;
036import org.apache.logging.log4j.core.util.Integers;
037import org.apache.logging.log4j.core.util.Loader;
038import org.apache.logging.log4j.core.util.NanoClock;
039import org.apache.logging.log4j.message.Message;
040import org.apache.logging.log4j.message.MessageFactory;
041import org.apache.logging.log4j.message.TimestampMessage;
042import org.apache.logging.log4j.status.StatusLogger;
043import org.apache.logging.log4j.util.PropertiesUtil;
044
045import com.lmax.disruptor.BlockingWaitStrategy;
046import com.lmax.disruptor.ExceptionHandler;
047import com.lmax.disruptor.RingBuffer;
048import com.lmax.disruptor.SleepingWaitStrategy;
049import com.lmax.disruptor.WaitStrategy;
050import com.lmax.disruptor.YieldingWaitStrategy;
051import com.lmax.disruptor.dsl.Disruptor;
052import com.lmax.disruptor.dsl.ProducerType;
053
054/**
055 * AsyncLogger is a logger designed for high throughput and low latency logging. It does not perform any I/O in the
056 * calling (application) thread, but instead hands off the work to another thread as soon as possible. The actual
057 * logging is performed in the background thread. It uses the LMAX Disruptor library for inter-thread communication. (<a
058 * href="http://lmax-exchange.github.com/disruptor/" >http://lmax-exchange.github.com/disruptor/</a>)
059 * <p>
060 * To use AsyncLogger, specify the System property
061 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector} before you obtain a
062 * Logger, and all Loggers returned by LogManager.getLogger will be AsyncLoggers.
063 * <p>
064 * Note that for performance reasons, this logger does not include source location by default. You need to specify
065 * {@code includeLocation="true"} in the configuration or any %class, %location or %line conversion patterns in your
066 * log4j.xml configuration will produce either a "?" character or no output at all.
067 * <p>
068 * For best performance, use AsyncLogger with the RandomAccessFileAppender or RollingRandomAccessFileAppender, with
069 * immediateFlush=false. These appenders have built-in support for the batching mechanism used by the Disruptor library,
070 * and they will flush to disk at the end of each batch. This means that even with immediateFlush=false, there will
071 * never be any items left in the buffer; all log events will all be written to disk in a very efficient manner.
072 */
073public class AsyncLogger extends Logger {
074    private static final long serialVersionUID = 1L;
075    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
076    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
077    private static final int RINGBUFFER_MIN_SIZE = 128;
078    private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
079    private static final StatusLogger LOGGER = StatusLogger.getLogger();
080    private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
081
082    /**
083     * Strategy for deciding whether thread name should be cached or not.
084     */
085    static enum ThreadNameStrategy { // LOG4J2-467
086        CACHED {
087            @Override
088            public String getThreadName(final Info info) {
089                return info.cachedThreadName;
090            }
091        },
092        UNCACHED {
093            @Override
094            public String getThreadName(final Info info) {
095                return Thread.currentThread().getName();
096            }
097        };
098        abstract String getThreadName(Info info);
099
100        static ThreadNameStrategy create() {
101            final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy",
102                    CACHED.name());
103            try {
104                return ThreadNameStrategy.valueOf(name);
105            } catch (final Exception ex) {
106                LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
107                return CACHED;
108            }
109        }
110    }
111
112    private static volatile Disruptor<RingBufferLogEvent> disruptor;
113    private static final Clock CLOCK = ClockFactory.getClock();
114    private static volatile NanoClock nanoClock = new DummyNanoClock();
115
116    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(new DaemonThreadFactory(
117            "AsyncLogger-"));
118
119    static {
120        initInfoForExecutorThread();
121        LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
122        final int ringBufferSize = calculateRingBufferSize();
123
124        final WaitStrategy waitStrategy = createWaitStrategy();
125        disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, EXECUTOR, ProducerType.MULTI,
126                waitStrategy);
127        disruptor.handleExceptionsWith(getExceptionHandler());
128        disruptor.handleEventsWith(new RingBufferLogEventHandler());
129
130        LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
131                .getBufferSize());
132        disruptor.start();
133    }
134
135    /**
136     * Constructs an {@code AsyncLogger} with the specified context, name and message factory.
137     *
138     * @param context context of this logger
139     * @param name name of this logger
140     * @param messageFactory message factory of this logger
141     */
142    public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
143        super(context, name, messageFactory);
144    }
145
146    private static int calculateRingBufferSize() {
147        int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
148        final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
149                "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
150        try {
151            int size = Integer.parseInt(userPreferredRBSize);
152            if (size < RINGBUFFER_MIN_SIZE) {
153                size = RINGBUFFER_MIN_SIZE;
154                LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
155                        RINGBUFFER_MIN_SIZE);
156            }
157            ringBufferSize = size;
158        } catch (final Exception ex) {
159            LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
160        }
161        return Integers.ceilingNextPowerOfTwo(ringBufferSize);
162    }
163
164    /**
165     * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread. This Info object uniquely
166     * has attribute {@code isAppenderThread} set to {@code true}. All other Info objects will have this attribute set
167     * to {@code false}. This allows us to detect Logger.log() calls initiated from the appender thread, which may cause
168     * deadlock when the RingBuffer is full. (LOG4J2-471)
169     */
170    private static void initInfoForExecutorThread() {
171        EXECUTOR.submit(new Runnable() {
172            @Override
173            public void run() {
174                final boolean isAppenderThread = true;
175                final Info info = new Info(new RingBufferLogEventTranslator(), //
176                        Thread.currentThread().getName(), isAppenderThread);
177                Info.THREADLOCAL.set(info);
178            }
179        });
180    }
181
182    private static WaitStrategy createWaitStrategy() {
183        final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
184        LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
185        if ("Sleep".equals(strategy)) {
186            return new SleepingWaitStrategy();
187        } else if ("Yield".equals(strategy)) {
188            return new YieldingWaitStrategy();
189        } else if ("Block".equals(strategy)) {
190            return new BlockingWaitStrategy();
191        }
192        LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
193        return new BlockingWaitStrategy();
194    }
195
196    private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
197        final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
198        if (cls == null) {
199            LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
200            return null;
201        }
202        try {
203            @SuppressWarnings("unchecked")
204            final ExceptionHandler<RingBufferLogEvent> result = Loader
205                    .newCheckedInstanceOf(cls, ExceptionHandler.class);
206            LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
207            return result;
208        } catch (final Exception ignored) {
209            LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
210            return null;
211        }
212    }
213
214    /**
215     * Tuple with the event translator and thread name for a thread.
216     */
217    static class Info {
218        private static final ThreadLocal<Info> THREADLOCAL = new ThreadLocal<Info>() {
219            @Override
220            protected Info initialValue() {
221                // by default, set isAppenderThread to false
222                return new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
223            }
224        };
225        private final RingBufferLogEventTranslator translator;
226        private final String cachedThreadName;
227        private final boolean isAppenderThread;
228
229        public Info(final RingBufferLogEventTranslator translator, final String threadName,
230                final boolean appenderThread) {
231            this.translator = translator;
232            this.cachedThreadName = threadName;
233            this.isAppenderThread = appenderThread;
234        }
235
236        // LOG4J2-467
237        private String threadName() {
238            return THREAD_NAME_STRATEGY.getThreadName(this);
239        }
240    }
241
242    @Override
243    public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
244            final Throwable thrown) {
245
246        final Disruptor<RingBufferLogEvent> temp = disruptor;
247        if (temp == null) { // LOG4J2-639
248            LOGGER.fatal("Ignoring log event after log4j was shut down");
249        } else {
250            logMessage0(temp, fqcn, level, marker, message, thrown);
251        }
252    }
253
254    private void logMessage0(final Disruptor<RingBufferLogEvent> theDisruptor, final String fqcn, final Level level,
255            final Marker marker, final Message message, final Throwable thrown) {
256        final Info info = Info.THREADLOCAL.get();
257        logMessageInAppropriateThread(info, theDisruptor, fqcn, level, marker, message, thrown);
258    }
259
260    private void logMessageInAppropriateThread(final Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
261            final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
262        if (!logMessageInCurrentThread(info, theDisruptor, fqcn, level, marker, message, thrown)) {
263            logMessageInBackgroundThread(info, fqcn, level, marker, message, thrown);
264        }
265    }
266
267    /**
268     * LOG4J2-471: prevent deadlock when RingBuffer is full and object being logged calls Logger.log() from its
269     * toString() method
270     *
271     * @param info threadlocal information - used to determine if the current thread is the background appender thread
272     * @param theDisruptor used to check if the buffer is full
273     * @param fqcn fully qualified caller name
274     * @param level log level
275     * @param marker optional marker
276     * @param message log message
277     * @param thrown optional exception
278     * @return {@code true} if the event has been logged in the current thread, {@code false} if it should be passed to
279     *         the background thread
280     */
281    private boolean logMessageInCurrentThread(Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
282            final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
283        if (info.isAppenderThread && theDisruptor.getRingBuffer().remainingCapacity() == 0) {
284            // bypass RingBuffer and invoke Appender directly
285            final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
286            strategy.log(this, getName(), fqcn, marker, level, message, thrown);
287            return true;
288        }
289        return false;
290    }
291
292    /**
293     * Enqueues the specified message to be logged in the background thread.
294     * 
295     * @param info holds some cached information
296     * @param fqcn fully qualified caller name
297     * @param level log level
298     * @param marker optional marker
299     * @param message log message
300     * @param thrown optional exception
301     */
302    private void logMessageInBackgroundThread(Info info, final String fqcn, final Level level, final Marker marker,
303            final Message message, final Throwable thrown) {
304
305        message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
306
307        initLogMessageInfo(info, fqcn, level, marker, message, thrown);
308        enqueueLogMessageInfo(info);
309    }
310
311    private void initLogMessageInfo(Info info, final String fqcn, final Level level, final Marker marker,
312            final Message message, final Throwable thrown) {
313        info.translator.setValues(this, getName(), marker, fqcn, level, message, //
314                // don't construct ThrowableProxy until required
315                thrown, //
316
317                // config properties are taken care of in the EventHandler
318                // thread in the #actualAsyncLog method
319
320                // needs shallow copy to be fast (LOG4J2-154)
321                ThreadContext.getImmutableContext(), //
322
323                // needs shallow copy to be fast (LOG4J2-154)
324                ThreadContext.getImmutableStack(), //
325
326                // Thread.currentThread().getName(), //
327                // info.cachedThreadName, //
328                info.threadName(), //
329
330                // location: very expensive operation. LOG4J2-153:
331                // Only include if "includeLocation=true" is specified,
332                // exclude if not specified or if "false" was specified.
333                calcLocationIfRequested(fqcn),
334
335                // System.currentTimeMillis());
336                // CoarseCachedClock: 20% faster than system clock, 16ms gaps
337                // CachedClock: 10% faster than system clock, smaller gaps
338                // LOG4J2-744 avoid calling clock altogether if message has the timestamp
339                eventTimeMillis(message), //
340                nanoClock.nanoTime() //
341                );
342    }
343
344    private long eventTimeMillis(final Message message) {
345        return message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() : CLOCK
346                .currentTimeMillis();
347    }
348
349    /**
350     * Returns the caller location if requested, {@code null} otherwise.
351     * 
352     * @param fqcn fully qualified caller name.
353     * @return the caller location if requested, {@code null} otherwise.
354     */
355    private StackTraceElement calcLocationIfRequested(String fqcn) {
356        final boolean includeLocation = privateConfig.loggerConfig.isIncludeLocation();
357        return includeLocation ? location(fqcn) : null;
358    }
359
360    private void enqueueLogMessageInfo(Info info) {
361        // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
362        try {
363            // Note: do NOT use the temp variable above!
364            // That could result in adding a log event to the disruptor after it was shut down,
365            // which could cause the publishEvent method to hang and never return.
366            disruptor.publishEvent(info.translator);
367        } catch (final NullPointerException npe) {
368            LOGGER.fatal("Ignoring log event after log4j was shut down.");
369        }
370    }
371
372    private static StackTraceElement location(final String fqcnOfLogger) {
373        return Log4jLogEvent.calcLocation(fqcnOfLogger);
374    }
375
376    /**
377     * This method is called by the EventHandler that processes the RingBufferLogEvent in a separate thread.
378     *
379     * @param event the event to log
380     */
381    public void actualAsyncLog(final RingBufferLogEvent event) {
382        final Map<Property, Boolean> properties = privateConfig.loggerConfig.getProperties();
383        event.mergePropertiesIntoContextMap(properties, privateConfig.config.getStrSubstitutor());
384        final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
385        strategy.log(this, event);
386    }
387
388    public static void stop() {
389        final Disruptor<RingBufferLogEvent> temp = disruptor;
390
391        // Must guarantee that publishing to the RingBuffer has stopped
392        // before we call disruptor.shutdown()
393        disruptor = null; // client code fails with NPE if log after stop = OK
394        if (temp == null) {
395            return; // stop() has already been called
396        }
397
398        // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
399        // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
400        // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
401        for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
402            try {
403                Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
404            } catch (final InterruptedException e) { // ignored
405            }
406        }
407        temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
408        EXECUTOR.shutdown(); // finally, kill the processor thread
409        Info.THREADLOCAL.remove(); // LOG4J2-323
410    }
411
412    /**
413     * Returns {@code true} if the specified disruptor still has unprocessed events.
414     */
415    private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
416        final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
417        return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
418    }
419
420    /**
421     * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}.
422     *
423     * @param contextName name of the global {@code AsyncLoggerContext}
424     * @return a new {@code RingBufferAdmin} that instruments the ringbuffer
425     */
426    public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
427        return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
428    }
429
430    /**
431     * Returns the {@code NanoClock} to use for creating the nanoTime timestamp of log events.
432     * 
433     * @return the {@code NanoClock} to use for creating the nanoTime timestamp of log events
434     */
435    public static NanoClock getNanoClock() {
436        return nanoClock;
437    }
438
439    /**
440     * Sets the {@code NanoClock} to use for creating the nanoTime timestamp of log events.
441     * <p>
442     * FOR INTERNAL USE. This method may be called with a different {@code NanoClock} implementation when the
443     * configuration changes.
444     * 
445     * @param nanoClock the {@code NanoClock} to use for creating the nanoTime timestamp of log events
446     */
447    public static void setNanoClock(NanoClock nanoClock) {
448        AsyncLogger.nanoClock = Objects.requireNonNull(nanoClock, "NanoClock must be non-null");
449    }
450}