View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.async;
19  
20  import java.util.concurrent.ExecutorService;
21  import java.util.concurrent.Executors;
22  
23  import org.apache.logging.log4j.Level;
24  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
25  import org.apache.logging.log4j.status.StatusLogger;
26  
27  import com.lmax.disruptor.ExceptionHandler;
28  import com.lmax.disruptor.RingBuffer;
29  import com.lmax.disruptor.WaitStrategy;
30  import com.lmax.disruptor.dsl.Disruptor;
31  import com.lmax.disruptor.dsl.ProducerType;
32  
33  /**
34   * Helper class for async loggers: AsyncLoggerDisruptor handles the mechanics of working with the LMAX Disruptor, and
35   * works with its associated AsyncLoggerContext to synchronize the life cycle of the Disruptor and its thread with the
36   * life cycle of the context. The AsyncLoggerDisruptor of the context is shared by all AsyncLogger objects created by
37   * that AsyncLoggerContext.
38   */
39  class AsyncLoggerDisruptor {
40      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
41      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
42      private static final StatusLogger LOGGER = StatusLogger.getLogger();
43  
44      private volatile Disruptor<RingBufferLogEvent> disruptor;
45      private ExecutorService executor;
46      private String contextName;
47  
48      private boolean useThreadLocalTranslator = true;
49      private long backgroundThreadId;
50      private AsyncQueueFullPolicy asyncQueueFullPolicy;
51      private int ringBufferSize;
52  
53      AsyncLoggerDisruptor(String contextName) {
54          this.contextName = contextName;
55      }
56  
57      public String getContextName() {
58          return contextName;
59      }
60  
61      public void setContextName(String name) {
62          contextName = name;
63      }
64  
65      Disruptor<RingBufferLogEvent> getDisruptor() {
66          return disruptor;
67      }
68  
69      /**
70       * Creates and starts a new Disruptor and associated thread if none currently exists.
71       *
72       * @see #stop()
73       */
74      synchronized void start() {
75          if (disruptor != null) {
76              LOGGER.trace(
77                      "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
78                      contextName);
79              return;
80          }
81          LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
82          ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
83          final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
84          executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger[" + contextName + "]"));
85          backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
86          asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
87  
88          disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
89                  waitStrategy);
90  
91          final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
92          disruptor.handleExceptionsWith(errorHandler);
93  
94          final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
95          disruptor.handleEventsWith(handlers);
96  
97          LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
98                  + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
99                  .getClass().getSimpleName(), errorHandler);
100         disruptor.start();
101 
102         LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
103                 : "vararg");
104     }
105 
106     /**
107      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
108      * shut down and their references set to {@code null}.
109      */
110     synchronized void stop() {
111         final Disruptor<RingBufferLogEvent> temp = getDisruptor();
112         if (temp == null) {
113             LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
114             return; // disruptor was already shut down by another thread
115         }
116         LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
117 
118         // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
119         disruptor = null; // client code fails with NPE if log after stop. This is by design.
120 
121         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
122         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
123         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
124         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
125             try {
126                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
127             } catch (final InterruptedException e) { // ignored
128             }
129         }
130         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
131 
132         LOGGER.trace("[{}] AsyncLoggerDisruptor: shutting down disruptor executor.", contextName);
133         executor.shutdown(); // finally, kill the processor thread
134         executor = null;
135 
136         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
137             LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
138                     DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
139         }
140     }
141 
142     /**
143      * Returns {@code true} if the specified disruptor still has unprocessed events.
144      */
145     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
146         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
147         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
148     }
149 
150     /**
151      * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}.
152      *
153      * @param jmxContextName name of the {@code AsyncLoggerContext}
154      * @return a new {@code RingBufferAdmin} that instruments the ringbuffer
155      */
156     public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
157         final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
158         return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
159     }
160 
161     EventRoute getEventRoute(final Level logLevel) {
162         final int remainingCapacity = remainingDisruptorCapacity();
163         if (remainingCapacity < 0) {
164             return EventRoute.DISCARD;
165         }
166         return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
167     }
168 
169     private int remainingDisruptorCapacity() {
170         final Disruptor<RingBufferLogEvent> temp = disruptor;
171         if (hasLog4jBeenShutDown(temp)) {
172             return -1;
173         }
174         return (int) temp.getRingBuffer().remainingCapacity();
175     }
176         /**
177          * Returns {@code true} if the specified disruptor is null.
178          */
179     private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) {
180         if (aDisruptor == null) { // LOG4J2-639
181             LOGGER.warn("Ignoring log event after log4j was shut down");
182             return true;
183         }
184         return false;
185     }
186 
187     public boolean tryPublish(final RingBufferLogEventTranslator translator) {
188         // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
189         try {
190             return disruptor.getRingBuffer().tryPublishEvent(translator);
191         } catch (final NullPointerException npe) {
192             LOGGER.warn("[{}] Ignoring log event after log4j was shut down.", contextName);
193             return false;
194         }
195     }
196 
197     void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
198         // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
199         try {
200             // Note: we deliberately access the volatile disruptor field afresh here.
201             // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
202             // was shut down, which could cause the publishEvent method to hang and never return.
203             disruptor.publishEvent(translator);
204         } catch (final NullPointerException npe) {
205             LOGGER.warn("[{}] Ignoring log event after log4j was shut down.", contextName);
206         }
207     }
208 
209     /**
210      * Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
211      *
212      * @return whether AsyncLoggers are allowed to use ThreadLocal objects
213      * @since 2.5
214      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
215      */
216     public boolean isUseThreadLocals() {
217         return useThreadLocalTranslator;
218     }
219 
220     /**
221      * Signals this AsyncLoggerDisruptor whether it is allowed to store non-JDK classes in ThreadLocal objects for
222      * efficiency.
223      * <p>
224      * This property may be modified after the {@link #start()} method has been called.
225      * </p>
226      *
227      * @param allow whether AsyncLoggers are allowed to use ThreadLocal objects
228      * @since 2.5
229      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
230      */
231     public void setUseThreadLocals(final boolean allow) {
232         useThreadLocalTranslator = allow;
233         LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName,
234                 useThreadLocalTranslator ? "threadlocal" : "vararg");
235     }
236 }