View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  
22  import org.apache.logging.log4j.Logger;
23  import org.apache.logging.log4j.core.LogEvent;
24  import org.apache.logging.log4j.status.StatusLogger;
25  
26  import com.lmax.disruptor.BlockingWaitStrategy;
27  import com.lmax.disruptor.EventFactory;
28  import com.lmax.disruptor.EventHandler;
29  import com.lmax.disruptor.EventTranslator;
30  import com.lmax.disruptor.ExceptionHandler;
31  import com.lmax.disruptor.RingBuffer;
32  import com.lmax.disruptor.Sequence;
33  import com.lmax.disruptor.SequenceReportingEventHandler;
34  import com.lmax.disruptor.SleepingWaitStrategy;
35  import com.lmax.disruptor.WaitStrategy;
36  import com.lmax.disruptor.YieldingWaitStrategy;
37  import com.lmax.disruptor.dsl.Disruptor;
38  import com.lmax.disruptor.dsl.ProducerType;
39  import com.lmax.disruptor.util.Util;
40  
41  /**
42   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX
43   * Disruptor library.
44   * <p>
45   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do
46   * not configure any {@code <asyncLogger>} or {@code <asyncRoot>} elements in
47   * the configuration. If {@code AsyncLoggerConfig} has inner classes that extend
48   * or implement classes from the Disruptor library, a
49   * {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in the
50   * classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin
51   * from the pre-defined plugins definition file.
52   * <p>
53   * This class serves to make the dependency on the Disruptor optional, so that
54   * these classes are only loaded when the {@code AsyncLoggerConfig} is actually
55   * used.
56   */
57  class AsyncLoggerConfigHelper {
58  
59      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
60      private static final int HALF_A_SECOND = 500;
61      private static final int RINGBUFFER_MIN_SIZE = 128;
62      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
63      private static final Logger LOGGER = StatusLogger.getLogger();
64  
65      private static volatile Disruptor<Log4jEventWrapper> disruptor;
66      private static ExecutorService executor = Executors.newSingleThreadExecutor();
67  
68      private static volatile int count = 0;
69  
70      /**
71       * Factory used to populate the RingBuffer with events. These event objects
72       * are then re-used during the life of the RingBuffer.
73       */
74      private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
75          @Override
76          public Log4jEventWrapper newInstance() {
77              return new Log4jEventWrapper();
78          }
79      };
80  
81      /**
82       * Object responsible for passing on data to a specific RingBuffer event.
83       */
84      private final EventTranslator<Log4jEventWrapper> translator = new EventTranslator<Log4jEventWrapper>() {
85          @Override
86          public void translateTo(Log4jEventWrapper event, long sequence) {
87              event.event = currentLogEvent.get();
88              event.loggerConfig = asyncLoggerConfig;
89          }
90      };
91  
92      private ThreadLocal<LogEvent> currentLogEvent = new ThreadLocal<LogEvent>();
93      private AsyncLoggerConfig asyncLoggerConfig;
94  
95      public AsyncLoggerConfigHelper(AsyncLoggerConfig asyncLoggerConfig) {
96          this.asyncLoggerConfig = asyncLoggerConfig;
97          initDisruptor();
98      }
99  
100     private static synchronized void initDisruptor() {
101         ++count;
102         if (disruptor != null) {
103             return;
104         }
105         int ringBufferSize = calculateRingBufferSize();
106         WaitStrategy waitStrategy = createWaitStrategy();
107         disruptor = new Disruptor<Log4jEventWrapper>(FACTORY,
108                 ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
109         EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {//
110                 new Log4jEventWrapperHandler() };
111         disruptor.handleExceptionsWith(getExceptionHandler());
112         disruptor.handleEventsWith(handlers);
113 
114         LOGGER.debug(
115                 "Starting AsyncLoggerConfig disruptor with ringbuffer size {}...",
116                 disruptor.getRingBuffer().getBufferSize());
117         disruptor.start();
118     }
119 
120     private static WaitStrategy createWaitStrategy() {
121         String strategy = System.getProperty("AsyncLoggerConfig.WaitStrategy");
122         LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
123         if ("Sleep".equals(strategy)) {
124             LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
125             return new SleepingWaitStrategy();
126         } else if ("Yield".equals(strategy)) {
127             LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
128             return new YieldingWaitStrategy();
129         } else if ("Block".equals(strategy)) {
130             LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
131             return new BlockingWaitStrategy();
132         }
133         LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
134         return new SleepingWaitStrategy();
135     }
136 
137     private static int calculateRingBufferSize() {
138         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
139         String userPreferredRBSize = System.getProperty(
140                 "AsyncLoggerConfig.RingBufferSize",
141                 String.valueOf(ringBufferSize));
142         try {
143             int size = Integer.parseInt(userPreferredRBSize);
144             if (size < RINGBUFFER_MIN_SIZE) {
145                 size = RINGBUFFER_MIN_SIZE;
146                 LOGGER.warn(
147                         "Invalid RingBufferSize {}, using minimum size {}.",
148                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
149             }
150             ringBufferSize = size;
151         } catch (Exception ex) {
152             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
153                     userPreferredRBSize, ringBufferSize);
154         }
155         return Util.ceilingNextPowerOfTwo(ringBufferSize);
156     }
157 
158     private static ExceptionHandler getExceptionHandler() {
159         String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
160         if (cls == null) {
161             LOGGER.debug("No AsyncLoggerConfig.ExceptionHandler specified");
162             return null;
163         }
164         try {
165             @SuppressWarnings("unchecked")
166             Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
167                     .forName(cls);
168             ExceptionHandler result = klass.newInstance();
169             LOGGER.debug("AsyncLoggerConfig.ExceptionHandler=" + result);
170             return result;
171         } catch (Exception ignored) {
172             LOGGER.debug(
173                     "AsyncLoggerConfig.ExceptionHandler not set: error creating "
174                             + cls + ": ", ignored);
175             return null;
176         }
177     }
178 
179     /**
180      * RingBuffer events contain all information necessary to perform the work
181      * in a separate thread.
182      */
183     private static class Log4jEventWrapper {
184         private AsyncLoggerConfig loggerConfig;
185         private LogEvent event;
186     }
187 
188     /**
189      * EventHandler performs the work in a separate thread.
190      */
191     private static class Log4jEventWrapperHandler implements
192             SequenceReportingEventHandler<Log4jEventWrapper> {
193         private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
194         private Sequence sequenceCallback;
195         private int counter;
196 
197         @Override
198         public void setSequenceCallback(Sequence sequenceCallback) {
199             this.sequenceCallback = sequenceCallback;
200         }
201 
202         @Override
203         public void onEvent(Log4jEventWrapper event, long sequence,
204                 boolean endOfBatch) throws Exception {
205             event.event.setEndOfBatch(endOfBatch);
206             event.loggerConfig.asyncCallAppenders(event.event);
207 
208             // notify the BatchEventProcessor that the sequence has progressed.
209             // Without this callback the sequence would not be progressed
210             // until the batch has completely finished.
211             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
212                 sequenceCallback.set(sequence);
213                 counter = 0;
214             }
215         }
216     }
217 
218     public synchronized void shutdown() {
219         if (--count > 0) {
220             return;
221         }
222         Disruptor<Log4jEventWrapper> temp = disruptor;
223         if (temp == null) {
224             return; // disruptor was already shut down by another thread
225         }
226 
227         // Must guarantee that publishing to the RingBuffer has stopped
228         // before we call disruptor.shutdown()
229         disruptor = null; // client code fails with NPE if log after stop = OK
230         temp.shutdown();
231 
232         // wait up to 10 seconds for the ringbuffer to drain
233         RingBuffer<Log4jEventWrapper> ringBuffer = temp.getRingBuffer();
234         for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
235             if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
236                 break;
237             }
238             try {
239                 // give ringbuffer some time to drain...
240                 Thread.sleep(HALF_A_SECOND);
241             } catch (InterruptedException e) {
242                 // ignored
243             }
244         }
245         executor.shutdown(); // finally, kill the processor thread
246     }
247 
248     public void callAppendersFromAnotherThread(LogEvent event) {
249         currentLogEvent.set(event);
250         disruptor.publishEvent(translator);
251     }
252 
253 }