001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import java.io.Serializable;
020    import java.util.Locale;
021    
022    import org.apache.logging.log4j.core.Filter;
023    import org.apache.logging.log4j.core.Layout;
024    import org.apache.logging.log4j.core.LogEvent;
025    import org.apache.logging.log4j.core.appender.AbstractAppender;
026    import org.apache.logging.log4j.core.config.Property;
027    import org.apache.logging.log4j.core.config.plugins.Plugin;
028    import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
029    import org.apache.logging.log4j.core.config.plugins.PluginElement;
030    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
031    import org.apache.logging.log4j.core.layout.Rfc5424Layout;
032    import org.apache.logging.log4j.core.net.Facility;
033    import org.apache.logging.log4j.core.util.Booleans;
034    import org.apache.logging.log4j.core.util.Integers;
035    
036    /**
037     * An Appender that uses the Avro protocol to route events to Flume.
038     */
039    @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
040    public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
041    
042        private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
043        private static final int DEFAULT_MAX_DELAY = 60000;
044    
045        private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
046    
047        private final AbstractFlumeManager manager;
048    
049        private final String mdcIncludes;
050        private final String mdcExcludes;
051        private final String mdcRequired;
052    
053        private final String eventPrefix;
054    
055        private final String mdcPrefix;
056    
057        private final boolean compressBody;
058    
059        private final FlumeEventFactory factory;
060    
061        /**
062         * Which Manager will be used by the appender instance.
063         */
064        private enum ManagerType {
065            AVRO, EMBEDDED, PERSISTENT;
066    
067            public static ManagerType getType(final String type) {
068                return valueOf(type.toUpperCase(Locale.US));
069            }
070        }
071    
072        private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
073                              final boolean ignoreExceptions, final String includes, final String excludes,
074                              final String required, final String mdcPrefix, final String eventPrefix,
075                              final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
076            super(name, filter, layout, ignoreExceptions);
077            this.manager = manager;
078            this.mdcIncludes = includes;
079            this.mdcExcludes = excludes;
080            this.mdcRequired = required;
081            this.eventPrefix = eventPrefix;
082            this.mdcPrefix = mdcPrefix;
083            this.compressBody = compress;
084            this.factory = factory == null ? this : factory;
085        }
086    
087        /**
088         * Publish the event.
089         * @param event The LogEvent.
090         */
091        @Override
092        public void append(final LogEvent event) {
093            final String name = event.getLoggerName();
094            if (name != null) {
095                for (final String pkg : EXCLUDED_PACKAGES) {
096                    if (name.startsWith(pkg)) {
097                        return;
098                    }
099                }
100            }
101            final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
102                eventPrefix, compressBody);
103            flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
104            manager.send(flumeEvent);
105        }
106    
107        @Override
108        public void stop() {
109            super.stop();
110            manager.release();
111        }
112    
113        /**
114         * Create a Flume event.
115         * @param event The Log4j LogEvent.
116         * @param includes comma separated list of mdc elements to include.
117         * @param excludes comma separated list of mdc elements to exclude.
118         * @param required comma separated list of mdc elements that must be present with a value.
119         * @param mdcPrefix The prefix to add to MDC key names.
120         * @param eventPrefix The prefix to add to event fields.
121         * @param compress If true the body will be compressed.
122         * @return A Flume Event.
123         */
124        @Override
125        public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
126                                      final String required, final String mdcPrefix, final String eventPrefix,
127                                      final boolean compress) {
128            return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
129                eventPrefix, compressBody);
130        }
131    
132        /**
133         * Create a Flume Avro Appender.
134         * @param agents An array of Agents.
135         * @param properties Properties to pass to the embedded agent.
136         * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
137         * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
138         * @param type Avro (default), Embedded, or Persistent.
139         * @param dataDir The directory where the Flume FileChannel should write its data.
140         * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
141         *                          1000.
142         * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
143         * @param agentRetries The number of times to retry an agent before failing to the next agent.
144         * @param maxDelay The maximum number of seconds to wait for a complete batch.
145         * @param name The name of the Appender.
146         * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
147         *               they are propagated to the caller.
148         * @param excludes A comma separated list of MDC elements to exclude.
149         * @param includes A comma separated list of MDC elements to include.
150         * @param required A comma separated list of MDC elements that are required.
151         * @param mdcPrefix The prefix to add to MDC key names.
152         * @param eventPrefix The prefix to add to event key names.
153         * @param compressBody If true the event body will be compressed.
154         * @param batchSize Number of events to include in a batch. Defaults to 1.
155         * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
156         * @param factory The factory to use to create Flume events.
157         * @param layout The layout to format the event.
158         * @param filter A Filter to filter events.
159         *
160         * @return A Flume Avro Appender.
161         */
162        @PluginFactory
163        public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
164                                                   @PluginElement("Properties") final Property[] properties,
165                                                   @PluginAttribute("embedded") final String embedded,
166                                                   @PluginAttribute("type") final String type,
167                                                   @PluginAttribute("dataDir") final String dataDir,
168                                                   @PluginAttribute("connectTimeout") final String connectionTimeout,
169                                                   @PluginAttribute("requestTimeout") final String requestTimeout,
170                                                   @PluginAttribute("agentRetries") final String agentRetries,
171                                                   @PluginAttribute("maxDelay") final String maxDelay,
172                                                   @PluginAttribute("name") final String name,
173                                                   @PluginAttribute("ignoreExceptions") final String ignore,
174                                                   @PluginAttribute("mdcExcludes") final String excludes,
175                                                   @PluginAttribute("mdcIncludes") final String includes,
176                                                   @PluginAttribute("mdcRequired") final String required,
177                                                   @PluginAttribute("mdcPrefix") final String mdcPrefix,
178                                                   @PluginAttribute("eventPrefix") final String eventPrefix,
179                                                   @PluginAttribute("compress") final String compressBody,
180                                                   @PluginAttribute("batchSize") final String batchSize,
181                                                   @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
182                                                   @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
183                                                   @PluginElement("Layout") Layout<? extends Serializable> layout,
184                                                   @PluginElement("Filter") final Filter filter) {
185    
186            final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
187                (agents == null || agents.length == 0) && properties != null && properties.length > 0;
188            final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
189            final boolean compress = Booleans.parseBoolean(compressBody, true);
190            ManagerType managerType;
191            if (type != null) {
192                if (embed && embedded != null) {
193                    try {
194                        managerType = ManagerType.getType(type);
195                        LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
196                    } catch (final Exception ex) {
197                        LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
198                            " is invalid.");
199                        managerType = ManagerType.EMBEDDED;
200                    }
201                } else {
202                    try {
203                        managerType = ManagerType.getType(type);
204                    } catch (final Exception ex) {
205                        LOGGER.warn("Type " + type + " is invalid.");
206                        managerType = ManagerType.EMBEDDED;
207                    }
208                }
209            }  else if (embed) {
210               managerType = ManagerType.EMBEDDED;
211            }  else {
212               managerType = ManagerType.AVRO;
213            }
214    
215            final int batchCount = Integers.parseInt(batchSize, 1);
216            final int connectTimeout = Integers.parseInt(connectionTimeout, 0);
217            final int reqTimeout = Integers.parseInt(requestTimeout, 0);
218            final int retries = Integers.parseInt(agentRetries, 0);
219            final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
220            final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY);
221    
222            if (layout == null) {
223                final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
224                layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
225                        mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
226                        null);
227            }
228    
229            if (name == null) {
230                LOGGER.error("No name provided for Appender");
231                return null;
232            }
233    
234            AbstractFlumeManager manager;
235    
236            switch (managerType) {
237                case EMBEDDED:
238                    manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
239                    break;
240                case AVRO:
241                    if (agents == null || agents.length == 0) {
242                        LOGGER.debug("No agents provided, using defaults");
243                        agents = new Agent[] {Agent.createAgent(null, null)};
244                    }
245                    manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
246                    break;
247                case PERSISTENT:
248                    if (agents == null || agents.length == 0) {
249                        LOGGER.debug("No agents provided, using defaults");
250                        agents = new Agent[] {Agent.createAgent(null, null)};
251                    }
252                    manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
253                        connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir);
254                    break;
255                default:
256                    LOGGER.debug("No manager type specified. Defaulting to AVRO");
257                    if (agents == null || agents.length == 0) {
258                        LOGGER.debug("No agents provided, using defaults");
259                        agents = new Agent[] {Agent.createAgent(null, null)};
260                    }
261                    manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
262            }
263    
264            if (manager == null) {
265                return null;
266            }
267    
268            return new FlumeAppender(name, filter, layout,  ignoreExceptions, includes,
269                excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
270        }
271    }