001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.async; 018 019import java.util.concurrent.ExecutorService; 020import java.util.concurrent.Executors; 021import java.util.concurrent.ThreadFactory; 022 023import org.apache.logging.log4j.Logger; 024import org.apache.logging.log4j.core.LogEvent; 025import org.apache.logging.log4j.core.jmx.RingBufferAdmin; 026import org.apache.logging.log4j.core.util.Constants; 027import org.apache.logging.log4j.status.StatusLogger; 028 029import com.lmax.disruptor.EventFactory; 030import com.lmax.disruptor.EventTranslatorTwoArg; 031import com.lmax.disruptor.ExceptionHandler; 032import com.lmax.disruptor.RingBuffer; 033import com.lmax.disruptor.Sequence; 034import com.lmax.disruptor.SequenceReportingEventHandler; 035import com.lmax.disruptor.WaitStrategy; 036import com.lmax.disruptor.dsl.Disruptor; 037import com.lmax.disruptor.dsl.ProducerType; 038 039/** 040 * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library. 041 * <p> 042 * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or 043 * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or 044 * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in 045 * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins 046 * definition file. 047 * <p> 048 * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the 049 * {@code AsyncLoggerConfig} is actually used. 050 */ 051public class AsyncLoggerConfigDisruptor implements AsyncLoggerConfigDelegate { 052 053 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200; 054 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50; 055 private static final Logger LOGGER = StatusLogger.getLogger(); 056 057 /** 058 * RingBuffer events contain all information necessary to perform the work in a separate thread. 059 */ 060 private static class Log4jEventWrapper { 061 private AsyncLoggerConfig loggerConfig; 062 private LogEvent event; 063 064 /** 065 * Release references held by ring buffer to allow objects to be garbage-collected. 066 */ 067 public void clear() { 068 loggerConfig = null; 069 event = null; 070 } 071 } 072 073 /** 074 * EventHandler performs the work in a separate thread. 075 */ 076 private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> { 077 private static final int NOTIFY_PROGRESS_THRESHOLD = 50; 078 private Sequence sequenceCallback; 079 private int counter; 080 081 @Override 082 public void setSequenceCallback(final Sequence sequenceCallback) { 083 this.sequenceCallback = sequenceCallback; 084 } 085 086 @Override 087 public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch) 088 throws Exception { 089 event.event.setEndOfBatch(endOfBatch); 090 event.loggerConfig.asyncCallAppenders(event.event); 091 event.clear(); 092 093 notifyIntermediateProgress(sequence); 094 } 095 096 /** 097 * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not 098 * be progressed until the batch has completely finished. 099 */ 100 private void notifyIntermediateProgress(final long sequence) { 101 if (++counter > NOTIFY_PROGRESS_THRESHOLD) { 102 sequenceCallback.set(sequence); 103 counter = 0; 104 } 105 } 106 } 107 108 /** 109 * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the 110 * RingBuffer. 111 */ 112 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() { 113 @Override 114 public Log4jEventWrapper newInstance() { 115 return new Log4jEventWrapper(); 116 } 117 }; 118 119 /** 120 * Object responsible for passing on data to a specific RingBuffer event. 121 */ 122 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR = 123 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() { 124 125 @Override 126 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence, 127 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) { 128 ringBufferElement.event = logEvent; 129 ringBufferElement.loggerConfig = loggerConfig; 130 } 131 }; 132 133 private static final ThreadFactory THREAD_FACTORY = new DaemonThreadFactory("AsyncLoggerConfig-"); 134 135 private volatile Disruptor<Log4jEventWrapper> disruptor; 136 private ExecutorService executor; 137 private long backgroundThreadId; // LOG4J2-471 138 139 public AsyncLoggerConfigDisruptor() { 140 } 141 142 /** 143 * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently 144 * exists. 145 * 146 * @see #release() 147 */ 148 public synchronized void start() { 149 if (disruptor != null) { 150 LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor for this configuration, " 151 + "using existing object."); 152 return; 153 } 154 LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor for this configuration."); 155 final int ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize"); 156 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy"); 157 executor = Executors.newSingleThreadExecutor(THREAD_FACTORY); 158 backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor); 159 160 disruptor = new Disruptor<>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy); 161 162 final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getExceptionHandler( 163 "AsyncLoggerConfig.ExceptionHandler", Log4jEventWrapper.class); 164 disruptor.handleExceptionsWith(errorHandler); 165 166 final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()}; 167 disruptor.handleEventsWith(handlers); 168 169 LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, " 170 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy 171 .getClass().getSimpleName(), errorHandler); 172 disruptor.start(); 173 } 174 175 /** 176 * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are 177 * shut down and their references set to {@code null}. 178 */ 179 public synchronized void stop() { 180 final Disruptor<Log4jEventWrapper> temp = disruptor; 181 if (temp == null) { 182 LOGGER.trace("AsyncLoggerConfigHelper: disruptor for this configuration already shut down."); 183 return; // disruptor was already shut down by another thread 184 } 185 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor for this configuration."); 186 187 // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown(). 188 disruptor = null; // client code fails with NPE if log after stop = OK 189 190 // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed, 191 // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU, 192 // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain. 193 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { 194 try { 195 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while 196 } catch (final InterruptedException e) { // ignored 197 } 198 } 199 temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed 200 201 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor executor for this configuration."); 202 executor.shutdown(); // finally, kill the processor thread 203 executor = null; // release reference to allow GC 204 } 205 206 /** 207 * Returns {@code true} if the specified disruptor still has unprocessed events. 208 */ 209 private static boolean hasBacklog(final Disruptor<?> theDisruptor) { 210 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer(); 211 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize()); 212 } 213 214 /* 215 * (non-Javadoc) 216 * 217 * @see 218 * org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#tryCallAppendersInBackground(org.apache.logging 219 * .log4j.core.LogEvent) 220 */ 221 @Override 222 public boolean tryCallAppendersInBackground(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) { 223 final Disruptor<Log4jEventWrapper> temp = disruptor; 224 if (!hasLog4jBeenShutDown(temp)) { 225 226 // LOG4J2-471: prevent deadlock when RingBuffer is full and object 227 // being logged calls Logger.log() from its toString() method 228 if (isCalledFromAppenderThreadAndBufferFull(temp)) { 229 // bypass RingBuffer and invoke Appender directly 230 return false; 231 } 232 enqueueEvent(event, asyncLoggerConfig); 233 } 234 return true; 235 } 236 237 /** 238 * Returns {@code true} if the specified disruptor is null. 239 */ 240 private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) { 241 if (aDisruptor == null) { // LOG4J2-639 242 LOGGER.fatal("Ignoring log event after log4j was shut down"); 243 return true; 244 } 245 return false; 246 } 247 248 private void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) { 249 // LOG4J2-639: catch NPE if disruptor field was set to null after our check above 250 try { 251 final LogEvent logEvent = prepareEvent(event); 252 enqueue(logEvent, asyncLoggerConfig); 253 } catch (final NullPointerException npe) { 254 LOGGER.fatal("Ignoring log event after log4j was shut down."); 255 } 256 } 257 258 private LogEvent prepareEvent(final LogEvent event) { 259 final LogEvent logEvent = ensureImmutable(event); 260 if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose 261 logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters 262 } 263 return logEvent; 264 } 265 266 private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) { 267 // Note: do NOT use the temp variable above! 268 // That could result in adding a log event to the disruptor after it was shut down, 269 // which could cause the publishEvent method to hang and never return. 270 disruptor.getRingBuffer().publishEvent(TRANSLATOR, logEvent, asyncLoggerConfig); 271 } 272 273 private LogEvent ensureImmutable(final LogEvent event) { 274 LogEvent result = event; 275 if (event instanceof RingBufferLogEvent) { 276 // Deal with special case where both types of Async Loggers are used together: 277 // RingBufferLogEvents are created by the all-loggers-async type, but 278 // this event is also consumed by the some-loggers-async type (this class). 279 // The original event will be re-used and modified in an application thread later, 280 // so take a snapshot of it, which can be safely processed in the 281 // some-loggers-async background thread. 282 result = ((RingBufferLogEvent) event).createMemento(); 283 } 284 return result; 285 } 286 287 /** 288 * Returns true if the specified ringbuffer is full and the Logger.log() call was made from the appender thread. 289 */ 290 private boolean isCalledFromAppenderThreadAndBufferFull(Disruptor<Log4jEventWrapper> theDisruptor) { 291 return currentThreadIsAppenderThread() && theDisruptor.getRingBuffer().remainingCapacity() == 0; 292 } 293 294 /** 295 * Returns {@code true} if the current thread is the Disruptor background thread, {@code false} otherwise. 296 * 297 * @return whether this thread is the Disruptor background thread. 298 */ 299 private boolean currentThreadIsAppenderThread() { 300 return Thread.currentThread().getId() == backgroundThreadId; 301 } 302 303 /* 304 * (non-Javadoc) 305 * 306 * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String, 307 * java.lang.String) 308 */ 309 @Override 310 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) { 311 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName); 312 } 313 314}