View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  
22  import org.apache.logging.log4j.Logger;
23  import org.apache.logging.log4j.core.LogEvent;
24  import org.apache.logging.log4j.status.StatusLogger;
25  
26  import com.lmax.disruptor.BlockingWaitStrategy;
27  import com.lmax.disruptor.EventFactory;
28  import com.lmax.disruptor.EventHandler;
29  import com.lmax.disruptor.EventTranslator;
30  import com.lmax.disruptor.ExceptionHandler;
31  import com.lmax.disruptor.RingBuffer;
32  import com.lmax.disruptor.Sequence;
33  import com.lmax.disruptor.SequenceReportingEventHandler;
34  import com.lmax.disruptor.SleepingWaitStrategy;
35  import com.lmax.disruptor.WaitStrategy;
36  import com.lmax.disruptor.YieldingWaitStrategy;
37  import com.lmax.disruptor.dsl.Disruptor;
38  import com.lmax.disruptor.dsl.ProducerType;
39  import com.lmax.disruptor.util.Util;
40  
41  /**
42   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX
43   * Disruptor library.
44   * <p>
45   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do
46   * not configure any {@code <asyncLogger>} or {@code <asyncRoot>} elements in
47   * the configuration. If {@code AsyncLoggerConfig} has inner classes that extend
48   * or implement classes from the Disruptor library, a
49   * {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in the
50   * classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin
51   * from the pre-defined plugins definition file.
52   * <p>
53   * This class serves to make the dependency on the Disruptor optional, so that
54   * these classes are only loaded when the {@code AsyncLoggerConfig} is actually
55   * used.
56   */
57  class AsyncLoggerConfigHelper {
58  
59      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
60      private static final int HALF_A_SECOND = 500;
61      private static final int RINGBUFFER_MIN_SIZE = 128;
62      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
63      private static final Logger LOGGER = StatusLogger.getLogger();
64  
65      private static volatile Disruptor<RingBufferLog4jEvent> disruptor;
66      private static ExecutorService executor = Executors
67              .newSingleThreadExecutor();
68  
69      /**
70       * Factory used to populate the RingBuffer with events. These event objects
71       * are then re-used during the life of the RingBuffer.
72       */
73      private static final EventFactory<RingBufferLog4jEvent> FACTORY = new EventFactory<RingBufferLog4jEvent>() {
74          @Override
75          public RingBufferLog4jEvent newInstance() {
76              return new RingBufferLog4jEvent();
77          }
78      };
79  
80      /**
81       * Object responsible for passing on data to a specific RingBuffer event.
82       */
83      private final EventTranslator<RingBufferLog4jEvent> translator = new EventTranslator<RingBufferLog4jEvent>() {
84          @Override
85          public void translateTo(RingBufferLog4jEvent event, long sequence) {
86              event.event = currentLogEvent.get();
87              event.loggerConfig = asyncLoggerConfig;
88          }
89      };
90  
91      private ThreadLocal<LogEvent> currentLogEvent = new ThreadLocal<LogEvent>();
92      private AsyncLoggerConfig asyncLoggerConfig;
93  
94      public AsyncLoggerConfigHelper(AsyncLoggerConfig asyncLoggerConfig) {
95          this.asyncLoggerConfig = asyncLoggerConfig;
96          initDisruptor();
97      }
98  
99      private static synchronized void initDisruptor() {
100         if (disruptor != null) {
101             return;
102         }
103         int ringBufferSize = calculateRingBufferSize();
104         WaitStrategy waitStrategy = createWaitStrategy();
105         disruptor = new Disruptor<RingBufferLog4jEvent>(FACTORY,
106                 ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
107         EventHandler<RingBufferLog4jEvent>[] handlers = new RingBufferLog4jEventHandler[] {//
108                 new RingBufferLog4jEventHandler() };
109         disruptor.handleExceptionsWith(getExceptionHandler());
110         disruptor.handleEventsWith(handlers);
111 
112         LOGGER.debug(
113                 "Starting AsyncLoggerConfig disruptor with ringbuffer size {}...",
114                 disruptor.getRingBuffer().getBufferSize());
115         disruptor.start();
116     }
117 
118     private static WaitStrategy createWaitStrategy() {
119         String strategy = System.getProperty("AsyncLoggerConfig.WaitStrategy");
120         LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
121         if ("Sleep".equals(strategy)) {
122             LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
123             return new SleepingWaitStrategy();
124         } else if ("Yield".equals(strategy)) {
125             LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
126             return new YieldingWaitStrategy();
127         } else if ("Block".equals(strategy)) {
128             LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
129             return new BlockingWaitStrategy();
130         }
131         LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
132         return new SleepingWaitStrategy();
133     }
134 
135     private static int calculateRingBufferSize() {
136         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
137         String userPreferredRBSize = System.getProperty(
138                 "AsyncLoggerConfig.RingBufferSize",
139                 String.valueOf(ringBufferSize));
140         try {
141             int size = Integer.parseInt(userPreferredRBSize);
142             if (size < RINGBUFFER_MIN_SIZE) {
143                 size = RINGBUFFER_MIN_SIZE;
144                 LOGGER.warn(
145                         "Invalid RingBufferSize {}, using minimum size {}.",
146                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
147             }
148             ringBufferSize = size;
149         } catch (Exception ex) {
150             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
151                     userPreferredRBSize, ringBufferSize);
152         }
153         return Util.ceilingNextPowerOfTwo(ringBufferSize);
154     }
155 
156     private static ExceptionHandler getExceptionHandler() {
157         String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
158         if (cls == null) {
159             LOGGER.debug("No AsyncLoggerConfig.ExceptionHandler specified");
160             return null;
161         }
162         try {
163             @SuppressWarnings("unchecked")
164             Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
165                     .forName(cls);
166             ExceptionHandler result = klass.newInstance();
167             LOGGER.debug("AsyncLoggerConfig.ExceptionHandler=" + result);
168             return result;
169         } catch (Exception ignored) {
170             LOGGER.debug(
171                     "AsyncLoggerConfig.ExceptionHandler not set: error creating "
172                             + cls + ": ", ignored);
173             return null;
174         }
175     }
176 
177     /**
178      * RingBuffer events contain all information necessary to perform the work
179      * in a separate thread.
180      */
181     private static class RingBufferLog4jEvent {
182         private AsyncLoggerConfig loggerConfig;
183         private LogEvent event;
184     }
185 
186     /**
187      * EventHandler performs the work in a separate thread.
188      */
189     private static class RingBufferLog4jEventHandler implements
190             SequenceReportingEventHandler<RingBufferLog4jEvent> {
191         private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
192         private Sequence sequenceCallback;
193         private int counter;
194 
195         @Override
196         public void setSequenceCallback(Sequence sequenceCallback) {
197             this.sequenceCallback = sequenceCallback;
198         }
199 
200         @Override
201         public void onEvent(RingBufferLog4jEvent event, long sequence,
202                 boolean endOfBatch) throws Exception {
203             event.event.setEndOfBatch(endOfBatch);
204             event.loggerConfig.asyncCallAppenders(event.event);
205 
206             // notify the BatchEventProcessor that the sequence has progressed.
207             // Without this callback the sequence would not be progressed
208             // until the batch has completely finished.
209             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
210                 sequenceCallback.set(sequence);
211                 counter = 0;
212             }
213         }
214     }
215 
216     public void shutdown() {
217         Disruptor<RingBufferLog4jEvent> temp = disruptor;
218 
219         // Must guarantee that publishing to the RingBuffer has stopped
220         // before we call disruptor.shutdown()
221         disruptor = null; // client code fails with NPE if log after stop = OK
222         temp.shutdown();
223 
224         // wait up to 10 seconds for the ringbuffer to drain
225         RingBuffer<RingBufferLog4jEvent> ringBuffer = temp.getRingBuffer();
226         for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
227             if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
228                 break;
229             }
230             try {
231                 // give ringbuffer some time to drain...
232                 Thread.sleep(HALF_A_SECOND);
233             } catch (InterruptedException e) {
234                 // ignored
235             }
236         }
237         executor.shutdown(); // finally, kill the processor thread
238     }
239 
240     public void callAppendersFromAnotherThread(LogEvent event) {
241         currentLogEvent.set(event);
242         disruptor.publishEvent(translator);
243     }
244 
245 }