001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.appender; 018 019 import org.apache.logging.log4j.core.Appender; 020 import org.apache.logging.log4j.core.Filter; 021 import org.apache.logging.log4j.core.Layout; 022 import org.apache.logging.log4j.core.LogEvent; 023 import org.apache.logging.log4j.core.config.AppenderControl; 024 import org.apache.logging.log4j.core.config.AppenderRef; 025 import org.apache.logging.log4j.core.config.Configuration; 026 import org.apache.logging.log4j.core.config.ConfigurationException; 027 import org.apache.logging.log4j.core.config.plugins.Plugin; 028 import org.apache.logging.log4j.core.config.plugins.PluginAttr; 029 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; 030 import org.apache.logging.log4j.core.config.plugins.PluginElement; 031 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 032 import org.apache.logging.log4j.core.impl.Log4jLogEvent; 033 034 import java.io.Serializable; 035 import java.util.ArrayList; 036 import java.util.List; 037 import java.util.Map; 038 import java.util.concurrent.ArrayBlockingQueue; 039 import java.util.concurrent.BlockingQueue; 040 041 /** 042 * Appends to one or more Appenders asynchronously. You can configure an 043 * AsyncAppender with one or more Appenders and an Appender to append to if the 044 * queue is full. The AsyncAppender does not allow a filter to be specified on 045 * the Appender references. 046 * 047 * @param <T> The {@link Layout}'s {@link Serializable} type. 048 */ 049 @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true) 050 public final class AsyncAppender<T extends Serializable> extends AbstractAppender<T> { 051 052 private static final int DEFAULT_QUEUE_SIZE = 128; 053 private static final String SHUTDOWN = "Shutdown"; 054 055 private final BlockingQueue<Serializable> queue; 056 private final boolean blocking; 057 private final Configuration config; 058 private final AppenderRef[] appenderRefs; 059 private final String errorRef; 060 private final boolean includeLocation; 061 private AppenderControl<?> errorAppender; 062 private AsyncThread thread; 063 064 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, 065 final String errorRef, final int queueSize, final boolean blocking, 066 final boolean handleExceptions, final Configuration config, 067 final boolean includeLocation) { 068 super(name, filter, null, handleExceptions); 069 this.queue = new ArrayBlockingQueue<Serializable>(queueSize); 070 this.blocking = blocking; 071 this.config = config; 072 this.appenderRefs = appenderRefs; 073 this.errorRef = errorRef; 074 this.includeLocation = includeLocation; 075 } 076 077 @Override 078 @SuppressWarnings("unchecked") 079 public void start() { 080 final Map<String, Appender<?>> map = config.getAppenders(); 081 final List<AppenderControl<?>> appenders = new ArrayList<AppenderControl<?>>(); 082 for (final AppenderRef appenderRef : appenderRefs) { 083 if (map.containsKey(appenderRef.getRef())) { 084 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(), 085 appenderRef.getFilter())); 086 } else { 087 LOGGER.error("No appender named {} was configured", appenderRef); 088 } 089 } 090 if (errorRef != null) { 091 if (map.containsKey(errorRef)) { 092 errorAppender = new AppenderControl(map.get(errorRef), null, null); 093 } else { 094 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); 095 } 096 } 097 if (appenders.size() > 0) { 098 thread = new AsyncThread(appenders, queue); 099 } else if (errorRef == null) { 100 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName()); 101 } 102 103 thread.start(); 104 super.start(); 105 } 106 107 @Override 108 public void stop() { 109 super.stop(); 110 thread.shutdown(); 111 try { 112 thread.join(); 113 } catch (final InterruptedException ex) { 114 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName()); 115 } 116 } 117 118 /** 119 * Actual writing occurs here. 120 * <p/> 121 * @param event The LogEvent. 122 */ 123 @Override 124 public void append(final LogEvent event) { 125 if (!isStarted()) { 126 throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); 127 } 128 if (event instanceof Log4jLogEvent) { 129 boolean appendSuccessful = false; 130 if (blocking) { 131 try { 132 // wait for free slots in the queue 133 queue.put(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation)); 134 appendSuccessful = true; 135 } catch (InterruptedException e) { 136 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}", 137 getName()); 138 } 139 } else { 140 appendSuccessful = queue.offer(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation)); 141 if (!appendSuccessful) { 142 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 143 } 144 } 145 if ((!appendSuccessful) && (errorAppender != null)) { 146 errorAppender.callAppender(event); 147 } 148 } 149 } 150 151 /** 152 * Create an AsyncAppender. 153 * @param appenderRefs The Appenders to reference. 154 * @param errorRef An optional Appender to write to if the queue is full or other errors occur. 155 * @param blocking True if the Appender should wait when the queue is full. The default is true. 156 * @param size The size of the event queue. The default is 128. 157 * @param name The name of the Appender. 158 * @param includeLocation whether to include location information. The default is false. 159 * @param filter The Filter or null. 160 * @param config The Configuration. 161 * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise. 162 * The default is "true". 163 * @param <S> The actual type of the Serializable. 164 * @return The AsyncAppender. 165 */ 166 @PluginFactory 167 public static <S extends Serializable> AsyncAppender<S> createAppender( 168 @PluginElement("appender-ref") final AppenderRef[] appenderRefs, 169 @PluginAttr("error-ref") final String errorRef, 170 @PluginAttr("blocking") final String blocking, 171 @PluginAttr("bufferSize") final String size, 172 @PluginAttr("name") final String name, 173 @PluginAttr("includeLocation") final String includeLocation, 174 @PluginElement("filter") final Filter filter, 175 @PluginConfiguration final Configuration config, 176 @PluginAttr("suppressExceptions") final String suppress) { 177 if (name == null) { 178 LOGGER.error("No name provided for AsyncAppender"); 179 return null; 180 } 181 if (appenderRefs == null) { 182 LOGGER.error("No appender references provided to AsyncAppender {}", name); 183 } 184 185 final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking); 186 final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size); 187 final boolean isIncludeLocation = includeLocation != null && Boolean.parseBoolean(includeLocation); 188 189 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress); 190 191 return new AsyncAppender<S>(name, filter, appenderRefs, errorRef, 192 queueSize, isBlocking, handleExceptions, config, isIncludeLocation); 193 } 194 195 /** 196 * Thread that calls the Appenders. 197 */ 198 private class AsyncThread extends Thread { 199 200 private volatile boolean shutdown = false; 201 private final List<AppenderControl<?>> appenders; 202 private final BlockingQueue<Serializable> queue; 203 204 public AsyncThread(final List<AppenderControl<?>> appenders, final BlockingQueue<Serializable> queue) { 205 this.appenders = appenders; 206 this.queue = queue; 207 } 208 209 @Override 210 public void run() { 211 while (!shutdown) { 212 Serializable s; 213 try { 214 s = queue.take(); 215 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) { 216 shutdown = true; 217 continue; 218 } 219 } catch (final InterruptedException ex) { 220 // No good reason for this. 221 continue; 222 } 223 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 224 event.setEndOfBatch(queue.isEmpty()); 225 boolean success = false; 226 for (final AppenderControl<?> control : appenders) { 227 try { 228 control.callAppender(event); 229 success = true; 230 } catch (final Exception ex) { 231 // If no appender is successful the error appender will get it. 232 } 233 } 234 if (!success && errorAppender != null) { 235 try { 236 errorAppender.callAppender(event); 237 } catch (final Exception ex) { 238 // Silently accept the error. 239 } 240 } 241 } 242 // Process any remaining items in the queue. 243 while (!queue.isEmpty()) { 244 try { 245 Serializable s = queue.take(); 246 if (s instanceof Log4jLogEvent) { 247 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 248 event.setEndOfBatch(queue.isEmpty()); 249 for (final AppenderControl<?> control : appenders) { 250 control.callAppender(event); 251 } 252 } 253 } catch (final InterruptedException ex) { 254 // May have been interrupted to shut down. 255 } 256 } 257 } 258 259 public void shutdown() { 260 shutdown = true; 261 if (queue.isEmpty()) { 262 queue.offer(SHUTDOWN); 263 } 264 } 265 } 266 }