/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.core.async;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.impl.LogEventFactory;
import org.apache.logging.log4j.core.impl.MutableLogEvent;
import org.apache.logging.log4j.core.impl.ReusableLogEventFactory;
import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
import org.apache.logging.log4j.message.ReusableMessage;
import org.apache.logging.log4j.status.StatusLogger;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
 * <p>
 * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
 * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
 * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
 * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
 * definition file.
 * <p>
 * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
 * {@code AsyncLoggerConfig} is actually used.
 */
public class AsyncLoggerConfigDisruptor implements AsyncLoggerConfigDelegate {

    // 200 attempts x 50 ms = at most 10 seconds of polite waiting before Disruptor.shutdown() is called.
    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
    private static final Logger LOGGER = StatusLogger.getLogger();

    /**
     * RingBuffer events contain all information necessary to perform the work in a separate thread.
     */
    public static class Log4jEventWrapper {
        public Log4jEventWrapper() {
        }

        /**
         * Creates a wrapper that is pre-populated with a reusable event.
         *
         * @param mutableLogEvent the event instance this ring buffer slot keeps for its whole lifetime
         */
        public Log4jEventWrapper(MutableLogEvent mutableLogEvent) {
            event = mutableLogEvent;
        }

        private AsyncLoggerConfig loggerConfig;
        private LogEvent event;

        /**
         * Release references held by ring buffer to allow objects to be garbage-collected.
         * <p>
         * A {@code MutableLogEvent} is cleared in place and kept in the slot; any other event is dropped by setting
         * the reference to {@code null}.
         */
        public void clear() {
            loggerConfig = null;
            if (event instanceof MutableLogEvent) {
                ((MutableLogEvent) event).clear();
            } else {
                event = null;
            }
        }

        @Override
        public String toString() {
            return String.valueOf(event);
        }
    }

    /**
     * EventHandler performs the work in a separate thread.
     */
    private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
        private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
        private Sequence sequenceCallback;
        private int counter;

        @Override
        public void setSequenceCallback(final Sequence sequenceCallback) {
            this.sequenceCallback = sequenceCallback;
        }

        /**
         * Passes the wrapped event to the appenders of its {@code AsyncLoggerConfig}, then releases the slot.
         *
         * @param event the ring buffer slot to process
         * @param sequence the sequence number of the slot
         * @param endOfBatch whether this is the last event of the current batch
         * @throws Exception if calling the appenders fails
         */
        @Override
        public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
                throws Exception {
            try {
                event.event.setEndOfBatch(endOfBatch);
                event.loggerConfig.asyncCallAppenders(event.event);
            } finally {
                // Release the slot and report progress even when an appender throws; otherwise the ring buffer
                // would keep the event and logger config reachable until the slot is overwritten.
                event.clear();
                notifyIntermediateProgress(sequence);
            }
        }

        /**
         * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
         * be progressed until the batch has completely finished.
         */
        private void notifyIntermediateProgress(final long sequence) {
            if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
                sequenceCallback.set(sequence);
                counter = 0;
            }
        }
    }

    /**
     * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
     * RingBuffer.
     */
    private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
        @Override
        public Log4jEventWrapper newInstance() {
            return new Log4jEventWrapper();
        }
    };

    /**
     * Factory used to populate the RingBuffer with events that each hold their own {@code MutableLogEvent}. These
     * event objects are then re-used during the life of the RingBuffer.
     */
    private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = new EventFactory<Log4jEventWrapper>() {
        @Override
        public Log4jEventWrapper newInstance() {
            return new Log4jEventWrapper(new MutableLogEvent());
        }
    };

    /**
     * Object responsible for passing on data to a specific RingBuffer event. The log event reference itself is stored
     * in the slot.
     */
    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
            new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {

        @Override
        public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
                final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
            ringBufferElement.event = logEvent;
            ringBufferElement.loggerConfig = loggerConfig;
        }
    };

    /**
     * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent. The slot's own
     * {@code MutableLogEvent} is initialized from the log event instead of storing the reference.
     */
    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
            new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {

        @Override
        public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
                final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
            ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
            ringBufferElement.loggerConfig = loggerConfig;
        }
    };

    private static final ThreadFactory THREAD_FACTORY = new DaemonThreadFactory("AsyncLoggerConfig-");

    private int ringBufferSize;
    private AsyncQueueFullPolicy asyncQueueFullPolicy;
    // Primitive on purpose: the flag is never null, so boxing only adds unboxing on every use.
    private boolean mutable = false;

    private volatile Disruptor<Log4jEventWrapper> disruptor;
    private ExecutorService executor;
    private long backgroundThreadId; // LOG4J2-471
    private EventFactory<Log4jEventWrapper> factory;
    private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator;

    public AsyncLoggerConfigDisruptor() {
    }

    // called from AsyncLoggerConfig constructor
    @Override
    public void setLogEventFactory(final LogEventFactory logEventFactory) {
        // if any AsyncLoggerConfig uses a ReusableLogEventFactory
        // then we need to populate our ringbuffer with MutableLogEvents
        this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory);
    }

    /**
     * Creates and starts a new Disruptor and associated thread if none currently exists. If a Disruptor already
     * exists, this method only logs a trace message and returns.
     *
     * @see #stop()
     */
    public synchronized void start() {
        if (disruptor != null) {
            LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
                    + "using existing object.");
            return;
        }
        LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
        ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
        final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
        executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
        backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();

        // The translator and the factory must agree: the mutable translator casts the slot's event to
        // MutableLogEvent, which only the mutable factory provides.
        translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
        factory = mutable ? MUTABLE_FACTORY : FACTORY;
        disruptor = new Disruptor<>(factory, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);

        final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
        disruptor.handleExceptionsWith(errorHandler);

        final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
        disruptor.handleEventsWith(handlers);

        LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
                + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
                .getClass().getSimpleName(), errorHandler);
        disruptor.start();
    }

    /**
     * Shuts down the Disruptor and its associated executor and sets their references to {@code null}. If the
     * Disruptor was already shut down, this method only logs a trace message and returns.
     * <p>
     * If the calling thread is interrupted while waiting for the ring buffer to drain, draining continues and the
     * thread's interrupt status is restored before this method returns.
     */
    public synchronized void stop() {
        final Disruptor<Log4jEventWrapper> temp = disruptor;
        if (temp == null) {
            LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
            return; // disruptor was already shut down by another thread
        }
        LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");

        // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
        disruptor = null; // client code fails with NPE if log after stop = OK

        // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
        // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
        // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
        boolean interrupted = false;
        for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
            try {
                Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
            } catch (final InterruptedException e) {
                // Keep draining, but remember the interruption so it is not lost to the caller.
                interrupted = true;
            }
        }
        temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed

        LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor executor for this configuration.");
        executor.shutdown(); // finally, kill the processor thread
        executor = null; // release reference to allow GC

        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
            LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
                    DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
        }

        if (interrupted) {
            // Restore the interrupt status that Thread.sleep cleared when it threw.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns {@code true} if the specified disruptor still has unprocessed events.
     */
    private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
        final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
        // The buffer is fully drained exactly when all of its slots are available again.
        return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
    }

    /**
     * Returns {@code EventRoute.DISCARD} if the disruptor has been shut down, otherwise the route chosen by the
     * queue-full policy for the background thread id and the given level.
     *
     * @param logLevel the level of the event to route
     * @return the route for the event
     */
    @Override
    public EventRoute getEventRoute(final Level logLevel) {
        final int remainingCapacity = remainingDisruptorCapacity();
        if (remainingCapacity < 0) {
            return EventRoute.DISCARD;
        }
        return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
    }

    /**
     * Returns the remaining capacity of the ring buffer, or {@code -1} if the disruptor has been shut down.
     */
    private int remainingDisruptorCapacity() {
        final Disruptor<Log4jEventWrapper> temp = disruptor;
        if (hasLog4jBeenShutDown(temp)) {
            return -1;
        }
        return (int) temp.getRingBuffer().remainingCapacity();
    }

    /**
     * Returns {@code true} if the specified disruptor is null. A warning is logged in that case.
     */
    private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
        if (aDisruptor == null) { // LOG4J2-639
            LOGGER.warn("Ignoring log event after log4j was shut down");
            return true;
        }
        return false;
    }

    /**
     * Publishes the event to the ring buffer. If a {@code NullPointerException} occurs, which is what happens when
     * the disruptor was already shut down, the event is dropped and a warning is logged.
     *
     * @param event the event to enqueue
     * @param asyncLoggerConfig the logger config whose appenders should receive the event
     */
    @Override
    public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
        // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
        try {
            final LogEvent logEvent = prepareEvent(event);
            enqueue(logEvent, asyncLoggerConfig);
        } catch (final NullPointerException npe) {
            // Note: NPE prevents us from adding a log event to the disruptor after it was shut down,
            // which could cause the publishEvent method to hang and never return.
            LOGGER.warn("Ignoring log event after log4j was shut down.");
        }
    }

    /**
     * Returns an event that can be handed to the background thread: a memento for {@code RingBufferLogEvent}s, and
     * for {@code Log4jLogEvent}s carrying a {@code ReusableMessage}, an event whose message was made immutable.
     */
    private LogEvent prepareEvent(final LogEvent event) {
        final LogEvent logEvent = ensureImmutable(event);
        if (logEvent instanceof Log4jLogEvent && logEvent.getMessage() instanceof ReusableMessage) {
            ((Log4jLogEvent) logEvent).makeMessageImmutable();
        }
        return logEvent;
    }

    private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
        disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
    }

    /**
     * Attempts to publish the event to the ring buffer via {@code tryPublishEvent}.
     *
     * @param event the event to enqueue
     * @param asyncLoggerConfig the logger config whose appenders should receive the event
     * @return the result of {@code RingBuffer.tryPublishEvent}
     */
    @Override
    public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
        final LogEvent logEvent = prepareEvent(event);
        // NOTE(review): unlike enqueueEvent, this dereferences the disruptor field without guarding against the
        // null that stop() assigns; confirm callers tolerate a NullPointerException after shutdown.
        return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
    }

    private LogEvent ensureImmutable(final LogEvent event) {
        LogEvent result = event;
        if (event instanceof RingBufferLogEvent) {
            // Deal with special case where both types of Async Loggers are used together:
            // RingBufferLogEvents are created by the all-loggers-async type, but
            // this event is also consumed by the some-loggers-async type (this class).
            // The original event will be re-used and modified in an application thread later,
            // so take a snapshot of it, which can be safely processed in the
            // some-loggers-async background thread.
            result = ((RingBufferLogEvent) event).createMemento();
        }
        return result;
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String,
     * java.lang.String)
     */
    @Override
    public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
        // NOTE(review): dereferences the disruptor field directly, so this presumably must be called between
        // start() and stop() - confirm against callers.
        return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
    }
}