View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.async;
19  
20  import java.util.concurrent.ExecutorService;
21  import java.util.concurrent.Executors;
22  
23  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
24  import org.apache.logging.log4j.status.StatusLogger;
25  
26  import com.lmax.disruptor.ExceptionHandler;
27  import com.lmax.disruptor.RingBuffer;
28  import com.lmax.disruptor.WaitStrategy;
29  import com.lmax.disruptor.dsl.Disruptor;
30  import com.lmax.disruptor.dsl.ProducerType;
31  
32  /**
33   * Helper class for async loggers: AsyncLoggerDisruptor handles the mechanics of working with the LMAX Disruptor, and
34   * works with its associated AsyncLoggerContext to synchronize the life cycle of the Disruptor and its thread with the
35   * life cycle of the context. The AsyncLoggerDisruptor of the context is shared by all AsyncLogger objects created by
36   * that AsyncLoggerContext.
37   */
38  class AsyncLoggerDisruptor {
39      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
40      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
41      private static final StatusLogger LOGGER = StatusLogger.getLogger();
42  
43      private volatile Disruptor<RingBufferLogEvent> disruptor;
44      private ExecutorService executor;
45      private String contextName;
46  
47      private boolean useThreadLocalTranslator;
48      private long backgroundThreadId;
49  
50      AsyncLoggerDisruptor(String contextName) {
51          this.contextName = contextName;
52      }
53  
54      public String getContextName() {
55          return contextName;
56      }
57  
58      public void setContextName(String name) {
59          contextName = name;
60      }
61  
62      Disruptor<RingBufferLogEvent> getDisruptor() {
63          return disruptor;
64      }
65  
66      /**
67       * Creates and starts a new Disruptor and associated thread if none currently exists.
68       * 
69       * @see #stop()
70       */
71      synchronized void start() {
72          if (disruptor != null) {
73              LOGGER.trace(
74                      "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
75                      contextName);
76              return;
77          }
78          LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
79          final int ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
80          final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
81          executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger[" + contextName + "]"));
82          backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
83  
84          disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
85                  waitStrategy);
86  
87          final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getExceptionHandler(
88                  "AsyncLogger.ExceptionHandler", RingBufferLogEvent.class);
89          disruptor.handleExceptionsWith(errorHandler);
90  
91          final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
92          disruptor.handleEventsWith(handlers);
93  
94          LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
95                  + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
96                  .getClass().getSimpleName(), errorHandler);
97          disruptor.start();
98  
99          LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
100                 : "vararg");
101     }
102 
103     /**
104      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
105      * shut down and their references set to {@code null}.
106      */
107     synchronized void stop() {
108         final Disruptor<RingBufferLogEvent> temp = getDisruptor();
109         if (temp == null) {
110             LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
111             return; // disruptor was already shut down by another thread
112         }
113         LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
114 
115         // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
116         disruptor = null; // client code fails with NPE if log after stop. This is by design.
117 
118         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
119         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
120         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
121         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
122             try {
123                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
124             } catch (final InterruptedException e) { // ignored
125             }
126         }
127         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
128 
129         LOGGER.trace("[{}] AsyncLoggerDisruptor: shutting down disruptor executor.", contextName);
130         executor.shutdown(); // finally, kill the processor thread
131         executor = null;
132     }
133 
134     /**
135      * Returns {@code true} if the specified disruptor still has unprocessed events.
136      */
137     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
138         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
139         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
140     }
141 
142     /**
143      * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}.
144      *
145      * @param jmxContextName name of the {@code AsyncLoggerContext}
146      * @return a new {@code RingBufferAdmin} that instruments the ringbuffer
147      */
148     public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
149         final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
150         return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
151     }
152 
153     /**
154      * Returns {@code true} if the current log event should be logged in the current thread, {@code false} if it should
155      * be logged in a background thread. (LOG4J2-471)
156      * 
157      * @return whether the current log event should be logged in the current thread
158      */
159     boolean shouldLogInCurrentThread() {
160         return currentThreadIsAppenderThread() && isRingBufferFull();
161     }
162 
163     /**
164      * Returns {@code true} if the current thread is the Disruptor background thread, {@code false} otherwise.
165      * 
166      * @return whether this thread is the Disruptor background thread.
167      */
168     private boolean currentThreadIsAppenderThread() {
169         return Thread.currentThread().getId() == backgroundThreadId;
170     }
171 
172     /**
173      * Returns {@code true} if the Disruptor is {@code null} because it has been stopped, or if its Ringbuffer is full.
174      * 
175      * @return {@code true} if the disruptor is currently not usable
176      */
177     private boolean isRingBufferFull() {
178         final Disruptor<RingBufferLogEvent> theDisruptor = this.disruptor;
179         return theDisruptor == null || theDisruptor.getRingBuffer().remainingCapacity() == 0;
180     }
181 
182     void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
183         // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
184         try {
185             // Note: we deliberately access the volatile disruptor field afresh here.
186             // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
187             // was shut down, which could cause the publishEvent method to hang and never return.
188             disruptor.publishEvent(translator);
189         } catch (final NullPointerException npe) {
190             LOGGER.fatal("[{}] Ignoring log event after log4j was shut down.", contextName);
191         }
192     }
193 
194     /**
195      * Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
196      * 
197      * @return whether AsyncLoggers are allowed to use ThreadLocal objects
198      * @since 2.5
199      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
200      */
201     public boolean isUseThreadLocals() {
202         return useThreadLocalTranslator;
203     }
204 
205     /**
206      * Signals this AsyncLoggerDisruptor whether it is allowed to store non-JDK classes in ThreadLocal objects for
207      * efficiency.
208      * 
209      * @param allow whether AsyncLoggers are allowed to use ThreadLocal objects
210      * @since 2.5
211      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
212      */
213     public void setUseThreadLocals(final boolean allow) {
214         useThreadLocalTranslator = allow;
215     }
216 }