001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.async;
018    
019    import java.util.Map;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.Executors;
022    
023    import org.apache.logging.log4j.Level;
024    import org.apache.logging.log4j.Marker;
025    import org.apache.logging.log4j.ThreadContext;
026    import org.apache.logging.log4j.core.Logger;
027    import org.apache.logging.log4j.core.LoggerContext;
028    import org.apache.logging.log4j.core.config.Property;
029    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
030    import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
031    import org.apache.logging.log4j.core.util.Clock;
032    import org.apache.logging.log4j.core.util.ClockFactory;
033    import org.apache.logging.log4j.core.util.Loader;
034    import org.apache.logging.log4j.message.Message;
035    import org.apache.logging.log4j.message.MessageFactory;
036    import org.apache.logging.log4j.message.TimestampMessage;
037    import org.apache.logging.log4j.status.StatusLogger;
038    
039    import com.lmax.disruptor.BlockingWaitStrategy;
040    import com.lmax.disruptor.ExceptionHandler;
041    import com.lmax.disruptor.RingBuffer;
042    import com.lmax.disruptor.SleepingWaitStrategy;
043    import com.lmax.disruptor.WaitStrategy;
044    import com.lmax.disruptor.YieldingWaitStrategy;
045    import com.lmax.disruptor.dsl.Disruptor;
046    import com.lmax.disruptor.dsl.ProducerType;
047    import com.lmax.disruptor.util.Util;
048    
049    /**
050     * AsyncLogger is a logger designed for high throughput and low latency logging.
051     * It does not perform any I/O in the calling (application) thread, but instead
052     * hands off the work to another thread as soon as possible. The actual logging
053     * is performed in the background thread. It uses the LMAX Disruptor library for
054     * inter-thread communication. (<a
055     * href="http://lmax-exchange.github.com/disruptor/"
056     * >http://lmax-exchange.github.com/disruptor/</a>)
057     * <p>
058     * To use AsyncLogger, specify the System property
059     * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
060     * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
061     * will be AsyncLoggers.
062     * <p>
063     * Note that for performance reasons, this logger does not include source
064     * location by default. You need to specify {@code includeLocation="true"} in
065     * the configuration or any %class, %location or %line conversion patterns in
066     * your log4j.xml configuration will produce either a "?" character or no output
067     * at all.
068     * <p>
069     * For best performance, use AsyncLogger with the RandomAccessFileAppender or
070     * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
071     * have built-in support for the batching mechanism used by the Disruptor
072     * library, and they will flush to disk at the end of each batch. This means
073     * that even with immediateFlush=false, there will never be any items left in
074     * the buffer; all log events will all be written to disk in a very efficient
075     * manner.
076     */
077    public class AsyncLogger extends Logger {
078        private static final long serialVersionUID = 1L;
079        private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
080        private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
081        private static final int RINGBUFFER_MIN_SIZE = 128;
082        private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
083        private static final StatusLogger LOGGER = StatusLogger.getLogger();
084        private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
085        private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
086    
087        static enum ThreadNameStrategy { // LOG4J2-467
088            CACHED {
089                @Override
090                public String getThreadName(final Info info) {
091                    return info.cachedThreadName;
092                }
093            },
094            UNCACHED {
095                @Override
096                public String getThreadName(final Info info) {
097                    return Thread.currentThread().getName();
098                }
099            };
100            abstract String getThreadName(Info info);
101    
102            static ThreadNameStrategy create() {
103                final String name = System.getProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
104                try {
105                    return ThreadNameStrategy.valueOf(name);
106                } catch (final Exception ex) {
107                    LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
108                    return CACHED;
109                }
110            }
111        }
112        private static volatile Disruptor<RingBufferLogEvent> disruptor;
113        private static final Clock clock = ClockFactory.getClock();
114    
115        private static final ExecutorService executor = Executors
116                .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
117    
118        static {
119            initInfoForExecutorThread();
120            LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
121            final int ringBufferSize = calculateRingBufferSize();
122    
123            final WaitStrategy waitStrategy = createWaitStrategy();
124            disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
125                    ProducerType.MULTI, waitStrategy);
126            disruptor.handleExceptionsWith(getExceptionHandler());
127            disruptor.handleEventsWith(new RingBufferLogEventHandler());
128    
129            LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
130                    .getBufferSize());
131            disruptor.start();
132        }
133    
134        private static int calculateRingBufferSize() {
135            int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
136            final String userPreferredRBSize = System.getProperty("AsyncLogger.RingBufferSize",
137                    String.valueOf(ringBufferSize));
138            try {
139                int size = Integer.parseInt(userPreferredRBSize);
140                if (size < RINGBUFFER_MIN_SIZE) {
141                    size = RINGBUFFER_MIN_SIZE;
142                    LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
143                            RINGBUFFER_MIN_SIZE);
144                }
145                ringBufferSize = size;
146            } catch (final Exception ex) {
147                LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
148            }
149            return Util.ceilingNextPowerOfTwo(ringBufferSize);
150        }
151    
152        /**
153         * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
154         * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
155         * All other Info objects will have this attribute set to {@code false}.
156         * This allows us to detect Logger.log() calls initiated from the appender thread,
157         * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
158         */
159        private static void initInfoForExecutorThread() {
160            executor.submit(new Runnable(){
161                @Override
162                public void run() {
163                    final boolean isAppenderThread = true;
164                    final Info info = new Info(new RingBufferLogEventTranslator(), //
165                            Thread.currentThread().getName(), isAppenderThread);
166                    threadlocalInfo.set(info);
167                }
168            });
169        }
170    
171        private static WaitStrategy createWaitStrategy() {
172            final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
173            LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
174            if ("Sleep".equals(strategy)) {
175                return new SleepingWaitStrategy();
176            } else if ("Yield".equals(strategy)) {
177                return new YieldingWaitStrategy();
178            } else if ("Block".equals(strategy)) {
179                return new BlockingWaitStrategy();
180            }
181            LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
182            return new BlockingWaitStrategy();
183        }
184    
185        private static ExceptionHandler getExceptionHandler() {
186            final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
187            if (cls == null) {
188                LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
189                return null;
190            }
191            try {
192                final ExceptionHandler result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
193                LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
194                return result;
195            } catch (final Exception ignored) {
196                LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
197                return null;
198            }
199        }
200    
201        /**
202         * Constructs an {@code AsyncLogger} with the specified context, name and
203         * message factory.
204         *
205         * @param context context of this logger
206         * @param name name of this logger
207         * @param messageFactory message factory of this logger
208         */
209        public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
210            super(context, name, messageFactory);
211        }
212    
213        /**
214         * Tuple with the event translator and thread name for a thread.
215         */
216        static class Info {
217            private final RingBufferLogEventTranslator translator;
218            private final String cachedThreadName;
219            private final boolean isAppenderThread;
220            public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
221                this.translator = translator;
222                this.cachedThreadName = threadName;
223                this.isAppenderThread = appenderThread;
224            }
225        }
226    
227        @Override
228        public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
229            // TODO refactor to reduce size to <= 35 bytecodes to allow JVM to inline it
230            Info info = threadlocalInfo.get();
231            if (info == null) {
232                info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
233                threadlocalInfo.set(info);
234            }
235            
236            final Disruptor<RingBufferLogEvent> temp = disruptor;
237            if (temp == null) { // LOG4J2-639
238                LOGGER.fatal("Ignoring log event after log4j was shut down");
239                return;
240            }
241    
242            // LOG4J2-471: prevent deadlock when RingBuffer is full and object
243            // being logged calls Logger.log() from its toString() method
244            if (info.isAppenderThread && temp.getRingBuffer().remainingCapacity() == 0) {
245                // bypass RingBuffer and invoke Appender directly
246                config.loggerConfig.log(getName(), fqcn, marker, level, message, thrown);
247                return;
248            }
249            message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
250            final boolean includeLocation = config.loggerConfig.isIncludeLocation();
251            info.translator.setValues(this, getName(), marker, fqcn, level, message, //
252                    // don't construct ThrowableProxy until required
253                    thrown, //
254    
255                    // config properties are taken care of in the EventHandler
256                    // thread in the #actualAsyncLog method
257    
258                    // needs shallow copy to be fast (LOG4J2-154)
259                    ThreadContext.getImmutableContext(), //
260    
261                    // needs shallow copy to be fast (LOG4J2-154)
262                    ThreadContext.getImmutableStack(), //
263    
264                    // Thread.currentThread().getName(), //
265                    // info.cachedThreadName, //
266                    THREAD_NAME_STRATEGY.getThreadName(info), // LOG4J2-467
267    
268                    // location: very expensive operation. LOG4J2-153:
269                    // Only include if "includeLocation=true" is specified,
270                    // exclude if not specified or if "false" was specified.
271                    includeLocation ? location(fqcn) : null,
272    
273                    // System.currentTimeMillis());
274                    // CoarseCachedClock: 20% faster than system clock, 16ms gaps
275                    // CachedClock: 10% faster than system clock, smaller gaps
276                    // LOG4J2-744 avoid calling clock altogether if message has the timestamp
277                    message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() :
278                            clock.currentTimeMillis());
279    
280            // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
281            try {
282                // Note: do NOT use the temp variable above!
283                // That could result in adding a log event to the disruptor after it was shut down,
284                // which could cause the publishEvent method to hang and never return.
285                disruptor.publishEvent(info.translator);
286            } catch (final NullPointerException npe) {
287                LOGGER.fatal("Ignoring log event after log4j was shut down.");
288            }
289        }
290    
291        private static StackTraceElement location(final String fqcnOfLogger) {
292            return Log4jLogEvent.calcLocation(fqcnOfLogger);
293        }
294    
295        /**
296         * This method is called by the EventHandler that processes the
297         * RingBufferLogEvent in a separate thread.
298         *
299         * @param event the event to log
300         */
301        public void actualAsyncLog(final RingBufferLogEvent event) {
302            final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
303            event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
304            config.logEvent(event);
305        }
306    
307        public static void stop() {
308            final Disruptor<RingBufferLogEvent> temp = disruptor;
309    
310            // Must guarantee that publishing to the RingBuffer has stopped
311            // before we call disruptor.shutdown()
312            disruptor = null; // client code fails with NPE if log after stop = OK
313            if (temp == null) {
314                return; // stop() has already been called
315            }
316    
317            // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
318            // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
319            // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
320            for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
321                try {
322                    Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
323                } catch (final InterruptedException e) { // ignored
324                }
325            }
326            temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
327            executor.shutdown(); // finally, kill the processor thread
328            threadlocalInfo.remove(); // LOG4J2-323
329        }
330    
331        /**
332         * Returns {@code true} if the specified disruptor still has unprocessed events.
333         */
334        private static boolean hasBacklog(final Disruptor<?> disruptor) {
335            final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
336            return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
337        }
338    
339        /**
340         * Creates and returns a new {@code RingBufferAdmin} that instruments the
341         * ringbuffer of the {@code AsyncLogger}.
342         *
343         * @param contextName name of the global {@code AsyncLoggerContext}
344         */
345        public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
346            return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
347        }
348    }