001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.logging.log4j.core.Filter;
020    import org.apache.logging.log4j.core.Layout;
021    import org.apache.logging.log4j.core.LogEvent;
022    import org.apache.logging.log4j.core.appender.AbstractAppender;
023    import org.apache.logging.log4j.core.config.Property;
024    import org.apache.logging.log4j.core.config.plugins.Plugin;
025    import org.apache.logging.log4j.core.config.plugins.PluginAttr;
026    import org.apache.logging.log4j.core.config.plugins.PluginElement;
027    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
028    import org.apache.logging.log4j.core.layout.RFC5424Layout;
029    
030    import java.util.Locale;
031    
032    /**
033     * An Appender that uses the Avro protocol to route events to Flume.
034     */
035    @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
036    public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
037    
038        private final AbstractFlumeManager manager;
039    
040        private final String mdcIncludes;
041        private final String mdcExcludes;
042        private final String mdcRequired;
043    
044        private final String eventPrefix;
045    
046        private final String mdcPrefix;
047    
048        private final boolean compressBody;
049    
050        private final FlumeEventFactory factory;
051    
052        private enum ManagerType {
053            AVRO, EMBEDDED, PERSISTENT;
054    
055            public static ManagerType getType(String type) {
056                return valueOf(type.toUpperCase(Locale.US));
057            }
058        }
059    
060        private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException,
061                              final String includes, final String excludes, final String required, final String mdcPrefix,
062                              final String eventPrefix, final boolean compress,
063                              final FlumeEventFactory factory, final AbstractFlumeManager manager) {
064            super(name, filter, layout, handleException);
065            this.manager = manager;
066            this.mdcIncludes = includes;
067            this.mdcExcludes = excludes;
068            this.mdcRequired = required;
069            this.eventPrefix = eventPrefix;
070            this.mdcPrefix = mdcPrefix;
071            this.compressBody = compress;
072            this.factory = factory == null ? this : factory;
073        }
074    
075        /**
076         * Publish the event.
077         * @param event The LogEvent.
078         */
079        public void append(final LogEvent event) {
080    
081            final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
082                eventPrefix, compressBody);
083            flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
084            manager.send(flumeEvent);
085        }
086    
087        @Override
088        public void stop() {
089            super.stop();
090            manager.release();
091        }
092    
093        /**
094         * Create a Flume event.
095         * @param event The Log4j LogEvent.
096         * @param includes comma separated list of mdc elements to include.
097         * @param excludes comma separated list of mdc elements to exclude.
098         * @param required comma separated list of mdc elements that must be present with a value.
099         * @param mdcPrefix The prefix to add to MDC key names.
100         * @param eventPrefix The prefix to add to event fields.
101         * @param compress If true the body will be compressed.
102         * @return A Flume Event.
103         */
104        public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
105                                      final String required, final String mdcPrefix, final String eventPrefix,
106                                      final boolean compress) {
107            return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
108                eventPrefix, compressBody);
109        }
110    
111        /**
112         * Create a Flume Avro Appender.
113         * @param agents An array of Agents.
114         * @param properties Properties to pass to the embedded agent.
115         * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
116         * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
117         * @param type Avro (default), Embedded, or Persistent.
118         * @param dataDir The directory where the Flume FileChannel should write its data.
119         * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
120         *                          1000.
121         * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
122         * @param agentRetries The number of times to retry an agent before failing to the next agent.
123         * @param maxDelay The maximum number of seconds to wait for a complete batch.
124         * @param name The name of the Appender.
125         * @param suppress If true exceptions will be handled in the appender.
126         * @param excludes A comma separated list of MDC elements to exclude.
127         * @param includes A comma separated list of MDC elements to include.
128         * @param required A comma separated list of MDC elements that are required.
129         * @param mdcPrefix The prefix to add to MDC key names.
130         * @param eventPrefix The prefix to add to event key names.
131         * @param compressBody If true the event body will be compressed.
132         * @param batchSize Number of events to include in a batch. Defaults to 1.
133         * @param factory The factory to use to create Flume events.
134         * @param layout The layout to format the event.
135         * @param filter A Filter to filter events.
136         * @return A Flume Avro Appender.
137         */
138        @PluginFactory
139        public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
140                                                       @PluginElement("properties") final Property[] properties,
141                                                       @PluginAttr("embedded") final String embedded,
142                                                       @PluginAttr("type") final String type,
143                                                       @PluginAttr("dataDir") final String dataDir,
144                                                       @PluginAttr("connectTimeout") final String connectionTimeout,
145                                                       @PluginAttr("requestTimeout") final String requestTimeout,
146                                                       @PluginAttr("agentRetries") final String agentRetries,
147                                                       @PluginAttr("maxDelay") final String maxDelay,
148                                                       @PluginAttr("name") final String name,
149                                                       @PluginAttr("suppressExceptions") final String suppress,
150                                                       @PluginAttr("mdcExcludes") final String excludes,
151                                                       @PluginAttr("mdcIncludes") final String includes,
152                                                       @PluginAttr("mdcRequired") final String required,
153                                                       @PluginAttr("mdcPrefix") final String mdcPrefix,
154                                                       @PluginAttr("eventPrefix") final String eventPrefix,
155                                                       @PluginAttr("compress") final String compressBody,
156                                                       @PluginAttr("batchSize") final String batchSize,
157                                                       @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
158                                                       @PluginElement("layout") Layout layout,
159                                                       @PluginElement("filters") final Filter filter) {
160    
161            final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
162                (agents == null || agents.length == 0) && properties != null && properties.length > 0;
163            final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
164            final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
165            ManagerType managerType;
166            if (type != null) {
167                if (embed && embedded != null) {
168                    try {
169                        managerType = ManagerType.getType(type);
170                        LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
171                    } catch (Exception ex) {
172                        LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
173                        managerType = ManagerType.EMBEDDED;
174                    }
175                } else {
176                    try {
177                        managerType = ManagerType.getType(type);
178                    } catch (Exception ex) {
179                        LOGGER.warn("Type " + type + " is invalid.");
180                        managerType = ManagerType.EMBEDDED;
181                    }
182                }
183            }  else if (embed) {
184               managerType = ManagerType.EMBEDDED;
185            }  else {
186               managerType = ManagerType.AVRO;
187            }
188    
189            final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
190            final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
191            final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
192            final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
193            final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
194    
195    
196            if (layout == null) {
197                layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix, null, null,
198                    null, excludes, includes, required, null, null, null, null);
199            }
200    
201            if (name == null) {
202                LOGGER.error("No name provided for Appender");
203                return null;
204            }
205    
206            AbstractFlumeManager manager;
207    
208            switch (managerType) {
209                case EMBEDDED:
210                    manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
211                    break;
212                case AVRO:
213                    if (agents == null || agents.length == 0) {
214                        LOGGER.debug("No agents provided, using defaults");
215                        agents = new Agent[] {Agent.createAgent(null, null)};
216                    }
217                    manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
218                    break;
219                case PERSISTENT:
220                    if (agents == null || agents.length == 0) {
221                        LOGGER.debug("No agents provided, using defaults");
222                        agents = new Agent[] {Agent.createAgent(null, null)};
223                    }
224                    manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
225                        connectTimeout, reqTimeout, delay, dataDir);
226                    break;
227                default:
228                    LOGGER.debug("No manager type specified. Defaulting to AVRO");
229                    if (agents == null || agents.length == 0) {
230                        LOGGER.debug("No agents provided, using defaults");
231                        agents = new Agent[] {Agent.createAgent(null, null)};
232                    }
233                    manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
234            }
235    
236            if (manager == null) {
237                return null;
238            }
239    
240            return new FlumeAppender(name, filter, layout,  handleExceptions, includes,
241                excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
242        }
243    }