View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ThreadFactory;
22  
23  import org.apache.logging.log4j.Logger;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26  import org.apache.logging.log4j.core.util.Integers;
27  import org.apache.logging.log4j.status.StatusLogger;
28  import org.apache.logging.log4j.util.PropertiesUtil;
29  
30  import com.lmax.disruptor.BlockingWaitStrategy;
31  import com.lmax.disruptor.EventFactory;
32  import com.lmax.disruptor.EventHandler;
33  import com.lmax.disruptor.EventTranslatorTwoArg;
34  import com.lmax.disruptor.ExceptionHandler;
35  import com.lmax.disruptor.RingBuffer;
36  import com.lmax.disruptor.Sequence;
37  import com.lmax.disruptor.SequenceReportingEventHandler;
38  import com.lmax.disruptor.SleepingWaitStrategy;
39  import com.lmax.disruptor.WaitStrategy;
40  import com.lmax.disruptor.YieldingWaitStrategy;
41  import com.lmax.disruptor.dsl.Disruptor;
42  import com.lmax.disruptor.dsl.ProducerType;
43  
44  /**
45   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
46   * <p>
47   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
48   * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
49   * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
50   * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
51   * definition file.
52   * <p>
53   * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
54   * {@code AsyncLoggerConfig} is actually used.
55   */
56  class AsyncLoggerConfigHelper {
57  
58      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
59      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
60      private static final int RINGBUFFER_MIN_SIZE = 128;
61      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
62      private static final Logger LOGGER = StatusLogger.getLogger();
63  
64      private static ThreadFactory threadFactory = new DaemonThreadFactory("AsyncLoggerConfig-");
65      private static volatile Disruptor<Log4jEventWrapper> disruptor;
66      private static ExecutorService executor;
67  
68      private static volatile int count = 0;
69      private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
70  
71      /**
72       * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
73       * RingBuffer.
74       */
75      private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
76          @Override
77          public Log4jEventWrapper newInstance() {
78              return new Log4jEventWrapper();
79          }
80      };
81  
82      /**
83       * Object responsible for passing on data to a specific RingBuffer event.
84       */
85      private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator =
86              new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
87  
88          @Override
89          public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
90                  final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
91              ringBufferElement.event = logEvent;
92              ringBufferElement.loggerConfig = loggerConfig;
93          }
94      };
95  
96      private final AsyncLoggerConfig asyncLoggerConfig;
97  
98      public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
99          this.asyncLoggerConfig = asyncLoggerConfig;
100         claim();
101     }
102 
103     private static synchronized void initDisruptor() {
104         if (disruptor != null) {
105             LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.",
106                     count);
107             return;
108         }
109         LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
110         final int ringBufferSize = calculateRingBufferSize();
111         final WaitStrategy waitStrategy = createWaitStrategy();
112         executor = Executors.newSingleThreadExecutor(threadFactory);
113         initThreadLocalForExecutorThread();
114         disruptor = new Disruptor<>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
115         final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {//
116         new Log4jEventWrapperHandler()};
117         final ExceptionHandler<Log4jEventWrapper> errorHandler = getExceptionHandler();
118         disruptor.handleExceptionsWith(errorHandler);
119         disruptor.handleEventsWith(handlers);
120 
121         LOGGER.debug(
122                 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
123                 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
124         disruptor.start();
125     }
126 
127     private static WaitStrategy createWaitStrategy() {
128         final String strategy = System.getProperty("AsyncLoggerConfig.WaitStrategy");
129         LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
130         if ("Sleep".equals(strategy)) {
131             return new SleepingWaitStrategy();
132         } else if ("Yield".equals(strategy)) {
133             return new YieldingWaitStrategy();
134         } else if ("Block".equals(strategy)) {
135             return new BlockingWaitStrategy();
136         }
137         LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
138         return new BlockingWaitStrategy();
139     }
140 
141     private static int calculateRingBufferSize() {
142         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
143         final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
144                 "AsyncLoggerConfig.RingBufferSize", String.valueOf(ringBufferSize));
145         try {
146             int size = Integer.parseInt(userPreferredRBSize);
147             if (size < RINGBUFFER_MIN_SIZE) {
148                 size = RINGBUFFER_MIN_SIZE;
149                 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
150                         RINGBUFFER_MIN_SIZE);
151             }
152             ringBufferSize = size;
153         } catch (final Exception ex) {
154             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
155         }
156         return Integers.ceilingNextPowerOfTwo(ringBufferSize);
157     }
158 
159     private static ExceptionHandler<Log4jEventWrapper> getExceptionHandler() {
160         final String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
161         if (cls == null) {
162             return null;
163         }
164         try {
165             @SuppressWarnings("unchecked")
166             final Class<? extends ExceptionHandler<Log4jEventWrapper>> klass =
167                     (Class<? extends ExceptionHandler<Log4jEventWrapper>>) Class.forName(cls);
168             return klass.newInstance();
169         } catch (final Exception ignored) {
170             LOGGER.debug("AsyncLoggerConfig.ExceptionHandler not set: error creating " + cls + ": ", ignored);
171             return null;
172         }
173     }
174 
175     /**
176      * RingBuffer events contain all information necessary to perform the work in a separate thread.
177      */
178     private static class Log4jEventWrapper {
179         private AsyncLoggerConfig loggerConfig;
180         private LogEvent event;
181 
182         /**
183          * Release references held by ring buffer to allow objects to be garbage-collected.
184          */
185         public void clear() {
186             loggerConfig = null;
187             event = null;
188         }
189     }
190 
191     /**
192      * EventHandler performs the work in a separate thread.
193      */
194     private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
195         private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
196         private Sequence sequenceCallback;
197         private int counter;
198 
199         @Override
200         public void setSequenceCallback(final Sequence sequenceCallback) {
201             this.sequenceCallback = sequenceCallback;
202         }
203 
204         @Override
205         public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
206                 throws Exception {
207             event.event.setEndOfBatch(endOfBatch);
208             event.loggerConfig.asyncCallAppenders(event.event);
209             event.clear();
210 
211             notifyIntermediateProgress(sequence);
212         }
213 
214         /**
215          * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
216          * be progressed until the batch has completely finished.
217          */
218         private void notifyIntermediateProgress(final long sequence) {
219             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
220                 sequenceCallback.set(sequence);
221                 counter = 0;
222             }
223         }
224     }
225 
226     /**
227      * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently
228      * exists.
229      * 
230      * @see #release()
231      */
232     static synchronized void claim() {
233         count++;
234         initDisruptor();
235     }
236 
237     /**
238      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
239      * shut down and their references set to {@code null}.
240      */
241     static synchronized void release() {
242         if (--count > 0) {
243             LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
244             return;
245         }
246         final Disruptor<Log4jEventWrapper> temp = disruptor;
247         if (temp == null) {
248             LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}. (Resetting to zero.)",
249                     count);
250             count = 0; // ref count must not be negative or #claim() will not work correctly
251             return; // disruptor was already shut down by another thread
252         }
253         LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
254 
255         // Must guarantee that publishing to the RingBuffer has stopped
256         // before we call disruptor.shutdown()
257         disruptor = null; // client code fails with NPE if log after stop = OK
258 
259         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
260         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
261         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
262         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
263             try {
264                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
265             } catch (final InterruptedException e) { // ignored
266             }
267         }
268         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
269         executor.shutdown(); // finally, kill the processor thread
270         executor = null; // release reference to allow GC
271     }
272 
273     /**
274      * Returns {@code true} if the specified disruptor still has unprocessed events.
275      */
276     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
277         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
278         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
279     }
280 
281     /**
282      * Initialize the threadlocal that allows us to detect Logger.log() calls initiated from the appender thread, which
283      * may cause deadlock when the RingBuffer is full. (LOG4J2-471)
284      */
285     private static void initThreadLocalForExecutorThread() {
286         executor.submit(new Runnable() {
287             @Override
288             public void run() {
289                 isAppenderThread.set(Boolean.TRUE);
290             }
291         });
292     }
293 
294     /**
295      * If possible, delegates the invocation to {@code callAppenders} to another thread and returns {@code true}. If
296      * this is not possible (if it detects that delegating to another thread would cause deadlock because the current
297      * call to Logger.log() originated from the appender thread and the ringbuffer is full) then this method does
298      * nothing and returns {@code false}. It is the responsibility of the caller to process the event when this method
299      * returns {@code false}.
300      * 
301      * @param event the event to delegate to another thread
302      * @return {@code true} if delegation was successful, {@code false} if the calling thread needs to process the
303      *          event itself
304      */
305     public boolean callAppendersFromAnotherThread(final LogEvent event) {
306         final Disruptor<Log4jEventWrapper> temp = disruptor;
307         if (!hasLog4jBeenShutDown(temp)) {
308 
309             // LOG4J2-471: prevent deadlock when RingBuffer is full and object
310             // being logged calls Logger.log() from its toString() method
311             if (isCalledFromAppenderThreadAndBufferFull(temp)) {
312                 // bypass RingBuffer and invoke Appender directly
313                 return false;
314             }
315             enqueueEvent(event);
316         }
317         return true;
318     }
319 
320     /**
321      * Returns {@code true} if the specified disruptor is null.
322      */
323     private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
324         if (aDisruptor == null) { // LOG4J2-639
325             LOGGER.fatal("Ignoring log event after log4j was shut down");
326             return true;
327         }
328         return false;
329     }
330 
331     private void enqueueEvent(final LogEvent event) {
332         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
333         try {
334             final LogEvent logEvent = prepareEvent(event);
335             enqueue(logEvent);
336         } catch (final NullPointerException npe) {
337             LOGGER.fatal("Ignoring log event after log4j was shut down.");
338         }
339     }
340 
341     private LogEvent prepareEvent(final LogEvent event) {
342         final LogEvent logEvent = ensureImmutable(event);
343         logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
344         return logEvent;
345     }
346 
347     private void enqueue(LogEvent logEvent) {
348         // Note: do NOT use the temp variable above!
349         // That could result in adding a log event to the disruptor after it was shut down,
350         // which could cause the publishEvent method to hang and never return.
351         disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
352     }
353 
354     private LogEvent ensureImmutable(final LogEvent event) {
355         LogEvent result = event;
356         if (event instanceof RingBufferLogEvent) {
357             // Deal with special case where both types of Async Loggers are used together:
358             // RingBufferLogEvents are created by the all-loggers-async type, but
359             // this event is also consumed by the some-loggers-async type (this class).
360             // The original event will be re-used and modified in an application thread later,
361             // so take a snapshot of it, which can be safely processed in the
362             // some-loggers-async background thread.
363             result = ((RingBufferLogEvent) event).createMemento();
364         }
365         return result;
366     }
367 
368     /**
369      * Returns true if the specified ringbuffer is full and the Logger.log() call was made from the appender thread.
370      */
371     private boolean isCalledFromAppenderThreadAndBufferFull(Disruptor<Log4jEventWrapper> theDisruptor) {
372         return isAppenderThread.get() == Boolean.TRUE && theDisruptor.getRingBuffer().remainingCapacity() == 0;
373     }
374 
375     /**
376      * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of this
377      * {@code AsyncLoggerConfig}.
378      * 
379      * @param contextName name of the {@code LoggerContext}
380      * @param loggerConfigName name of the logger config
381      */
382     public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
383         return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
384     }
385 
386 }