001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.concurrent.ExecutorService;
020import java.util.concurrent.Executors;
021import java.util.concurrent.ThreadFactory;
022
023import org.apache.logging.log4j.Logger;
024import org.apache.logging.log4j.core.LogEvent;
025import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
026import org.apache.logging.log4j.core.util.Constants;
027import org.apache.logging.log4j.status.StatusLogger;
028
029import com.lmax.disruptor.EventFactory;
030import com.lmax.disruptor.EventTranslatorTwoArg;
031import com.lmax.disruptor.ExceptionHandler;
032import com.lmax.disruptor.RingBuffer;
033import com.lmax.disruptor.Sequence;
034import com.lmax.disruptor.SequenceReportingEventHandler;
035import com.lmax.disruptor.WaitStrategy;
036import com.lmax.disruptor.dsl.Disruptor;
037import com.lmax.disruptor.dsl.ProducerType;
038
039/**
040 * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
041 * <p>
042 * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
043 * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
044 * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
045 * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
046 * definition file.
047 * <p>
048 * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
049 * {@code AsyncLoggerConfig} is actually used.
050 */
051public class AsyncLoggerConfigDisruptor implements AsyncLoggerConfigDelegate {
052
053    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
054    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
055    private static final Logger LOGGER = StatusLogger.getLogger();
056
057    /**
058     * RingBuffer events contain all information necessary to perform the work in a separate thread.
059     */
060    private static class Log4jEventWrapper {
061        private AsyncLoggerConfig loggerConfig;
062        private LogEvent event;
063
064        /**
065         * Release references held by ring buffer to allow objects to be garbage-collected.
066         */
067        public void clear() {
068            loggerConfig = null;
069            event = null;
070        }
071    }
072
073    /**
074     * EventHandler performs the work in a separate thread.
075     */
076    private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
077        private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
078        private Sequence sequenceCallback;
079        private int counter;
080
081        @Override
082        public void setSequenceCallback(final Sequence sequenceCallback) {
083            this.sequenceCallback = sequenceCallback;
084        }
085
086        @Override
087        public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
088                throws Exception {
089            event.event.setEndOfBatch(endOfBatch);
090            event.loggerConfig.asyncCallAppenders(event.event);
091            event.clear();
092
093            notifyIntermediateProgress(sequence);
094        }
095
096        /**
097         * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
098         * be progressed until the batch has completely finished.
099         */
100        private void notifyIntermediateProgress(final long sequence) {
101            if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
102                sequenceCallback.set(sequence);
103                counter = 0;
104            }
105        }
106    }
107
108    /**
109     * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
110     * RingBuffer.
111     */
112    private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
113        @Override
114        public Log4jEventWrapper newInstance() {
115            return new Log4jEventWrapper();
116        }
117    };
118
119    /**
120     * Object responsible for passing on data to a specific RingBuffer event.
121     */
122    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
123            new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
124
125        @Override
126        public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
127                final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
128            ringBufferElement.event = logEvent;
129            ringBufferElement.loggerConfig = loggerConfig;
130        }
131    };
132
133    private static final ThreadFactory THREAD_FACTORY = new DaemonThreadFactory("AsyncLoggerConfig-");
134
135    private volatile Disruptor<Log4jEventWrapper> disruptor;
136    private ExecutorService executor;
137    private long backgroundThreadId; // LOG4J2-471
138
139    public AsyncLoggerConfigDisruptor() {
140    }
141
142    /**
143     * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently
144     * exists.
145     * 
146     * @see #release()
147     */
148    public synchronized void start() {
149        if (disruptor != null) {
150            LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor for this configuration, "
151                    + "using existing object.");
152            return;
153        }
154        LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor for this configuration.");
155        final int ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
156        final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
157        executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
158        backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
159
160        disruptor = new Disruptor<>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
161
162        final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getExceptionHandler(
163                "AsyncLoggerConfig.ExceptionHandler", Log4jEventWrapper.class);
164        disruptor.handleExceptionsWith(errorHandler);
165
166        final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
167        disruptor.handleEventsWith(handlers);
168
169        LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
170                + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
171                .getClass().getSimpleName(), errorHandler);
172        disruptor.start();
173    }
174
175    /**
176     * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
177     * shut down and their references set to {@code null}.
178     */
179    public synchronized void stop() {
180        final Disruptor<Log4jEventWrapper> temp = disruptor;
181        if (temp == null) {
182            LOGGER.trace("AsyncLoggerConfigHelper: disruptor for this configuration already shut down.");
183            return; // disruptor was already shut down by another thread
184        }
185        LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor for this configuration.");
186
187        // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
188        disruptor = null; // client code fails with NPE if log after stop = OK
189
190        // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
191        // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
192        // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
193        for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
194            try {
195                Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
196            } catch (final InterruptedException e) { // ignored
197            }
198        }
199        temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
200
201        LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor executor for this configuration.");
202        executor.shutdown(); // finally, kill the processor thread
203        executor = null; // release reference to allow GC
204    }
205
206    /**
207     * Returns {@code true} if the specified disruptor still has unprocessed events.
208     */
209    private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
210        final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
211        return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
212    }
213
214    /*
215     * (non-Javadoc)
216     * 
217     * @see
218     * org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#tryCallAppendersInBackground(org.apache.logging
219     * .log4j.core.LogEvent)
220     */
221    @Override
222    public boolean tryCallAppendersInBackground(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
223        final Disruptor<Log4jEventWrapper> temp = disruptor;
224        if (!hasLog4jBeenShutDown(temp)) {
225
226            // LOG4J2-471: prevent deadlock when RingBuffer is full and object
227            // being logged calls Logger.log() from its toString() method
228            if (isCalledFromAppenderThreadAndBufferFull(temp)) {
229                // bypass RingBuffer and invoke Appender directly
230                return false;
231            }
232            enqueueEvent(event, asyncLoggerConfig);
233        }
234        return true;
235    }
236
237    /**
238     * Returns {@code true} if the specified disruptor is null.
239     */
240    private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
241        if (aDisruptor == null) { // LOG4J2-639
242            LOGGER.fatal("Ignoring log event after log4j was shut down");
243            return true;
244        }
245        return false;
246    }
247
248    private void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
249        // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
250        try {
251            final LogEvent logEvent = prepareEvent(event);
252            enqueue(logEvent, asyncLoggerConfig);
253        } catch (final NullPointerException npe) {
254            LOGGER.fatal("Ignoring log event after log4j was shut down.");
255        }
256    }
257
258    private LogEvent prepareEvent(final LogEvent event) {
259        final LogEvent logEvent = ensureImmutable(event);
260        if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
261            logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
262        }
263        return logEvent;
264    }
265
266    private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
267        // Note: do NOT use the temp variable above!
268        // That could result in adding a log event to the disruptor after it was shut down,
269        // which could cause the publishEvent method to hang and never return.
270        disruptor.getRingBuffer().publishEvent(TRANSLATOR, logEvent, asyncLoggerConfig);
271    }
272
273    private LogEvent ensureImmutable(final LogEvent event) {
274        LogEvent result = event;
275        if (event instanceof RingBufferLogEvent) {
276            // Deal with special case where both types of Async Loggers are used together:
277            // RingBufferLogEvents are created by the all-loggers-async type, but
278            // this event is also consumed by the some-loggers-async type (this class).
279            // The original event will be re-used and modified in an application thread later,
280            // so take a snapshot of it, which can be safely processed in the
281            // some-loggers-async background thread.
282            result = ((RingBufferLogEvent) event).createMemento();
283        }
284        return result;
285    }
286
287    /**
288     * Returns true if the specified ringbuffer is full and the Logger.log() call was made from the appender thread.
289     */
290    private boolean isCalledFromAppenderThreadAndBufferFull(Disruptor<Log4jEventWrapper> theDisruptor) {
291        return currentThreadIsAppenderThread() && theDisruptor.getRingBuffer().remainingCapacity() == 0;
292    }
293
294    /**
295     * Returns {@code true} if the current thread is the Disruptor background thread, {@code false} otherwise.
296     * 
297     * @return whether this thread is the Disruptor background thread.
298     */
299    private boolean currentThreadIsAppenderThread() {
300        return Thread.currentThread().getId() == backgroundThreadId;
301    }
302
303    /*
304     * (non-Javadoc)
305     * 
306     * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String,
307     * java.lang.String)
308     */
309    @Override
310    public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
311        return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
312    }
313
314}