View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ThreadFactory;
22  
23  import org.apache.logging.log4j.Logger;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26  import org.apache.logging.log4j.core.util.Constants;
27  import org.apache.logging.log4j.status.StatusLogger;
28  
29  import com.lmax.disruptor.EventFactory;
30  import com.lmax.disruptor.EventTranslatorTwoArg;
31  import com.lmax.disruptor.ExceptionHandler;
32  import com.lmax.disruptor.RingBuffer;
33  import com.lmax.disruptor.Sequence;
34  import com.lmax.disruptor.SequenceReportingEventHandler;
35  import com.lmax.disruptor.WaitStrategy;
36  import com.lmax.disruptor.dsl.Disruptor;
37  import com.lmax.disruptor.dsl.ProducerType;
38  
39  /**
40   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
41   * <p>
42   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
43   * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
44   * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
45   * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
46   * definition file.
47   * <p>
48   * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
49   * {@code AsyncLoggerConfig} is actually used.
50   */
51  public class AsyncLoggerConfigDisruptor implements AsyncLoggerConfigDelegate {
52  
53      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
54      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
55      private static final Logger LOGGER = StatusLogger.getLogger();
56  
57      /**
58       * RingBuffer events contain all information necessary to perform the work in a separate thread.
59       */
60      private static class Log4jEventWrapper {
61          private AsyncLoggerConfig loggerConfig;
62          private LogEvent event;
63  
64          /**
65           * Release references held by ring buffer to allow objects to be garbage-collected.
66           */
67          public void clear() {
68              loggerConfig = null;
69              event = null;
70          }
71      }
72  
73      /**
74       * EventHandler performs the work in a separate thread.
75       */
76      private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
77          private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
78          private Sequence sequenceCallback;
79          private int counter;
80  
81          @Override
82          public void setSequenceCallback(final Sequence sequenceCallback) {
83              this.sequenceCallback = sequenceCallback;
84          }
85  
86          @Override
87          public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
88                  throws Exception {
89              event.event.setEndOfBatch(endOfBatch);
90              event.loggerConfig.asyncCallAppenders(event.event);
91              event.clear();
92  
93              notifyIntermediateProgress(sequence);
94          }
95  
96          /**
97           * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
98           * be progressed until the batch has completely finished.
99           */
100         private void notifyIntermediateProgress(final long sequence) {
101             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
102                 sequenceCallback.set(sequence);
103                 counter = 0;
104             }
105         }
106     }
107 
108     /**
109      * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
110      * RingBuffer.
111      */
112     private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
113         @Override
114         public Log4jEventWrapper newInstance() {
115             return new Log4jEventWrapper();
116         }
117     };
118 
119     /**
120      * Object responsible for passing on data to a specific RingBuffer event.
121      */
122     private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
123             new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
124 
125         @Override
126         public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
127                 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
128             ringBufferElement.event = logEvent;
129             ringBufferElement.loggerConfig = loggerConfig;
130         }
131     };
132 
133     private static final ThreadFactory THREAD_FACTORY = new DaemonThreadFactory("AsyncLoggerConfig-");
134 
135     private volatile Disruptor<Log4jEventWrapper> disruptor;
136     private ExecutorService executor;
137     private long backgroundThreadId; // LOG4J2-471
138 
139     public AsyncLoggerConfigDisruptor() {
140     }
141 
142     /**
143      * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently
144      * exists.
145      * 
146      * @see #release()
147      */
148     public synchronized void start() {
149         if (disruptor != null) {
150             LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor for this configuration, "
151                     + "using existing object.");
152             return;
153         }
154         LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor for this configuration.");
155         final int ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
156         final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
157         executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
158         backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
159 
160         disruptor = new Disruptor<>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
161 
162         final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getExceptionHandler(
163                 "AsyncLoggerConfig.ExceptionHandler", Log4jEventWrapper.class);
164         disruptor.handleExceptionsWith(errorHandler);
165 
166         final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
167         disruptor.handleEventsWith(handlers);
168 
169         LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
170                 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
171                 .getClass().getSimpleName(), errorHandler);
172         disruptor.start();
173     }
174 
175     /**
176      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
177      * shut down and their references set to {@code null}.
178      */
179     public synchronized void stop() {
180         final Disruptor<Log4jEventWrapper> temp = disruptor;
181         if (temp == null) {
182             LOGGER.trace("AsyncLoggerConfigHelper: disruptor for this configuration already shut down.");
183             return; // disruptor was already shut down by another thread
184         }
185         LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor for this configuration.");
186 
187         // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
188         disruptor = null; // client code fails with NPE if log after stop = OK
189 
190         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
191         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
192         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
193         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
194             try {
195                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
196             } catch (final InterruptedException e) { // ignored
197             }
198         }
199         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
200 
201         LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor executor for this configuration.");
202         executor.shutdown(); // finally, kill the processor thread
203         executor = null; // release reference to allow GC
204     }
205 
206     /**
207      * Returns {@code true} if the specified disruptor still has unprocessed events.
208      */
209     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
210         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
211         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
212     }
213 
214     /*
215      * (non-Javadoc)
216      * 
217      * @see
218      * org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#tryCallAppendersInBackground(org.apache.logging
219      * .log4j.core.LogEvent)
220      */
221     @Override
222     public boolean tryCallAppendersInBackground(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
223         final Disruptor<Log4jEventWrapper> temp = disruptor;
224         if (!hasLog4jBeenShutDown(temp)) {
225 
226             // LOG4J2-471: prevent deadlock when RingBuffer is full and object
227             // being logged calls Logger.log() from its toString() method
228             if (isCalledFromAppenderThreadAndBufferFull(temp)) {
229                 // bypass RingBuffer and invoke Appender directly
230                 return false;
231             }
232             enqueueEvent(event, asyncLoggerConfig);
233         }
234         return true;
235     }
236 
237     /**
238      * Returns {@code true} if the specified disruptor is null.
239      */
240     private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
241         if (aDisruptor == null) { // LOG4J2-639
242             LOGGER.fatal("Ignoring log event after log4j was shut down");
243             return true;
244         }
245         return false;
246     }
247 
248     private void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
249         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
250         try {
251             final LogEvent logEvent = prepareEvent(event);
252             enqueue(logEvent, asyncLoggerConfig);
253         } catch (final NullPointerException npe) {
254             LOGGER.fatal("Ignoring log event after log4j was shut down.");
255         }
256     }
257 
258     private LogEvent prepareEvent(final LogEvent event) {
259         final LogEvent logEvent = ensureImmutable(event);
260         if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
261             logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
262         }
263         return logEvent;
264     }
265 
266     private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
267         // Note: do NOT use the temp variable above!
268         // That could result in adding a log event to the disruptor after it was shut down,
269         // which could cause the publishEvent method to hang and never return.
270         disruptor.getRingBuffer().publishEvent(TRANSLATOR, logEvent, asyncLoggerConfig);
271     }
272 
273     private LogEvent ensureImmutable(final LogEvent event) {
274         LogEvent result = event;
275         if (event instanceof RingBufferLogEvent) {
276             // Deal with special case where both types of Async Loggers are used together:
277             // RingBufferLogEvents are created by the all-loggers-async type, but
278             // this event is also consumed by the some-loggers-async type (this class).
279             // The original event will be re-used and modified in an application thread later,
280             // so take a snapshot of it, which can be safely processed in the
281             // some-loggers-async background thread.
282             result = ((RingBufferLogEvent) event).createMemento();
283         }
284         return result;
285     }
286 
287     /**
288      * Returns true if the specified ringbuffer is full and the Logger.log() call was made from the appender thread.
289      */
290     private boolean isCalledFromAppenderThreadAndBufferFull(Disruptor<Log4jEventWrapper> theDisruptor) {
291         return currentThreadIsAppenderThread() && theDisruptor.getRingBuffer().remainingCapacity() == 0;
292     }
293 
294     /**
295      * Returns {@code true} if the current thread is the Disruptor background thread, {@code false} otherwise.
296      * 
297      * @return whether this thread is the Disruptor background thread.
298      */
299     private boolean currentThreadIsAppenderThread() {
300         return Thread.currentThread().getId() == backgroundThreadId;
301     }
302 
303     /*
304      * (non-Javadoc)
305      * 
306      * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String,
307      * java.lang.String)
308      */
309     @Override
310     public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
311         return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
312     }
313 
314 }