001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.appender; 018 019 import org.apache.logging.log4j.core.Appender; 020 import org.apache.logging.log4j.core.Filter; 021 import org.apache.logging.log4j.core.Layout; 022 import org.apache.logging.log4j.core.LogEvent; 023 import org.apache.logging.log4j.core.config.AppenderControl; 024 import org.apache.logging.log4j.core.config.AppenderRef; 025 import org.apache.logging.log4j.core.config.Configuration; 026 import org.apache.logging.log4j.core.config.ConfigurationException; 027 import org.apache.logging.log4j.core.config.plugins.Plugin; 028 import org.apache.logging.log4j.core.config.plugins.PluginAttr; 029 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; 030 import org.apache.logging.log4j.core.config.plugins.PluginElement; 031 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 032 import org.apache.logging.log4j.core.impl.Log4jLogEvent; 033 034 import java.io.Serializable; 035 import java.util.ArrayList; 036 import java.util.List; 037 import java.util.Map; 038 import java.util.concurrent.ArrayBlockingQueue; 039 import java.util.concurrent.BlockingQueue; 040 041 /** 042 * Appends to one or more Appenders asynchronously. You can configure an AsynchAppender with one 043 * or more Appenders and an Appender to append to if the queue is full. The AsynchAppender does not allow 044 * a filter to be specified on the Appender references. 045 * 046 * @param <T> The {@link Layout}'s {@link Serializable} type. 047 */ 048 @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true) 049 public final class AsynchAppender<T extends Serializable> extends AbstractAppender<T> { 050 051 private static final int DEFAULT_QUEUE_SIZE = 128; 052 private static final String SHUTDOWN = "Shutdown"; 053 054 private final BlockingQueue<Serializable> queue; 055 private final boolean blocking; 056 private final Configuration config; 057 private final AppenderRef[] appenderRefs; 058 private final String errorRef; 059 private final boolean includeLocation; 060 private AppenderControl errorAppender; 061 private AsynchThread thread; 062 063 private AsynchAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, 064 final String errorRef, final int queueSize, final boolean blocking, 065 final boolean handleExceptions, final Configuration config, 066 final boolean includeLocation) { 067 super(name, filter, null, handleExceptions); 068 this.queue = new ArrayBlockingQueue<Serializable>(queueSize); 069 this.blocking = blocking; 070 this.config = config; 071 this.appenderRefs = appenderRefs; 072 this.errorRef = errorRef; 073 this.includeLocation = includeLocation; 074 } 075 076 @Override 077 public void start() { 078 final Map<String, Appender<?>> map = config.getAppenders(); 079 final List<AppenderControl> appenders = new ArrayList<AppenderControl>(); 080 for (final AppenderRef appenderRef : appenderRefs) { 081 if (map.containsKey(appenderRef.getRef())) { 082 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(), 083 appenderRef.getFilter())); 084 } else { 085 LOGGER.error("No appender named {} was configured", appenderRef); 086 } 087 } 088 if (errorRef != null) { 089 if (map.containsKey(errorRef)) { 090 errorAppender = new AppenderControl(map.get(errorRef), null, null); 091 } else { 092 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); 093 } 094 } 095 if (appenders.size() > 0) { 096 thread = new AsynchThread(appenders, queue); 097 } else if (errorRef == null) { 098 throw new ConfigurationException("No appenders are available for AsynchAppender " + getName()); 099 } 100 101 thread.start(); 102 super.start(); 103 } 104 105 @Override 106 public void stop() { 107 super.stop(); 108 thread.shutdown(); 109 try { 110 thread.join(); 111 } catch (final InterruptedException ex) { 112 LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName()); 113 } 114 } 115 116 /** 117 * Actual writing occurs here. 118 * <p/> 119 * @param event The LogEvent. 120 */ 121 public void append(final LogEvent event) { 122 if (!isStarted()) { 123 throw new IllegalStateException("AsynchAppender " + getName() + " is not active"); 124 } 125 if (event instanceof Log4jLogEvent) { 126 boolean appendSuccessful = false; 127 if (blocking){ 128 try { 129 // wait for free slots in the queue 130 queue.put(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation)); 131 appendSuccessful = true; 132 } catch (InterruptedException e) { 133 LOGGER.warn("Interrupted while waiting for a free slots in the LogEvent-queue at the AsynchAppender {}", getName()); 134 } 135 } else { 136 appendSuccessful = queue.offer(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation)); 137 if (!appendSuccessful) { 138 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 139 } 140 } 141 if ((!appendSuccessful) && (errorAppender != null)){ 142 errorAppender.callAppender(event); 143 } 144 } 145 } 146 147 /** 148 * Create an AsynchAppender. 149 * @param appenderRefs The Appenders to reference. 150 * @param errorRef An optional Appender to write to if the queue is full or other errors occur. 151 * @param blocking True if the Appender should wait when the queue is full. The default is true. 152 * @param size The size of the event queue. The default is 128. 153 * @param name The name of the Appender. 154 * @param includeLocation whether to include location information. The default is false. 155 * @param filter The Filter or null. 156 * @param config The Configuration. 157 * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise. 158 * The default is "true". 159 * @param <S> The actual type of the Serializable. 160 * @return The AsynchAppender. 161 */ 162 @PluginFactory 163 public static <S extends Serializable> AsynchAppender<S> createAppender( 164 @PluginElement("appender-ref") final AppenderRef[] appenderRefs, 165 @PluginAttr("error-ref") final String errorRef, 166 @PluginAttr("blocking") final String blocking, 167 @PluginAttr("bufferSize") final String size, 168 @PluginAttr("name") final String name, 169 @PluginAttr("includeLocation") final String includeLocation, 170 @PluginElement("filter") final Filter filter, 171 @PluginConfiguration final Configuration config, 172 @PluginAttr("suppressExceptions") final String suppress) { 173 if (name == null) { 174 LOGGER.error("No name provided for AsynchAppender"); 175 return null; 176 } 177 if (appenderRefs == null) { 178 LOGGER.error("No appender references provided to AsynchAppender {}", name); 179 } 180 181 final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking); 182 final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size); 183 final boolean isIncludeLocation = includeLocation == null ? false : 184 Boolean.parseBoolean(includeLocation); 185 186 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress); 187 188 return new AsynchAppender<S>(name, filter, appenderRefs, errorRef, 189 queueSize, isBlocking, handleExceptions, config, isIncludeLocation); 190 } 191 192 /** 193 * Thread that calls the Appenders. 194 */ 195 private class AsynchThread extends Thread { 196 197 private volatile boolean shutdown = false; 198 private final List<AppenderControl> appenders; 199 private final BlockingQueue<Serializable> queue; 200 201 public AsynchThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) { 202 this.appenders = appenders; 203 this.queue = queue; 204 } 205 206 @Override 207 public void run() { 208 while (!shutdown) { 209 Serializable s; 210 try { 211 s = queue.take(); 212 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) { 213 shutdown = true; 214 continue; 215 } 216 } catch (final InterruptedException ex) { 217 // No good reason for this. 218 continue; 219 } 220 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 221 event.setEndOfBatch(queue.isEmpty()); 222 boolean success = false; 223 for (final AppenderControl<?> control : appenders) { 224 try { 225 control.callAppender(event); 226 success = true; 227 } catch (final Exception ex) { 228 // If no appender is successful the error appender will get it. 229 } 230 } 231 if (!success && errorAppender != null) { 232 try { 233 errorAppender.callAppender(event); 234 } catch (final Exception ex) { 235 // Silently accept the error. 236 } 237 } 238 } 239 // Process any remaining items in the queue. 240 while (!queue.isEmpty()) { 241 try { 242 Serializable s = queue.take(); 243 if (s instanceof Log4jLogEvent) { 244 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 245 event.setEndOfBatch(queue.isEmpty()); 246 for (final AppenderControl<?> control : appenders) { 247 control.callAppender(event); 248 } 249 } 250 } catch (final InterruptedException ex) { 251 // May have been interrupted to shut down. 252 } 253 } 254 } 255 256 public void shutdown() { 257 shutdown = true; 258 if (queue.isEmpty()) { 259 queue.offer(SHUTDOWN); 260 } 261 } 262 } 263 }