001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.appender; 018 019 import org.apache.logging.log4j.core.Appender; 020 import org.apache.logging.log4j.core.Filter; 021 import org.apache.logging.log4j.core.Layout; 022 import org.apache.logging.log4j.core.LogEvent; 023 import org.apache.logging.log4j.core.config.AppenderControl; 024 import org.apache.logging.log4j.core.config.AppenderRef; 025 import org.apache.logging.log4j.core.config.Configuration; 026 import org.apache.logging.log4j.core.config.ConfigurationException; 027 import org.apache.logging.log4j.core.config.plugins.Plugin; 028 import org.apache.logging.log4j.core.config.plugins.PluginAttr; 029 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; 030 import org.apache.logging.log4j.core.config.plugins.PluginElement; 031 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 032 import org.apache.logging.log4j.core.impl.Log4jLogEvent; 033 034 import java.io.Serializable; 035 import java.util.ArrayList; 036 import java.util.List; 037 import java.util.Map; 038 import java.util.concurrent.ArrayBlockingQueue; 039 import java.util.concurrent.BlockingQueue; 040 041 /** 042 * Appends to one or more Appenders asynchronously. You can configure an AsynchAppender with one 043 * or more Appenders and an Appender to append to if the queue is full. The AsynchAppender does not allow 044 * a filter to be specified on the Appender references. 045 * 046 * @param <T> The {@link Layout}'s {@link Serializable} type. 047 */ 048 @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true) 049 public final class AsynchAppender<T extends Serializable> extends AbstractAppender<T> { 050 051 private static final int DEFAULT_QUEUE_SIZE = 128; 052 private static final String SHUTDOWN = "Shutdown"; 053 054 private final BlockingQueue<Serializable> queue; 055 private final boolean blocking; 056 private final Configuration config; 057 private final AppenderRef[] appenderRefs; 058 private final String errorRef; 059 private AppenderControl errorAppender; 060 private AsynchThread thread; 061 062 private AsynchAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, 063 final String errorRef, final int queueSize, final boolean blocking, 064 final boolean handleExceptions, final Configuration config) { 065 super(name, filter, null, handleExceptions); 066 this.queue = new ArrayBlockingQueue<Serializable>(queueSize); 067 this.blocking = blocking; 068 this.config = config; 069 this.appenderRefs = appenderRefs; 070 this.errorRef = errorRef; 071 } 072 073 @Override 074 public void start() { 075 final Map<String, Appender<?>> map = config.getAppenders(); 076 final List<AppenderControl> appenders = new ArrayList<AppenderControl>(); 077 for (final AppenderRef appenderRef : appenderRefs) { 078 if (map.containsKey(appenderRef.getRef())) { 079 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), null, null)); 080 } else { 081 LOGGER.error("No appender named {} was configured", appenderRef); 082 } 083 } 084 if (errorRef != null) { 085 if (map.containsKey(errorRef)) { 086 errorAppender = new AppenderControl(map.get(errorRef), null, null); 087 } else { 088 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); 089 } 090 } 091 if (appenders.size() > 0) { 092 thread = new AsynchThread(appenders, queue); 093 } else if (errorRef == null) { 094 throw new ConfigurationException("No appenders are available for AsynchAppender " + getName()); 095 } 096 097 thread.start(); 098 super.start(); 099 } 100 101 @Override 102 public void stop() { 103 super.stop(); 104 thread.shutdown(); 105 try { 106 thread.join(); 107 } catch (final InterruptedException ex) { 108 LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName()); 109 } 110 } 111 112 /** 113 * Actual writing occurs here. 114 * <p/> 115 * @param event The LogEvent. 116 */ 117 public void append(final LogEvent event) { 118 if (!isStarted()) { 119 throw new IllegalStateException("AsynchAppender " + getName() + " is not active"); 120 } 121 if (event instanceof Log4jLogEvent) { 122 if (blocking && queue.remainingCapacity() > 0) { 123 try { 124 queue.add(Log4jLogEvent.serialize((Log4jLogEvent) event)); 125 return; 126 } catch (final IllegalStateException ex) { 127 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 128 } 129 } 130 if (errorAppender != null) { 131 if (!blocking) { 132 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 133 } 134 errorAppender.callAppender(event); 135 } 136 } 137 } 138 139 /** 140 * Create an AsynchAppender. 141 * @param appenderRefs The Appenders to reference. 142 * @param errorRef An optional Appender to write to if the queue is full or other errors occur. 143 * @param blocking True if the Appender should wait when the queue is full. The default is true. 144 * @param size The size of the event queue. The default is 128. 145 * @param name The name of the Appender. 146 * @param filter The Filter or null. 147 * @param config The Configuration. 148 * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise. 149 * The default is "true". 150 * @param <S> The actual type of the Serializable. 151 * @return The AsynchAppender. 152 */ 153 @PluginFactory 154 public static <S extends Serializable> AsynchAppender<S> createAppender( 155 @PluginElement("appender-ref") final AppenderRef[] appenderRefs, 156 @PluginAttr("error-ref") final String errorRef, 157 @PluginAttr("blocking") final String blocking, 158 @PluginAttr("bufferSize") final String size, 159 @PluginAttr("name") final String name, 160 @PluginElement("filter") final Filter filter, 161 @PluginConfiguration final Configuration config, 162 @PluginAttr("suppressExceptions") final String suppress) { 163 if (name == null) { 164 LOGGER.error("No name provided for AsynchAppender"); 165 return null; 166 } 167 if (appenderRefs == null) { 168 LOGGER.error("No appender references provided to AsynchAppender {}", name); 169 } 170 171 final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking); 172 final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size); 173 174 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress); 175 176 return new AsynchAppender<S>(name, filter, appenderRefs, errorRef, queueSize, isBlocking, handleExceptions, 177 config); 178 } 179 180 /** 181 * Thread that calls the Appenders. 182 */ 183 private class AsynchThread extends Thread { 184 185 private volatile boolean shutdown = false; 186 private final List<AppenderControl> appenders; 187 private final BlockingQueue<Serializable> queue; 188 189 public AsynchThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) { 190 this.appenders = appenders; 191 this.queue = queue; 192 } 193 194 @Override 195 public void run() { 196 while (!shutdown) { 197 Serializable s; 198 try { 199 s = queue.take(); 200 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) { 201 shutdown = true; 202 continue; 203 } 204 } catch (final InterruptedException ex) { 205 // No good reason for this. 206 continue; 207 } 208 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 209 boolean success = false; 210 for (final AppenderControl control : appenders) { 211 try { 212 control.callAppender(event); 213 success = true; 214 } catch (final Exception ex) { 215 // If no appender is successful the error appender will get it. 216 } 217 } 218 if (!success && errorAppender != null) { 219 try { 220 errorAppender.callAppender(event); 221 } catch (final Exception ex) { 222 // Silently accept the error. 223 } 224 } 225 } 226 // Process any remaining items in the queue. 227 while (!queue.isEmpty()) { 228 try { 229 final Log4jLogEvent event = Log4jLogEvent.deserialize(queue.take()); 230 for (final AppenderControl control : appenders) { 231 control.callAppender(event); 232 } 233 } catch (final InterruptedException ex) { 234 // May have been interrupted to shut down. 235 } 236 } 237 } 238 239 public void shutdown() { 240 shutdown = true; 241 if (queue.isEmpty()) { 242 queue.offer(SHUTDOWN); 243 } 244 } 245 } 246 }