001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.appender;
018    
019    import org.apache.logging.log4j.core.Appender;
020    import org.apache.logging.log4j.core.Filter;
021    import org.apache.logging.log4j.core.LogEvent;
022    import org.apache.logging.log4j.core.config.AppenderControl;
023    import org.apache.logging.log4j.core.config.AppenderRef;
024    import org.apache.logging.log4j.core.config.Configuration;
025    import org.apache.logging.log4j.core.config.ConfigurationException;
026    import org.apache.logging.log4j.core.config.plugins.Plugin;
027    import org.apache.logging.log4j.core.config.plugins.PluginAttr;
028    import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
029    import org.apache.logging.log4j.core.config.plugins.PluginElement;
030    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
031    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
032    
033    import java.io.Serializable;
034    import java.util.ArrayList;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.concurrent.ArrayBlockingQueue;
038    import java.util.concurrent.BlockingQueue;
039    
040    /**
041     * Appender to write to one or more Appenders asynchronously.  The AsynchAppender can be configrued with one
042     * or more Appenders and an Appender to write to if the queue is full. The AsynchAppender does not allow
043     * filter to be specified on the Appender references.
044     */
045    @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true)
046    public final class AsynchAppender extends AbstractAppender {
047    
048        private static final int DEFAULT_QUEUE_SIZE = 128;
049        private static final String SHUTDOWN = "Shutdown";
050    
051        private final BlockingQueue<Serializable> queue;
052        private final boolean blocking;
053        private final Configuration config;
054        private final AppenderRef[] appenderRefs;
055        private final String errorRef;
056        private AppenderControl errorAppender = null;
057        private AsynchThread thread = null;
058    
059        private AsynchAppender(String name, Filter filter, AppenderRef[] appenderRefs, String errorRef,
060                               int queueSize, boolean blocking,
061                               boolean handleExceptions, Configuration config) {
062            super(name, filter, null, handleExceptions);
063            this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
064            this.blocking = blocking;
065            this.config = config;
066            this.appenderRefs = appenderRefs;
067            this.errorRef = errorRef;
068        }
069    
070        @Override
071        public void start() {
072            Map<String, Appender> map = config.getAppenders();
073            List<AppenderControl> appenders = new ArrayList<AppenderControl>();
074            for (AppenderRef appenderRef : appenderRefs) {
075                if (map.containsKey(appenderRef.getRef())) {
076                    appenders.add(new AppenderControl(map.get(appenderRef.getRef()), null, null));
077                } else {
078                    LOGGER.error("No appender named {} was configured", appenderRef);
079                }
080            }
081            if (errorRef != null) {
082                if (map.containsKey(errorRef)) {
083                    errorAppender = new AppenderControl(map.get(errorRef), null, null);
084                } else {
085                    LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
086                }
087            }
088            if (appenders.size() > 0) {
089                thread = new AsynchThread(appenders, queue);
090            } else if (errorRef == null) {
091                throw new ConfigurationException("No appenders are available for AsynchAppender " + getName());
092            }
093    
094            thread.start();
095            super.start();
096        }
097    
098        @Override
099        public void stop() {
100            super.stop();
101            thread.shutdown();
102            try {
103                thread.join();
104            } catch (InterruptedException ex) {
105                LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName());
106            }
107        }
108    
109        /**
110         * Actual writing occurs here.
111         * <p/>
112         * @param event The LogEvent.
113         */
114        public void append(LogEvent event) {
115            if (!isStarted()) {
116                throw new IllegalStateException("AsynchAppender " + getName() + " is not active");
117            }
118            if (event instanceof Log4jLogEvent) {
119                if (blocking && queue.remainingCapacity() > 0) {
120                    try {
121                        queue.add(Log4jLogEvent.serialize((Log4jLogEvent) event));
122                        return;
123                    } catch (IllegalStateException ex) {
124                        error("Appender " + getName() + " is unable to write primary appenders. queue is full");
125                    }
126                }
127                if (errorAppender != null) {
128                    if (!blocking) {
129                        error("Appender " + getName() + " is unable to write primary appenders. queue is full");
130                    }
131                    errorAppender.callAppender(event);
132                }
133            }
134        }
135    
136        /**
137         * Create an AsynchAppender.
138         * @param appenderRefs The Appenders to reference.
139         * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
140         * @param blocking True if the Appender should wait when the queue is full. The default is true.
141         * @param size The size of the event queue. The default is 128.
142         * @param name The name of the Appender.
143         * @param filter The Filter or null.
144         * @param config The Configuration.
145         * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise.
146         * The default is "true".
147         * @return The AsynchAppender.
148         */
149        @PluginFactory
150        public static AsynchAppender createAppender(@PluginElement("appender-ref") AppenderRef[] appenderRefs,
151                                                    @PluginAttr("error-ref") String errorRef,
152                                                    @PluginAttr("blocking") String blocking,
153                                                    @PluginAttr("bufferSize") String size,
154                                                    @PluginAttr("name") String name,
155                                                    @PluginElement("filter") Filter filter,
156                                                    @PluginConfiguration Configuration config,
157                                                    @PluginAttr("suppressExceptions") String suppress) {
158            if (name == null) {
159                LOGGER.error("No name provided for AsynchAppender");
160                return null;
161            }
162            if (appenderRefs == null) {
163                LOGGER.error("No appender references provided to AsynchAppender {}", name);
164            }
165    
166            boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
167            int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
168    
169            boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
170    
171            return new AsynchAppender(name, filter, appenderRefs, errorRef, queueSize, isBlocking, handleExceptions,
172                                      config);
173        }
174    
175        /**
176         * Thread that calls the Appenders.
177         */
178        private class AsynchThread extends Thread {
179    
180            private volatile boolean shutdown = false;
181            private final List<AppenderControl> appenders;
182            private final BlockingQueue<Serializable> queue;
183    
184            public AsynchThread(List<AppenderControl> appenders, BlockingQueue<Serializable> queue) {
185                this.appenders = appenders;
186                this.queue = queue;
187            }
188    
189            @Override
190            public void run() {
191                while (!shutdown) {
192                    Serializable s;
193                    try {
194                        s = queue.take();
195                        if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
196                            shutdown = true;
197                            continue;
198                        }
199                    } catch (InterruptedException ex) {
200                        // No good reason for this.
201                        continue;
202                    }
203                    Log4jLogEvent event = Log4jLogEvent.deserialize(s);
204                    boolean success = false;
205                    for (AppenderControl control : appenders) {
206                        try {
207                            control.callAppender(event);
208                            success = true;
209                        } catch (Exception ex) {
210                            // If no appender is successful the error appender will get it.
211                        }
212                    }
213                    if (!success && errorAppender != null) {
214                        try {
215                            errorAppender.callAppender(event);
216                        } catch (Exception ex) {
217                            // Silently accept the error.
218                        }
219                    }
220                }
221                // Process any remaining items in the queue.
222                while (!queue.isEmpty()) {
223                    try {
224                        Log4jLogEvent event = Log4jLogEvent.deserialize(queue.take());
225                        for (AppenderControl control : appenders) {
226                            control.callAppender(event);
227                        }
228                    } catch (InterruptedException ex) {
229                        // May have been interrupted to shut down.
230                    }
231                }
232            }
233    
234            public void shutdown() {
235                shutdown = true;
236                if (queue.isEmpty()) {
237                    queue.offer(SHUTDOWN);
238                }
239            }
240        }
241    }