001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.appender;
018    
019    import org.apache.logging.log4j.core.Appender;
020    import org.apache.logging.log4j.core.Filter;
021    import org.apache.logging.log4j.core.Layout;
022    import org.apache.logging.log4j.core.LogEvent;
023    import org.apache.logging.log4j.core.config.AppenderControl;
024    import org.apache.logging.log4j.core.config.AppenderRef;
025    import org.apache.logging.log4j.core.config.Configuration;
026    import org.apache.logging.log4j.core.config.ConfigurationException;
027    import org.apache.logging.log4j.core.config.plugins.Plugin;
028    import org.apache.logging.log4j.core.config.plugins.PluginAttr;
029    import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
030    import org.apache.logging.log4j.core.config.plugins.PluginElement;
031    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
032    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
033    
034    import java.io.Serializable;
035    import java.util.ArrayList;
036    import java.util.List;
037    import java.util.Map;
038    import java.util.concurrent.ArrayBlockingQueue;
039    import java.util.concurrent.BlockingQueue;
040    
041    /**
042     * Appends to one or more Appenders asynchronously.  You can configure an
043     * AsyncAppender with one or more Appenders and an Appender to append to if the
044     * queue is full. The AsyncAppender does not allow a filter to be specified on
045     * the Appender references.
046     *
047     * @param <T> The {@link Layout}'s {@link Serializable} type.
048     */
049    @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
050    public final class AsyncAppender<T extends Serializable> extends AbstractAppender<T> {
051    
052        private static final int DEFAULT_QUEUE_SIZE = 128;
053        private static final String SHUTDOWN = "Shutdown";
054    
055        private final BlockingQueue<Serializable> queue;
056        private final boolean blocking;
057        private final Configuration config;
058        private final AppenderRef[] appenderRefs;
059        private final String errorRef;
060        private final boolean includeLocation;
061        private AppenderControl<?> errorAppender;
062        private AsyncThread thread;
063    
064        private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
065                               final String errorRef, final int queueSize, final boolean blocking,
066                               final boolean handleExceptions, final Configuration config,
067                               final boolean includeLocation) {
068            super(name, filter, null, handleExceptions);
069            this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
070            this.blocking = blocking;
071            this.config = config;
072            this.appenderRefs = appenderRefs;
073            this.errorRef = errorRef;
074            this.includeLocation = includeLocation;
075        }
076    
077        @Override
078        @SuppressWarnings("unchecked")
079        public void start() {
080            final Map<String, Appender<?>> map = config.getAppenders();
081            final List<AppenderControl<?>> appenders = new ArrayList<AppenderControl<?>>();
082            for (final AppenderRef appenderRef : appenderRefs) {
083                if (map.containsKey(appenderRef.getRef())) {
084                    appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
085                        appenderRef.getFilter()));
086                } else {
087                    LOGGER.error("No appender named {} was configured", appenderRef);
088                }
089            }
090            if (errorRef != null) {
091                if (map.containsKey(errorRef)) {
092                    errorAppender = new AppenderControl(map.get(errorRef), null, null);
093                } else {
094                    LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
095                }
096            }
097            if (appenders.size() > 0) {
098                thread = new AsyncThread(appenders, queue);
099            } else if (errorRef == null) {
100                throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
101            }
102    
103            thread.start();
104            super.start();
105        }
106    
107        @Override
108        public void stop() {
109            super.stop();
110            thread.shutdown();
111            try {
112                thread.join();
113            } catch (final InterruptedException ex) {
114                LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
115            }
116        }
117    
118        /**
119         * Actual writing occurs here.
120         * <p/>
121         * @param event The LogEvent.
122         */
123        @Override
124        public void append(final LogEvent event) {
125            if (!isStarted()) {
126                throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
127            }
128            if (event instanceof Log4jLogEvent) {
129                boolean appendSuccessful = false;
130                if (blocking) {
131                    try {
132                        // wait for free slots in the queue
133                        queue.put(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
134                        appendSuccessful = true;
135                    } catch (InterruptedException e) {
136                        LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
137                                getName());
138                    }
139                } else {
140                    appendSuccessful = queue.offer(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
141                    if (!appendSuccessful) {
142                        error("Appender " + getName() + " is unable to write primary appenders. queue is full");
143                    }
144                }
145                if ((!appendSuccessful) && (errorAppender != null)) {
146                    errorAppender.callAppender(event);
147                }
148            }
149        }
150    
151        /**
152         * Create an AsyncAppender.
153         * @param appenderRefs The Appenders to reference.
154         * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
155         * @param blocking True if the Appender should wait when the queue is full. The default is true.
156         * @param size The size of the event queue. The default is 128.
157         * @param name The name of the Appender.
158         * @param includeLocation whether to include location information. The default is false.
159         * @param filter The Filter or null.
160         * @param config The Configuration.
161         * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise.
162         * The default is "true".
163         * @param <S> The actual type of the Serializable.
164         * @return The AsyncAppender.
165         */
166        @PluginFactory
167        public static <S extends Serializable> AsyncAppender<S> createAppender(
168                    @PluginElement("appender-ref") final AppenderRef[] appenderRefs,
169                    @PluginAttr("error-ref") final String errorRef,
170                    @PluginAttr("blocking") final String blocking,
171                    @PluginAttr("bufferSize") final String size,
172                    @PluginAttr("name") final String name,
173                    @PluginAttr("includeLocation") final String includeLocation,
174                    @PluginElement("filter") final Filter filter,
175                    @PluginConfiguration final Configuration config,
176                    @PluginAttr("suppressExceptions") final String suppress) {
177            if (name == null) {
178                LOGGER.error("No name provided for AsyncAppender");
179                return null;
180            }
181            if (appenderRefs == null) {
182                LOGGER.error("No appender references provided to AsyncAppender {}", name);
183            }
184    
185            final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
186            final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
187            final boolean isIncludeLocation = includeLocation != null && Boolean.parseBoolean(includeLocation);
188    
189            final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
190    
191            return new AsyncAppender<S>(name, filter, appenderRefs, errorRef,
192                    queueSize, isBlocking, handleExceptions, config, isIncludeLocation);
193        }
194    
195        /**
196         * Thread that calls the Appenders.
197         */
198        private class AsyncThread extends Thread {
199    
200            private volatile boolean shutdown = false;
201            private final List<AppenderControl<?>> appenders;
202            private final BlockingQueue<Serializable> queue;
203    
204            public AsyncThread(final List<AppenderControl<?>> appenders, final BlockingQueue<Serializable> queue) {
205                this.appenders = appenders;
206                this.queue = queue;
207            }
208    
209            @Override
210            public void run() {
211                while (!shutdown) {
212                    Serializable s;
213                    try {
214                        s = queue.take();
215                        if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
216                            shutdown = true;
217                            continue;
218                        }
219                    } catch (final InterruptedException ex) {
220                        // No good reason for this.
221                        continue;
222                    }
223                    final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
224                    event.setEndOfBatch(queue.isEmpty());
225                    boolean success = false;
226                    for (final AppenderControl<?> control : appenders) {
227                        try {
228                            control.callAppender(event);
229                            success = true;
230                        } catch (final Exception ex) {
231                            // If no appender is successful the error appender will get it.
232                        }
233                    }
234                    if (!success && errorAppender != null) {
235                        try {
236                            errorAppender.callAppender(event);
237                        } catch (final Exception ex) {
238                            // Silently accept the error.
239                        }
240                    }
241                }
242                // Process any remaining items in the queue.
243                while (!queue.isEmpty()) {
244                    try {
245                        Serializable s = queue.take();
246                        if (s instanceof Log4jLogEvent) {
247                            final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
248                            event.setEndOfBatch(queue.isEmpty());
249                            for (final AppenderControl<?> control : appenders) {
250                                control.callAppender(event);
251                            }
252                        }
253                    } catch (final InterruptedException ex) {
254                        // May have been interrupted to shut down.
255                    }
256                }
257            }
258    
259            public void shutdown() {
260                shutdown = true;
261                if (queue.isEmpty()) {
262                    queue.offer(SHUTDOWN);
263                }
264            }
265        }
266    }