001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.Map;
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.Executors;
022
023import org.apache.logging.log4j.Level;
024import org.apache.logging.log4j.Marker;
025import org.apache.logging.log4j.ThreadContext;
026import org.apache.logging.log4j.core.Logger;
027import org.apache.logging.log4j.core.LoggerContext;
028import org.apache.logging.log4j.core.config.Property;
029import org.apache.logging.log4j.core.helpers.Clock;
030import org.apache.logging.log4j.core.helpers.ClockFactory;
031import org.apache.logging.log4j.core.impl.Log4jLogEvent;
032import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
033import org.apache.logging.log4j.message.Message;
034import org.apache.logging.log4j.message.MessageFactory;
035import org.apache.logging.log4j.status.StatusLogger;
036
037import com.lmax.disruptor.BlockingWaitStrategy;
038import com.lmax.disruptor.EventHandler;
039import com.lmax.disruptor.ExceptionHandler;
040import com.lmax.disruptor.RingBuffer;
041import com.lmax.disruptor.SleepingWaitStrategy;
042import com.lmax.disruptor.WaitStrategy;
043import com.lmax.disruptor.YieldingWaitStrategy;
044import com.lmax.disruptor.dsl.Disruptor;
045import com.lmax.disruptor.dsl.ProducerType;
046import com.lmax.disruptor.util.Util;
047
048/**
049 * AsyncLogger is a logger designed for high throughput and low latency logging.
050 * It does not perform any I/O in the calling (application) thread, but instead
051 * hands off the work to another thread as soon as possible. The actual logging
052 * is performed in the background thread. It uses the LMAX Disruptor library for
053 * inter-thread communication. (<a
054 * href="http://lmax-exchange.github.com/disruptor/"
055 * >http://lmax-exchange.github.com/disruptor/</a>)
056 * <p>
057 * To use AsyncLogger, specify the System property
058 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
059 * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
060 * will be AsyncLoggers.
061 * <p>
062 * Note that for performance reasons, this logger does not include source
063 * location by default. You need to specify {@code includeLocation="true"} in
064 * the configuration or any %class, %location or %line conversion patterns in
065 * your log4j.xml configuration will produce either a "?" character or no output
066 * at all.
067 * <p>
068 * For best performance, use AsyncLogger with the RandomAccessFileAppender or
069 * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
070 * have built-in support for the batching mechanism used by the Disruptor
071 * library, and they will flush to disk at the end of each batch. This means
072 * that even with immediateFlush=false, there will never be any items left in
 * the buffer; all log events will be written to disk in a very efficient
074 * manner.
075 */
076public class AsyncLogger extends Logger {
    private static final long serialVersionUID = 1L;
    // Sleep interval, in milliseconds, between ring buffer drain checks in stop().
    private static final int HALF_A_SECOND = 500;
    // Upper bound on the number of drain checks stop() performs (20 x 500 ms = 10 seconds).
    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
    // Requested ring buffer sizes below this value are replaced by this value.
    private static final int RINGBUFFER_MIN_SIZE = 128;
    // Ring buffer size used when "AsyncLogger.RingBufferSize" is unset or cannot be parsed.
    private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
    // Logger used for this class's own diagnostic output.
    private static final StatusLogger LOGGER = StatusLogger.getLogger();
    // How log() obtains the calling thread's name; chosen once from a system property (LOG4J2-467).
    private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
    // Per-thread Info (translator, cached thread name, appender-thread flag); created lazily in log().
    private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
085
086    static enum ThreadNameStrategy { // LOG4J2-467
087        CACHED {
088            @Override
089            public String getThreadName(Info info) {
090                return info.cachedThreadName;
091            }
092        },
093        UNCACHED {
094            @Override
095            public String getThreadName(Info info) {
096                return Thread.currentThread().getName();
097            }
098        };
099        abstract String getThreadName(Info info);
100
101        static ThreadNameStrategy create() {
102            String name = System.getProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
103            try {
104                return ThreadNameStrategy.valueOf(name);
105            } catch (Exception ex) {
106                return CACHED;
107            }
108        }
109    }
    // Shared by all AsyncLogger instances: assigned in the static initializer,
    // read by log() and createRingBufferAdmin(), and set to null by stop().
    private static volatile Disruptor<RingBufferLogEvent> disruptor;
    // Source of the timestamp that log() stores in each event.
    private static Clock clock = ClockFactory.getClock();

    // Single-threaded executor handed to the Disruptor in the static initializer;
    // shut down by stop(). NOTE(review): DaemonThreadFactory presumably creates
    // daemon threads whose names start with the given prefix - confirm.
    private static ExecutorService executor = Executors
            .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
115
116    static {
117        initInfoForExecutorThread();
118        LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
119        final int ringBufferSize = calculateRingBufferSize();
120
121        final WaitStrategy waitStrategy = createWaitStrategy();
122        disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
123                ProducerType.MULTI, waitStrategy);
124        final EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {//
125        new RingBufferLogEventHandler() };
126        disruptor.handleExceptionsWith(getExceptionHandler());
127        disruptor.handleEventsWith(handlers);
128
129        LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
130                .getBufferSize());
131        disruptor.start();
132    }
133
134    private static int calculateRingBufferSize() {
135        int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
136        final String userPreferredRBSize = System.getProperty("AsyncLogger.RingBufferSize",
137                String.valueOf(ringBufferSize));
138        try {
139            int size = Integer.parseInt(userPreferredRBSize);
140            if (size < RINGBUFFER_MIN_SIZE) {
141                size = RINGBUFFER_MIN_SIZE;
142                LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
143                        RINGBUFFER_MIN_SIZE);
144            }
145            ringBufferSize = size;
146        } catch (final Exception ex) {
147            LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
148        }
149        return Util.ceilingNextPowerOfTwo(ringBufferSize);
150    }
151
152    /**
153     * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
154     * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
155     * All other Info objects will have this attribute set to {@code false}.
156     * This allows us to detect Logger.log() calls initiated from the appender thread,
157     * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
158     */
159    private static void initInfoForExecutorThread() {
160        executor.submit(new Runnable(){
161            @Override
162            public void run() {
163                final boolean isAppenderThread = true;
164                final Info info = new Info(new RingBufferLogEventTranslator(), //
165                        Thread.currentThread().getName(), isAppenderThread);
166                threadlocalInfo.set(info);
167            }
168        });
169    }
170
171    private static WaitStrategy createWaitStrategy() {
172        final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
173        LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
174        if ("Sleep".equals(strategy)) {
175            LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
176            return new SleepingWaitStrategy();
177        } else if ("Yield".equals(strategy)) {
178            LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
179            return new YieldingWaitStrategy();
180        } else if ("Block".equals(strategy)) {
181            LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
182            return new BlockingWaitStrategy();
183        }
184        LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
185        return new SleepingWaitStrategy();
186    }
187
188    private static ExceptionHandler getExceptionHandler() {
189        final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
190        if (cls == null) {
191            LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
192            return null;
193        }
194        try {
195            @SuppressWarnings("unchecked")
196            final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class.forName(cls);
197            final ExceptionHandler result = klass.newInstance();
198            LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
199            return result;
200        } catch (final Exception ignored) {
201            LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
202            return null;
203        }
204    }
205
    /**
     * Constructs an {@code AsyncLogger} with the specified context, name and
     * message factory. All three arguments are passed unchanged to the
     * {@code Logger} superclass constructor.
     *
     * @param context context of this logger
     * @param name name of this logger
     * @param messageFactory message factory of this logger
     */
    public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
        super(context, name, messageFactory);
    }
217
218    /**
219     * Tuple with the event translator and thread name for a thread.
220     */
221    static class Info {
222        private final RingBufferLogEventTranslator translator;
223        private final String cachedThreadName;
224        private final boolean isAppenderThread;
225        public Info(RingBufferLogEventTranslator translator, String threadName, boolean appenderThread) {
226            this.translator = translator;
227            this.cachedThreadName = threadName;
228            this.isAppenderThread = appenderThread;
229        }
230    }
231
232    @Override
233    public void log(final Marker marker, final String fqcn, final Level level, final Message data, final Throwable t) {
234        Info info = threadlocalInfo.get();
235        if (info == null) {
236            info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
237            threadlocalInfo.set(info);
238        }
239        
240        // LOG4J2-471: prevent deadlock when RingBuffer is full and object
241        // being logged calls Logger.log() from its toString() method
242        if (info.isAppenderThread && disruptor.getRingBuffer().remainingCapacity() == 0) {
243            // bypass RingBuffer and invoke Appender directly
244            config.loggerConfig.log(getName(), marker, fqcn, level, data, t);
245            return;
246        }
247        final boolean includeLocation = config.loggerConfig.isIncludeLocation();
248        info.translator.setValues(this, getName(), marker, fqcn, level, data, t, //
249
250                // config properties are taken care of in the EventHandler
251                // thread in the #actualAsyncLog method
252
253                // needs shallow copy to be fast (LOG4J2-154)
254                ThreadContext.getImmutableContext(), //
255
256                // needs shallow copy to be fast (LOG4J2-154)
257                ThreadContext.getImmutableStack(), //
258
259                // Thread.currentThread().getName(), //
260                // info.cachedThreadName, //
261                THREAD_NAME_STRATEGY.getThreadName(info), // LOG4J2-467
262
263                // location: very expensive operation. LOG4J2-153:
264                // Only include if "includeLocation=true" is specified,
265                // exclude if not specified or if "false" was specified.
266                includeLocation ? location(fqcn) : null,
267
268                // System.currentTimeMillis());
269                // CoarseCachedClock: 20% faster than system clock, 16ms gaps
270                // CachedClock: 10% faster than system clock, smaller gaps
271                clock.currentTimeMillis());
272
273        disruptor.publishEvent(info.translator);
274    }
275
276    private StackTraceElement location(final String fqcnOfLogger) {
277        return Log4jLogEvent.calcLocation(fqcnOfLogger);
278    }
279
280    /**
281     * This method is called by the EventHandler that processes the
282     * RingBufferLogEvent in a separate thread.
283     * 
284     * @param event the event to log
285     */
286    public void actualAsyncLog(final RingBufferLogEvent event) {
287        final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
288        event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
289        config.logEvent(event);
290    }
291
292    public static void stop() {
293        final Disruptor<RingBufferLogEvent> temp = disruptor;
294
295        // Must guarantee that publishing to the RingBuffer has stopped
296        // before we call disruptor.shutdown()
297        disruptor = null; // client code fails with NPE if log after stop = OK
298        if (temp == null) {
299            return; // stop() has already been called
300        }
301        temp.shutdown();
302
303        // wait up to 10 seconds for the ringbuffer to drain
304        final RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
305        for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
306            if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
307                break;
308            }
309            try {
310                // give ringbuffer some time to drain...
311                Thread.sleep(HALF_A_SECOND);
312            } catch (final InterruptedException e) {
313                // ignored
314            }
315        }
316        executor.shutdown(); // finally, kill the processor thread
317        threadlocalInfo.remove(); // LOG4J2-323
318    }
319
320    /**
321     * Creates and returns a new {@code RingBufferAdmin} that instruments the
322     * ringbuffer of the {@code AsyncLogger}.
323     * 
324     * @param contextName name of the global {@code AsyncLoggerContext}
325     */
326    public static RingBufferAdmin createRingBufferAdmin(String contextName) {
327        return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
328    }
329}