001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.appender;
018    
019    import org.apache.logging.log4j.core.Appender;
020    import org.apache.logging.log4j.core.Filter;
021    import org.apache.logging.log4j.core.Layout;
022    import org.apache.logging.log4j.core.LogEvent;
023    import org.apache.logging.log4j.core.config.AppenderControl;
024    import org.apache.logging.log4j.core.config.AppenderRef;
025    import org.apache.logging.log4j.core.config.Configuration;
026    import org.apache.logging.log4j.core.config.ConfigurationException;
027    import org.apache.logging.log4j.core.config.plugins.Plugin;
028    import org.apache.logging.log4j.core.config.plugins.PluginAttr;
029    import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
030    import org.apache.logging.log4j.core.config.plugins.PluginElement;
031    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
032    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
033    
034    import java.io.Serializable;
035    import java.util.ArrayList;
036    import java.util.List;
037    import java.util.Map;
038    import java.util.concurrent.ArrayBlockingQueue;
039    import java.util.concurrent.BlockingQueue;
040    
041    /**
042     * Appends to one or more Appenders asynchronously.  You can configure an AsynchAppender with one
043     * or more Appenders and an Appender to append to if the queue is full. The AsynchAppender does not allow
044     * a filter to be specified on the Appender references.
045     *
046     * @param <T> The {@link Layout}'s {@link Serializable} type.
047     */
048    @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true)
049    public final class AsynchAppender<T extends Serializable> extends AbstractAppender<T> {
050    
051        private static final int DEFAULT_QUEUE_SIZE = 128;
052        private static final String SHUTDOWN = "Shutdown";
053    
054        private final BlockingQueue<Serializable> queue;
055        private final boolean blocking;
056        private final Configuration config;
057        private final AppenderRef[] appenderRefs;
058        private final String errorRef;
059        private final boolean includeLocation;
060        private AppenderControl errorAppender;
061        private AsynchThread thread;
062    
063        private AsynchAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
064                               final String errorRef, final int queueSize, final boolean blocking,
065                               final boolean handleExceptions, final Configuration config,
066                               final boolean includeLocation) {
067            super(name, filter, null, handleExceptions);
068            this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
069            this.blocking = blocking;
070            this.config = config;
071            this.appenderRefs = appenderRefs;
072            this.errorRef = errorRef;
073            this.includeLocation = includeLocation;
074        }
075    
076        @Override
077        public void start() {
078            final Map<String, Appender<?>> map = config.getAppenders();
079            final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
080            for (final AppenderRef appenderRef : appenderRefs) {
081                if (map.containsKey(appenderRef.getRef())) {
082                    appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
083                        appenderRef.getFilter()));
084                } else {
085                    LOGGER.error("No appender named {} was configured", appenderRef);
086                }
087            }
088            if (errorRef != null) {
089                if (map.containsKey(errorRef)) {
090                    errorAppender = new AppenderControl(map.get(errorRef), null, null);
091                } else {
092                    LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
093                }
094            }
095            if (appenders.size() > 0) {
096                thread = new AsynchThread(appenders, queue);
097            } else if (errorRef == null) {
098                throw new ConfigurationException("No appenders are available for AsynchAppender " + getName());
099            }
100    
101            thread.start();
102            super.start();
103        }
104    
105        @Override
106        public void stop() {
107            super.stop();
108            thread.shutdown();
109            try {
110                thread.join();
111            } catch (final InterruptedException ex) {
112                LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName());
113            }
114        }
115    
116        /**
117         * Actual writing occurs here.
118         * <p/>
119         * @param event The LogEvent.
120         */
121        public void append(final LogEvent event) {
122            if (!isStarted()) {
123                throw new IllegalStateException("AsynchAppender " + getName() + " is not active");
124            }
125            if (event instanceof Log4jLogEvent) {
126                boolean appendSuccessful = false;
127                if (blocking){
128                    try {
129                        // wait for free slots in the queue
130                        queue.put(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
131                        appendSuccessful = true;
132                    } catch (InterruptedException e) {
133                        LOGGER.warn("Interrupted while waiting for a free slots in the LogEvent-queue at the AsynchAppender {}", getName());
134                    }
135                } else {
136                    appendSuccessful = queue.offer(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
137                    if (!appendSuccessful) {
138                        error("Appender " + getName() + " is unable to write primary appenders. queue is full");
139                    }
140                }
141                if ((!appendSuccessful) && (errorAppender != null)){
142                    errorAppender.callAppender(event);
143                }
144            }
145        }
146    
147        /**
148         * Create an AsynchAppender.
149         * @param appenderRefs The Appenders to reference.
150         * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
151         * @param blocking True if the Appender should wait when the queue is full. The default is true.
152         * @param size The size of the event queue. The default is 128.
153         * @param name The name of the Appender.
154         * @param includeLocation whether to include location information. The default is false.
155         * @param filter The Filter or null.
156         * @param config The Configuration.
157         * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise.
158         * The default is "true".
159         * @param <S> The actual type of the Serializable.
160         * @return The AsynchAppender.
161         */
162        @PluginFactory
163        public static <S extends Serializable> AsynchAppender<S> createAppender(
164                    @PluginElement("appender-ref") final AppenderRef[] appenderRefs,
165                    @PluginAttr("error-ref") final String errorRef,
166                    @PluginAttr("blocking") final String blocking,
167                    @PluginAttr("bufferSize") final String size,
168                    @PluginAttr("name") final String name,
169                    @PluginAttr("includeLocation") final String includeLocation,
170                    @PluginElement("filter") final Filter filter,
171                    @PluginConfiguration final Configuration config,
172                    @PluginAttr("suppressExceptions") final String suppress) {
173            if (name == null) {
174                LOGGER.error("No name provided for AsynchAppender");
175                return null;
176            }
177            if (appenderRefs == null) {
178                LOGGER.error("No appender references provided to AsynchAppender {}", name);
179            }
180    
181            final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
182            final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
183            final boolean isIncludeLocation = includeLocation == null ? false :
184                    Boolean.parseBoolean(includeLocation);
185    
186            final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
187    
188            return new AsynchAppender<S>(name, filter, appenderRefs, errorRef, 
189                    queueSize, isBlocking, handleExceptions, config, isIncludeLocation);
190        }
191    
192        /**
193         * Thread that calls the Appenders.
194         */
195        private class AsynchThread extends Thread {
196    
197            private volatile boolean shutdown = false;
198            private final List<AppenderControl> appenders;
199            private final BlockingQueue<Serializable> queue;
200    
201            public AsynchThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
202                this.appenders = appenders;
203                this.queue = queue;
204            }
205    
206            @Override
207            public void run() {
208                while (!shutdown) {
209                    Serializable s;
210                    try {
211                        s = queue.take();
212                        if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
213                            shutdown = true;
214                            continue;
215                        }
216                    } catch (final InterruptedException ex) {
217                        // No good reason for this.
218                        continue;
219                    }
220                    final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
221                    event.setEndOfBatch(queue.isEmpty());
222                    boolean success = false;
223                    for (final AppenderControl<?> control : appenders) {
224                        try {
225                            control.callAppender(event);
226                            success = true;
227                        } catch (final Exception ex) {
228                            // If no appender is successful the error appender will get it.
229                        }
230                    }
231                    if (!success && errorAppender != null) {
232                        try {
233                            errorAppender.callAppender(event);
234                        } catch (final Exception ex) {
235                            // Silently accept the error.
236                        }
237                    }
238                }
239                // Process any remaining items in the queue.
240                while (!queue.isEmpty()) {
241                    try {
242                        Serializable s = queue.take();
243                        if (s instanceof Log4jLogEvent) {
244                            final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
245                            event.setEndOfBatch(queue.isEmpty());
246                            for (final AppenderControl<?> control : appenders) {
247                                control.callAppender(event);
248                            }
249                        }
250                    } catch (final InterruptedException ex) {
251                        // May have been interrupted to shut down.
252                    }
253                }
254            }
255    
256            public void shutdown() {
257                shutdown = true;
258                if (queue.isEmpty()) {
259                    queue.offer(SHUTDOWN);
260                }
261            }
262        }
263    }