/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.Serializable;
020import java.util.Locale;
021
022import org.apache.logging.log4j.core.Filter;
023import org.apache.logging.log4j.core.Layout;
024import org.apache.logging.log4j.core.LogEvent;
025import org.apache.logging.log4j.core.appender.AbstractAppender;
026import org.apache.logging.log4j.core.config.Property;
027import org.apache.logging.log4j.core.config.plugins.Plugin;
028import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
029import org.apache.logging.log4j.core.config.plugins.PluginElement;
030import org.apache.logging.log4j.core.config.plugins.PluginFactory;
031import org.apache.logging.log4j.core.helpers.Booleans;
032import org.apache.logging.log4j.core.helpers.Integers;
033import org.apache.logging.log4j.core.layout.RFC5424Layout;
034
035/**
036 * An Appender that uses the Avro protocol to route events to Flume.
037 */
038@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
039public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
040
041    private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
042    private static final int DEFAULT_MAX_DELAY = 60000;
043
044    private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
045
046    private final AbstractFlumeManager manager;
047
048    private final String mdcIncludes;
049    private final String mdcExcludes;
050    private final String mdcRequired;
051
052    private final String eventPrefix;
053
054    private final String mdcPrefix;
055
056    private final boolean compressBody;
057
058    private final FlumeEventFactory factory;
059
060    /**
061     * Which Manager will be used by the appender instance.
062     */
063    private enum ManagerType {
064        AVRO, EMBEDDED, PERSISTENT;
065
066        public static ManagerType getType(final String type) {
067            return valueOf(type.toUpperCase(Locale.US));
068        }
069    }
070
071    private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
072                          final boolean ignoreExceptions, final String includes, final String excludes,
073                          final String required, final String mdcPrefix, final String eventPrefix,
074                          final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
075        super(name, filter, layout, ignoreExceptions);
076        this.manager = manager;
077        this.mdcIncludes = includes;
078        this.mdcExcludes = excludes;
079        this.mdcRequired = required;
080        this.eventPrefix = eventPrefix;
081        this.mdcPrefix = mdcPrefix;
082        this.compressBody = compress;
083        this.factory = factory == null ? this : factory;
084    }
085
086    /**
087     * Publish the event.
088     * @param event The LogEvent.
089     */
090    @Override
091    public void append(final LogEvent event) {
092        final String name = event.getLoggerName();
093        if (name != null) {
094            for (final String pkg : EXCLUDED_PACKAGES) {
095                if (name.startsWith(pkg)) {
096                    return;
097                }
098            }
099        }
100        final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
101            eventPrefix, compressBody);
102        flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
103        manager.send(flumeEvent);
104    }
105
106    @Override
107    public void stop() {
108        super.stop();
109        manager.release();
110    }
111
112    /**
113     * Create a Flume event.
114     * @param event The Log4j LogEvent.
115     * @param includes comma separated list of mdc elements to include.
116     * @param excludes comma separated list of mdc elements to exclude.
117     * @param required comma separated list of mdc elements that must be present with a value.
118     * @param mdcPrefix The prefix to add to MDC key names.
119     * @param eventPrefix The prefix to add to event fields.
120     * @param compress If true the body will be compressed.
121     * @return A Flume Event.
122     */
123    @Override
124    public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
125                                  final String required, final String mdcPrefix, final String eventPrefix,
126                                  final boolean compress) {
127        return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
128            eventPrefix, compressBody);
129    }
130
131    /**
132     * Create a Flume Avro Appender.
133     * @param agents An array of Agents.
134     * @param properties Properties to pass to the embedded agent.
135     * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
136     * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
137     * @param type Avro (default), Embedded, or Persistent.
138     * @param dataDir The directory where the Flume FileChannel should write its data.
139     * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
140     *                          1000.
141     * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
142     * @param agentRetries The number of times to retry an agent before failing to the next agent.
143     * @param maxDelay The maximum number of seconds to wait for a complete batch.
144     * @param name The name of the Appender.
145     * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
146     *               they are propagated to the caller.
147     * @param excludes A comma separated list of MDC elements to exclude.
148     * @param includes A comma separated list of MDC elements to include.
149     * @param required A comma separated list of MDC elements that are required.
150     * @param mdcPrefix The prefix to add to MDC key names.
151     * @param eventPrefix The prefix to add to event key names.
152     * @param compressBody If true the event body will be compressed.
153     * @param batchSize Number of events to include in a batch. Defaults to 1.
154     * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
155     * @param factory The factory to use to create Flume events.
156     * @param layout The layout to format the event.
157     * @param filter A Filter to filter events.
158     *
159     * @return A Flume Avro Appender.
160     */
161    @PluginFactory
162    public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
163                                               @PluginElement("Properties") final Property[] properties,
164                                               @PluginAttribute("embedded") final String embedded,
165                                               @PluginAttribute("type") final String type,
166                                               @PluginAttribute("dataDir") final String dataDir,
167                                               @PluginAttribute("connectTimeout") final String connectionTimeout,
168                                               @PluginAttribute("requestTimeout") final String requestTimeout,
169                                               @PluginAttribute("agentRetries") final String agentRetries,
170                                               @PluginAttribute("maxDelay") final String maxDelay,
171                                               @PluginAttribute("name") final String name,
172                                               @PluginAttribute("ignoreExceptions") final String ignore,
173                                               @PluginAttribute("mdcExcludes") final String excludes,
174                                               @PluginAttribute("mdcIncludes") final String includes,
175                                               @PluginAttribute("mdcRequired") final String required,
176                                               @PluginAttribute("mdcPrefix") final String mdcPrefix,
177                                               @PluginAttribute("eventPrefix") final String eventPrefix,
178                                               @PluginAttribute("compress") final String compressBody,
179                                               @PluginAttribute("batchSize") final String batchSize,
180                                               @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
181                                               @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
182                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
183                                               @PluginElement("Filters") final Filter filter) {
184
185        final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
186            (agents == null || agents.length == 0) && properties != null && properties.length > 0;
187        final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
188        final boolean compress = Booleans.parseBoolean(compressBody, true);
189        ManagerType managerType;
190        if (type != null) {
191            if (embed && embedded != null) {
192                try {
193                    managerType = ManagerType.getType(type);
194                    LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
195                } catch (final Exception ex) {
196                    LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
197                        " is invalid.");
198                    managerType = ManagerType.EMBEDDED;
199                }
200            } else {
201                try {
202                    managerType = ManagerType.getType(type);
203                } catch (final Exception ex) {
204                    LOGGER.warn("Type " + type + " is invalid.");
205                    managerType = ManagerType.EMBEDDED;
206                }
207            }
208        }  else if (embed) {
209           managerType = ManagerType.EMBEDDED;
210        }  else {
211           managerType = ManagerType.AVRO;
212        }
213
214        final int batchCount = Integers.parseInt(batchSize, 1);
215        final int connectTimeout = Integers.parseInt(connectionTimeout, 0);
216        final int reqTimeout = Integers.parseInt(requestTimeout, 0);
217        final int retries = Integers.parseInt(agentRetries, 0);
218        final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
219        final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY);
220
221        if (layout == null) {
222            layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix,
223                    null, null, null, null, excludes, includes, required, null, null, null, null);
224        }
225
226        if (name == null) {
227            LOGGER.error("No name provided for Appender");
228            return null;
229        }
230
231        AbstractFlumeManager manager;
232
233        switch (managerType) {
234            case EMBEDDED:
235                manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
236                break;
237            case AVRO:
238                if (agents == null || agents.length == 0) {
239                    LOGGER.debug("No agents provided, using defaults");
240                    agents = new Agent[] {Agent.createAgent(null, null)};
241                }
242                manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
243                break;
244            case PERSISTENT:
245                if (agents == null || agents.length == 0) {
246                    LOGGER.debug("No agents provided, using defaults");
247                    agents = new Agent[] {Agent.createAgent(null, null)};
248                }
249                manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
250                    connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir);
251                break;
252            default:
253                LOGGER.debug("No manager type specified. Defaulting to AVRO");
254                if (agents == null || agents.length == 0) {
255                    LOGGER.debug("No agents provided, using defaults");
256                    agents = new Agent[] {Agent.createAgent(null, null)};
257                }
258                manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
259        }
260
261        if (manager == null) {
262            return null;
263        }
264
265        return new FlumeAppender(name, filter, layout,  ignoreExceptions, includes,
266            excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
267    }
268}