001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.async;
018    
019    import java.util.Map;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.Executors;
022    
023    import org.apache.logging.log4j.Level;
024    import org.apache.logging.log4j.Marker;
025    import org.apache.logging.log4j.ThreadContext;
026    import org.apache.logging.log4j.core.Logger;
027    import org.apache.logging.log4j.core.LoggerContext;
028    import org.apache.logging.log4j.core.config.Property;
029    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
030    import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
031    import org.apache.logging.log4j.core.util.Clock;
032    import org.apache.logging.log4j.core.util.ClockFactory;
033    import org.apache.logging.log4j.core.util.Loader;
034    import org.apache.logging.log4j.message.Message;
035    import org.apache.logging.log4j.message.MessageFactory;
036    import org.apache.logging.log4j.status.StatusLogger;
037    
038    import com.lmax.disruptor.BlockingWaitStrategy;
039    import com.lmax.disruptor.ExceptionHandler;
040    import com.lmax.disruptor.RingBuffer;
041    import com.lmax.disruptor.SleepingWaitStrategy;
042    import com.lmax.disruptor.WaitStrategy;
043    import com.lmax.disruptor.YieldingWaitStrategy;
044    import com.lmax.disruptor.dsl.Disruptor;
045    import com.lmax.disruptor.dsl.ProducerType;
046    import com.lmax.disruptor.util.Util;
047    
048    /**
049     * AsyncLogger is a logger designed for high throughput and low latency logging.
050     * It does not perform any I/O in the calling (application) thread, but instead
051     * hands off the work to another thread as soon as possible. The actual logging
052     * is performed in the background thread. It uses the LMAX Disruptor library for
053     * inter-thread communication. (<a
054     * href="http://lmax-exchange.github.com/disruptor/"
055     * >http://lmax-exchange.github.com/disruptor/</a>)
056     * <p>
057     * To use AsyncLogger, specify the System property
058     * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
059     * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
060     * will be AsyncLoggers.
061     * <p>
062     * Note that for performance reasons, this logger does not include source
063     * location by default. You need to specify {@code includeLocation="true"} in
064     * the configuration or any %class, %location or %line conversion patterns in
065     * your log4j.xml configuration will produce either a "?" character or no output
066     * at all.
067     * <p>
068     * For best performance, use AsyncLogger with the RandomAccessFileAppender or
069     * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
070     * have built-in support for the batching mechanism used by the Disruptor
071     * library, and they will flush to disk at the end of each batch. This means
072     * that even with immediateFlush=false, there will never be any items left in
073     * the buffer; all log events will all be written to disk in a very efficient
074     * manner.
075     */
076    public class AsyncLogger extends Logger {
077        private static final long serialVersionUID = 1L;
078        private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
079        private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
080        private static final int RINGBUFFER_MIN_SIZE = 128;
081        private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
082        private static final StatusLogger LOGGER = StatusLogger.getLogger();
083        private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
084        private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
085    
086        static enum ThreadNameStrategy { // LOG4J2-467
087            CACHED {
088                @Override
089                public String getThreadName(final Info info) {
090                    return info.cachedThreadName;
091                }
092            },
093            UNCACHED {
094                @Override
095                public String getThreadName(final Info info) {
096                    return Thread.currentThread().getName();
097                }
098            };
099            abstract String getThreadName(Info info);
100    
101            static ThreadNameStrategy create() {
102                final String name = System.getProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
103                try {
104                    return ThreadNameStrategy.valueOf(name);
105                } catch (final Exception ex) {
106                    LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
107                    return CACHED;
108                }
109            }
110        }
111        private static volatile Disruptor<RingBufferLogEvent> disruptor;
112        private static final Clock clock = ClockFactory.getClock();
113    
114        private static final ExecutorService executor = Executors
115                .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
116    
117        static {
118            initInfoForExecutorThread();
119            LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
120            final int ringBufferSize = calculateRingBufferSize();
121    
122            final WaitStrategy waitStrategy = createWaitStrategy();
123            disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
124                    ProducerType.MULTI, waitStrategy);
125            disruptor.handleExceptionsWith(getExceptionHandler());
126            disruptor.handleEventsWith(new RingBufferLogEventHandler());
127    
128            LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
129                    .getBufferSize());
130            disruptor.start();
131        }
132    
133        private static int calculateRingBufferSize() {
134            int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
135            final String userPreferredRBSize = System.getProperty("AsyncLogger.RingBufferSize",
136                    String.valueOf(ringBufferSize));
137            try {
138                int size = Integer.parseInt(userPreferredRBSize);
139                if (size < RINGBUFFER_MIN_SIZE) {
140                    size = RINGBUFFER_MIN_SIZE;
141                    LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
142                            RINGBUFFER_MIN_SIZE);
143                }
144                ringBufferSize = size;
145            } catch (final Exception ex) {
146                LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
147            }
148            return Util.ceilingNextPowerOfTwo(ringBufferSize);
149        }
150    
151        /**
152         * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
153         * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
154         * All other Info objects will have this attribute set to {@code false}.
155         * This allows us to detect Logger.log() calls initiated from the appender thread,
156         * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
157         */
158        private static void initInfoForExecutorThread() {
159            executor.submit(new Runnable(){
160                @Override
161                public void run() {
162                    final boolean isAppenderThread = true;
163                    final Info info = new Info(new RingBufferLogEventTranslator(), //
164                            Thread.currentThread().getName(), isAppenderThread);
165                    threadlocalInfo.set(info);
166                }
167            });
168        }
169    
170        private static WaitStrategy createWaitStrategy() {
171            final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
172            LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
173            if ("Sleep".equals(strategy)) {
174                return new SleepingWaitStrategy();
175            } else if ("Yield".equals(strategy)) {
176                return new YieldingWaitStrategy();
177            } else if ("Block".equals(strategy)) {
178                return new BlockingWaitStrategy();
179            }
180            LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
181            return new BlockingWaitStrategy();
182        }
183    
184        private static ExceptionHandler getExceptionHandler() {
185            final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
186            if (cls == null) {
187                LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
188                return null;
189            }
190            try {
191                final ExceptionHandler result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
192                LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
193                return result;
194            } catch (final Exception ignored) {
195                LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
196                return null;
197            }
198        }
199    
200        /**
201         * Constructs an {@code AsyncLogger} with the specified context, name and
202         * message factory.
203         *
204         * @param context context of this logger
205         * @param name name of this logger
206         * @param messageFactory message factory of this logger
207         */
208        public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
209            super(context, name, messageFactory);
210        }
211    
212        /**
213         * Tuple with the event translator and thread name for a thread.
214         */
215        static class Info {
216            private final RingBufferLogEventTranslator translator;
217            private final String cachedThreadName;
218            private final boolean isAppenderThread;
219            public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
220                this.translator = translator;
221                this.cachedThreadName = threadName;
222                this.isAppenderThread = appenderThread;
223            }
224        }
225    
226        @Override
227        public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
228            Info info = threadlocalInfo.get();
229            if (info == null) {
230                info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
231                threadlocalInfo.set(info);
232            }
233            
234            Disruptor<RingBufferLogEvent> temp = disruptor;
235            if (temp == null) { // LOG4J2-639
236                LOGGER.fatal("Ignoring log event after log4j was shut down");
237                return;
238            }
239    
240            // LOG4J2-471: prevent deadlock when RingBuffer is full and object
241            // being logged calls Logger.log() from its toString() method
242            if (info.isAppenderThread && temp.getRingBuffer().remainingCapacity() == 0) {
243                // bypass RingBuffer and invoke Appender directly
244                config.loggerConfig.log(getName(), fqcn, marker, level, message, thrown);
245                return;
246            }
247            final boolean includeLocation = config.loggerConfig.isIncludeLocation();
248            info.translator.setValues(this, getName(), marker, fqcn, level, message, //
249                    // don't construct ThrowableProxy until required
250                    thrown, //
251    
252                    // config properties are taken care of in the EventHandler
253                    // thread in the #actualAsyncLog method
254    
255                    // needs shallow copy to be fast (LOG4J2-154)
256                    ThreadContext.getImmutableContext(), //
257    
258                    // needs shallow copy to be fast (LOG4J2-154)
259                    ThreadContext.getImmutableStack(), //
260    
261                    // Thread.currentThread().getName(), //
262                    // info.cachedThreadName, //
263                    THREAD_NAME_STRATEGY.getThreadName(info), // LOG4J2-467
264    
265                    // location: very expensive operation. LOG4J2-153:
266                    // Only include if "includeLocation=true" is specified,
267                    // exclude if not specified or if "false" was specified.
268                    includeLocation ? location(fqcn) : null,
269    
270                    // System.currentTimeMillis());
271                    // CoarseCachedClock: 20% faster than system clock, 16ms gaps
272                    // CachedClock: 10% faster than system clock, smaller gaps
273                    clock.currentTimeMillis());
274    
275            // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
276            try {
277                // Note: do NOT use the temp variable above!
278                // That could result in adding a log event to the disruptor after it was shut down,
279                // which could cause the publishEvent method to hang and never return.
280                disruptor.publishEvent(info.translator);
281            } catch (NullPointerException npe) {
282                LOGGER.fatal("Ignoring log event after log4j was shut down.");
283            }
284        }
285    
286        private static StackTraceElement location(final String fqcnOfLogger) {
287            return Log4jLogEvent.calcLocation(fqcnOfLogger);
288        }
289    
290        /**
291         * This method is called by the EventHandler that processes the
292         * RingBufferLogEvent in a separate thread.
293         *
294         * @param event the event to log
295         */
296        public void actualAsyncLog(final RingBufferLogEvent event) {
297            final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
298            event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
299            config.logEvent(event);
300        }
301    
302        public static void stop() {
303            final Disruptor<RingBufferLogEvent> temp = disruptor;
304    
305            // Must guarantee that publishing to the RingBuffer has stopped
306            // before we call disruptor.shutdown()
307            disruptor = null; // client code fails with NPE if log after stop = OK
308            if (temp == null) {
309                return; // stop() has already been called
310            }
311    
312            // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
313            // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
314            // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
315            for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
316                try {
317                    Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
318                } catch (final InterruptedException e) { // ignored
319                }
320            }
321            temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
322            executor.shutdown(); // finally, kill the processor thread
323            threadlocalInfo.remove(); // LOG4J2-323
324        }
325    
326        /**
327         * Returns {@code true} if the specified disruptor still has unprocessed events.
328         */
329        private static boolean hasBacklog(Disruptor<?> disruptor) {
330            final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
331            return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
332        }
333    
334        /**
335         * Creates and returns a new {@code RingBufferAdmin} that instruments the
336         * ringbuffer of the {@code AsyncLogger}.
337         *
338         * @param contextName name of the global {@code AsyncLoggerContext}
339         */
340        public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
341            return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
342        }
343    }