001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.async;
018    
019    import java.util.Map;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.Executors;
022    
023    import org.apache.logging.log4j.Level;
024    import org.apache.logging.log4j.Marker;
025    import org.apache.logging.log4j.ThreadContext;
026    import org.apache.logging.log4j.core.Logger;
027    import org.apache.logging.log4j.core.LoggerContext;
028    import org.apache.logging.log4j.core.config.Property;
029    import org.apache.logging.log4j.core.helpers.Clock;
030    import org.apache.logging.log4j.core.helpers.ClockFactory;
031    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
032    import org.apache.logging.log4j.message.Message;
033    import org.apache.logging.log4j.message.MessageFactory;
034    import org.apache.logging.log4j.status.StatusLogger;
035    
036    import com.lmax.disruptor.BlockingWaitStrategy;
037    import com.lmax.disruptor.EventHandler;
038    import com.lmax.disruptor.ExceptionHandler;
039    import com.lmax.disruptor.RingBuffer;
040    import com.lmax.disruptor.SleepingWaitStrategy;
041    import com.lmax.disruptor.WaitStrategy;
042    import com.lmax.disruptor.YieldingWaitStrategy;
043    import com.lmax.disruptor.dsl.Disruptor;
044    import com.lmax.disruptor.dsl.ProducerType;
045    import com.lmax.disruptor.util.Util;
046    
047    /**
048     * AsyncLogger is a logger designed for high throughput and low latency logging.
049     * It does not perform any I/O in the calling (application) thread, but instead
050     * hands off the work to another thread as soon as possible. The actual logging
051     * is performed in the background thread. It uses the LMAX Disruptor library for
052     * inter-thread communication. (<a
053     * href="http://lmax-exchange.github.com/disruptor/"
054     * >http://lmax-exchange.github.com/disruptor/</a>)
055     * <p>
056     * To use AsyncLogger, specify the System property
057     * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
058     * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
059     * will be AsyncLoggers.
060     * <p>
061     * Note that for performance reasons, this logger does not include source
062     * location by default. You need to specify {@code includeLocation="true"} in
063     * the configuration or any %class, %location or %line conversion patterns in
064     * your log4j.xml configuration will produce either a "?" character or no output
065     * at all.
066     * <p>
067     * For best performance, use AsyncLogger with the FastFileAppender or
068     * FastRollingFileAppender, with immediateFlush=false. These appenders have
069     * built-in support for the batching mechanism used by the Disruptor library,
070     * and they will flush to disk at the end of each batch. This means that even
071     * with immediateFlush=false, there will never be any items left in the buffer;
072     * all log events will all be written to disk in a very efficient manner.
073     */
074    public class AsyncLogger extends Logger {
075        private static final int HALF_A_SECOND = 500;
076        private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
077        private static final int RINGBUFFER_MIN_SIZE = 128;
078        private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
079        private static final StatusLogger LOGGER = StatusLogger.getLogger();
080    
081        private static volatile Disruptor<RingBufferLogEvent> disruptor;
082        private static Clock clock = ClockFactory.getClock();
083    
084        private static ExecutorService executor = Executors
085                .newSingleThreadExecutor();
086        private ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
087    
088        static {
089            int ringBufferSize = calculateRingBufferSize();
090    
091            WaitStrategy waitStrategy = createWaitStrategy();
092            disruptor = new Disruptor<RingBufferLogEvent>(
093                    RingBufferLogEvent.FACTORY, ringBufferSize, executor,
094                    ProducerType.MULTI, waitStrategy);
095            EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {//
096            new RingBufferLogEventHandler() };
097            disruptor.handleExceptionsWith(getExceptionHandler());
098            disruptor.handleEventsWith(handlers);
099    
100            LOGGER.debug(
101                    "Starting AsyncLogger disruptor with ringbuffer size {}...",
102                    disruptor.getRingBuffer().getBufferSize());
103            disruptor.start();
104        }
105    
106        private static int calculateRingBufferSize() {
107            int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
108            String userPreferredRBSize = System.getProperty(
109                    "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
110            try {
111                int size = Integer.parseInt(userPreferredRBSize);
112                if (size < RINGBUFFER_MIN_SIZE) {
113                    size = RINGBUFFER_MIN_SIZE;
114                    LOGGER.warn(
115                            "Invalid RingBufferSize {}, using minimum size {}.",
116                            userPreferredRBSize, RINGBUFFER_MIN_SIZE);
117                }
118                ringBufferSize = size;
119            } catch (Exception ex) {
120                LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
121                        userPreferredRBSize, ringBufferSize);
122            }
123            return Util.ceilingNextPowerOfTwo(ringBufferSize);
124        }
125    
126        private static WaitStrategy createWaitStrategy() {
127            String strategy = System.getProperty("AsyncLogger.WaitStrategy");
128            LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
129            if ("Sleep".equals(strategy)) {
130                LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
131                return new SleepingWaitStrategy();
132            } else if ("Yield".equals(strategy)) {
133                LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
134                return new YieldingWaitStrategy();
135            } else if ("Block".equals(strategy)) {
136                LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
137                return new BlockingWaitStrategy();
138            }
139            LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
140            return new SleepingWaitStrategy();
141        }
142    
143        private static ExceptionHandler getExceptionHandler() {
144            String cls = System.getProperty("AsyncLogger.ExceptionHandler");
145            if (cls == null) {
146                LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
147                return null;
148            }
149            try {
150                @SuppressWarnings("unchecked")
151                Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
152                        .forName(cls);
153                ExceptionHandler result = klass.newInstance();
154                LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
155                return result;
156            } catch (Exception ignored) {
157                LOGGER.debug(
158                        "AsyncLogger.ExceptionHandler not set: error creating "
159                                + cls + ": ", ignored);
160                return null;
161            }
162        }
163    
164        /**
165         * Constructs an {@code AsyncLogger} with the specified context, name and
166         * message factory.
167         * 
168         * @param context context of this logger
169         * @param name name of this logger
170         * @param messageFactory message factory of this logger
171         */
172        public AsyncLogger(LoggerContext context, String name,
173                MessageFactory messageFactory) {
174            super(context, name, messageFactory);
175        }
176    
177        /**
178         * Tuple with the event translator and thread name for a thread.
179         */
180        private static class Info {
181            private RingBufferLogEventTranslator translator;
182            private String cachedThreadName;
183        }
184    
185        @Override
186        public void log(Marker marker, String fqcn, Level level, Message data,
187                Throwable t) {
188            Info info = threadlocalInfo.get();
189            if (info == null) {
190                info = new Info();
191                info.translator = new RingBufferLogEventTranslator();
192                info.cachedThreadName = Thread.currentThread().getName();
193                threadlocalInfo.set(info);
194            }
195    
196            boolean includeLocation = config.loggerConfig.isIncludeLocation();
197            info.translator.setValues(this, getName(), marker, fqcn, level, data,
198                    t, //
199    
200                    // config properties are taken care of in the EventHandler
201                    // thread in the #actualAsyncLog method
202    
203                    // needs shallow copy to be fast (LOG4J2-154)
204                    ThreadContext.getImmutableContext(), //
205    
206                    // needs shallow copy to be fast (LOG4J2-154)
207                    ThreadContext.getImmutableStack(), //
208    
209                    // Thread.currentThread().getName(), //
210                    info.cachedThreadName, //
211    
212                    // location: very expensive operation. LOG4J2-153:
213                    // Only include if "includeLocation=true" is specified,
214                    // exclude if not specified or if "false" was specified.
215                    includeLocation ? location(fqcn) : null,
216    
217                    // System.currentTimeMillis());
218                    // CoarseCachedClock: 20% faster than system clock, 16ms gaps
219                    // CachedClock: 10% faster than system clock, smaller gaps
220                    clock.currentTimeMillis());
221    
222            disruptor.publishEvent(info.translator);
223        }
224    
225        private StackTraceElement location(String fqcnOfLogger) {
226            return Log4jLogEvent.calcLocation(fqcnOfLogger);
227        }
228    
229        /**
230         * This method is called by the EventHandler that processes the
231         * RingBufferLogEvent in a separate thread.
232         * 
233         * @param event the event to log
234         */
235        public void actualAsyncLog(RingBufferLogEvent event) {
236            Map<Property, Boolean> properties = config.loggerConfig.getProperties();
237            event.mergePropertiesIntoContextMap(properties,
238                    config.config.getSubst());
239            config.logEvent(event);
240        }
241    
242        public static void stop() {
243            Disruptor<RingBufferLogEvent> temp = disruptor;
244    
245            // Must guarantee that publishing to the RingBuffer has stopped
246            // before we call disruptor.shutdown()
247            disruptor = null; // client code fails with NPE if log after stop = OK
248            temp.shutdown();
249    
250            // wait up to 10 seconds for the ringbuffer to drain
251            RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
252            for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
253                if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
254                    break;
255                }
256                try {
257                    // give ringbuffer some time to drain...
258                    Thread.sleep(HALF_A_SECOND);
259                } catch (InterruptedException e) {
260                    // ignored
261                }
262            }
263            executor.shutdown(); // finally, kill the processor thread
264        }
265    }