001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.appender;
018    
019    import org.apache.logging.log4j.core.Appender;
020    import org.apache.logging.log4j.core.Filter;
021    import org.apache.logging.log4j.core.Layout;
022    import org.apache.logging.log4j.core.LogEvent;
023    import org.apache.logging.log4j.core.config.AppenderControl;
024    import org.apache.logging.log4j.core.config.AppenderRef;
025    import org.apache.logging.log4j.core.config.Configuration;
026    import org.apache.logging.log4j.core.config.ConfigurationException;
027    import org.apache.logging.log4j.core.config.plugins.Plugin;
028    import org.apache.logging.log4j.core.config.plugins.PluginAttr;
029    import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
030    import org.apache.logging.log4j.core.config.plugins.PluginElement;
031    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
032    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
033    
034    import java.io.Serializable;
035    import java.util.ArrayList;
036    import java.util.List;
037    import java.util.Map;
038    import java.util.concurrent.ArrayBlockingQueue;
039    import java.util.concurrent.BlockingQueue;
040    
041    /**
042     * Appends to one or more Appenders asynchronously.  You can configure an AsynchAppender with one
043     * or more Appenders and an Appender to append to if the queue is full. The AsynchAppender does not allow
044     * a filter to be specified on the Appender references.
045     *
046     * @param <T> The {@link Layout}'s {@link Serializable} type.
047     */
048    @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true)
049    public final class AsynchAppender<T extends Serializable> extends AbstractAppender<T> {
050    
051        private static final int DEFAULT_QUEUE_SIZE = 128;
052        private static final String SHUTDOWN = "Shutdown";
053    
054        private final BlockingQueue<Serializable> queue;
055        private final boolean blocking;
056        private final Configuration config;
057        private final AppenderRef[] appenderRefs;
058        private final String errorRef;
059        private AppenderControl errorAppender;
060        private AsynchThread thread;
061    
062        private AsynchAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
063                               final String errorRef, final int queueSize, final boolean blocking,
064                               final boolean handleExceptions, final Configuration config) {
065            super(name, filter, null, handleExceptions);
066            this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
067            this.blocking = blocking;
068            this.config = config;
069            this.appenderRefs = appenderRefs;
070            this.errorRef = errorRef;
071        }
072    
073        @Override
074        public void start() {
075            final Map<String, Appender<?>> map = config.getAppenders();
076            final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
077            for (final AppenderRef appenderRef : appenderRefs) {
078                if (map.containsKey(appenderRef.getRef())) {
079                    appenders.add(new AppenderControl(map.get(appenderRef.getRef()), null, null));
080                } else {
081                    LOGGER.error("No appender named {} was configured", appenderRef);
082                }
083            }
084            if (errorRef != null) {
085                if (map.containsKey(errorRef)) {
086                    errorAppender = new AppenderControl(map.get(errorRef), null, null);
087                } else {
088                    LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
089                }
090            }
091            if (appenders.size() > 0) {
092                thread = new AsynchThread(appenders, queue);
093            } else if (errorRef == null) {
094                throw new ConfigurationException("No appenders are available for AsynchAppender " + getName());
095            }
096    
097            thread.start();
098            super.start();
099        }
100    
101        @Override
102        public void stop() {
103            super.stop();
104            thread.shutdown();
105            try {
106                thread.join();
107            } catch (final InterruptedException ex) {
108                LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName());
109            }
110        }
111    
112        /**
113         * Actual writing occurs here.
114         * <p/>
115         * @param event The LogEvent.
116         */
117        public void append(final LogEvent event) {
118            if (!isStarted()) {
119                throw new IllegalStateException("AsynchAppender " + getName() + " is not active");
120            }
121            if (event instanceof Log4jLogEvent) {
122                if (blocking && queue.remainingCapacity() > 0) {
123                    try {
124                        queue.add(Log4jLogEvent.serialize((Log4jLogEvent) event));
125                        return;
126                    } catch (final IllegalStateException ex) {
127                        error("Appender " + getName() + " is unable to write primary appenders. queue is full");
128                    }
129                }
130                if (errorAppender != null) {
131                    if (!blocking) {
132                        error("Appender " + getName() + " is unable to write primary appenders. queue is full");
133                    }
134                    errorAppender.callAppender(event);
135                }
136            }
137        }
138    
139        /**
140         * Create an AsynchAppender.
141         * @param appenderRefs The Appenders to reference.
142         * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
143         * @param blocking True if the Appender should wait when the queue is full. The default is true.
144         * @param size The size of the event queue. The default is 128.
145         * @param name The name of the Appender.
146         * @param filter The Filter or null.
147         * @param config The Configuration.
148         * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise.
149         * The default is "true".
150         * @param <S> The actual type of the Serializable.
151         * @return The AsynchAppender.
152         */
153        @PluginFactory
154        public static <S extends Serializable> AsynchAppender<S> createAppender(
155                                                    @PluginElement("appender-ref") final AppenderRef[] appenderRefs,
156                                                    @PluginAttr("error-ref") final String errorRef,
157                                                    @PluginAttr("blocking") final String blocking,
158                                                    @PluginAttr("bufferSize") final String size,
159                                                    @PluginAttr("name") final String name,
160                                                    @PluginElement("filter") final Filter filter,
161                                                    @PluginConfiguration final Configuration config,
162                                                    @PluginAttr("suppressExceptions") final String suppress) {
163            if (name == null) {
164                LOGGER.error("No name provided for AsynchAppender");
165                return null;
166            }
167            if (appenderRefs == null) {
168                LOGGER.error("No appender references provided to AsynchAppender {}", name);
169            }
170    
171            final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
172            final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
173    
174            final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
175    
176            return new AsynchAppender<S>(name, filter, appenderRefs, errorRef, queueSize, isBlocking, handleExceptions,
177                                      config);
178        }
179    
180        /**
181         * Thread that calls the Appenders.
182         */
183        private class AsynchThread extends Thread {
184    
185            private volatile boolean shutdown = false;
186            private final List<AppenderControl> appenders;
187            private final BlockingQueue<Serializable> queue;
188    
189            public AsynchThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
190                this.appenders = appenders;
191                this.queue = queue;
192            }
193    
194            @Override
195            public void run() {
196                while (!shutdown) {
197                    Serializable s;
198                    try {
199                        s = queue.take();
200                        if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
201                            shutdown = true;
202                            continue;
203                        }
204                    } catch (final InterruptedException ex) {
205                        // No good reason for this.
206                        continue;
207                    }
208                    final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
209                    boolean success = false;
210                    for (final AppenderControl control : appenders) {
211                        try {
212                            control.callAppender(event);
213                            success = true;
214                        } catch (final Exception ex) {
215                            // If no appender is successful the error appender will get it.
216                        }
217                    }
218                    if (!success && errorAppender != null) {
219                        try {
220                            errorAppender.callAppender(event);
221                        } catch (final Exception ex) {
222                            // Silently accept the error.
223                        }
224                    }
225                }
226                // Process any remaining items in the queue.
227                while (!queue.isEmpty()) {
228                    try {
229                        final Log4jLogEvent event = Log4jLogEvent.deserialize(queue.take());
230                        for (final AppenderControl control : appenders) {
231                            control.callAppender(event);
232                        }
233                    } catch (final InterruptedException ex) {
234                        // May have been interrupted to shut down.
235                    }
236                }
237            }
238    
239            public void shutdown() {
240                shutdown = true;
241                if (queue.isEmpty()) {
242                    queue.offer(SHUTDOWN);
243                }
244            }
245        }
246    }