001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.logging.log4j.core.Filter;
020    import org.apache.logging.log4j.core.Layout;
021    import org.apache.logging.log4j.core.LogEvent;
022    import org.apache.logging.log4j.core.appender.AbstractAppender;
023    import org.apache.logging.log4j.core.config.Property;
024    import org.apache.logging.log4j.core.config.plugins.Plugin;
025    import org.apache.logging.log4j.core.config.plugins.PluginAttr;
026    import org.apache.logging.log4j.core.config.plugins.PluginElement;
027    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
028    import org.apache.logging.log4j.core.layout.RFC5424Layout;
029    
030    /**
031     * An Appender that uses the Avro protocol to route events to Flume.
032     */
033    @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
034    public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
035    
036        private final AbstractFlumeManager manager;
037    
038        private final String mdcIncludes;
039        private final String mdcExcludes;
040        private final String mdcRequired;
041    
042        private final String eventPrefix;
043    
044        private final String mdcPrefix;
045    
046        private final boolean compressBody;
047    
048        private final int reconnectDelay;
049    
050        private final int retries;
051    
052        private final FlumeEventFactory factory;
053    
054        private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException,
055                              final String includes, final String excludes, final String required, final String mdcPrefix,
056                              final String eventPrefix, final boolean compress, final int delay, final int retries,
057                              final FlumeEventFactory factory, final AbstractFlumeManager manager) {
058            super(name, filter, layout, handleException);
059            this.manager = manager;
060            this.mdcIncludes = includes;
061            this.mdcExcludes = excludes;
062            this.mdcRequired = required;
063            this.eventPrefix = eventPrefix;
064            this.mdcPrefix = mdcPrefix;
065            this.compressBody = compress;
066            this.reconnectDelay = delay;
067            this.retries = retries;
068            this.factory = factory == null ? this : factory;
069        }
070    
071        /**
072         * Publish the event.
073         * @param event The LogEvent.
074         */
075        public void append(final LogEvent event) {
076    
077            final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
078                eventPrefix, compressBody);
079            flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
080            manager.send(flumeEvent, reconnectDelay, retries);
081        }
082    
083        @Override
084        public void stop() {
085            super.stop();
086            manager.release();
087        }
088    
089        /**
090         * Create a Flume event.
091         * @param event The Log4j LogEvent.
092         * @param includes comma separated list of mdc elements to include.
093         * @param excludes comma separated list of mdc elements to exclude.
094         * @param required comma separated list of mdc elements that must be present with a value.
095         * @param mdcPrefix The prefix to add to MDC key names.
096         * @param eventPrefix The prefix to add to event fields.
097         * @param compress If true the body will be compressed.
098         * @return A Flume Event.
099         */
100        public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
101                                      final String required, final String mdcPrefix, final String eventPrefix,
102                                      final boolean compress) {
103            return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
104                eventPrefix, compressBody);
105        }
106    
107        /**
108         * Create a Flume Avro Appender.
109         * @param agents An array of Agents.
110         * @param properties Properties to pass to the embedded agent.
111         * @param embedded true if the embedded agent manager should be used. otherwise the Avro mangaer will be used.
112         * @param dataDir The directory where the Flume FileChannel should write its data.
113         * @param delay The amount of time in milliseconds to wait between retries.
114         * @param agentRetries The number of times to retry an agent before failing to the next agent.
115         * @param name The name of the Appender.
116         * @param suppress If true exceptions will be handled in the appender.
117         * @param excludes A comma separated list of MDC elements to exclude.
118         * @param includes A comma separated list of MDC elements to include.
119         * @param required A comma separated list of MDC elements that are required.
120         * @param mdcPrefix The prefix to add to MDC key names.
121         * @param eventPrefix The prefix to add to event key names.
122         * @param compressBody If true the event body will be compressed.
123         * @param batchSize Number of events to include in a batch. Defaults to 1.
124         * @param factory The factory to use to create Flume events.
125         * @param layout The layout to format the event.
126         * @param filter A Filter to filter events.
127         * @return A Flume Avro Appender.
128         */
129        @PluginFactory
130        public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
131                                                       @PluginElement("properties") final Property[] properties,
132                                                       @PluginAttr("embedded") final String embedded,
133                                                       @PluginAttr("dataDir") final String dataDir,
134                                                       @PluginAttr("reconnectionDelay") final String delay,
135                                                       @PluginAttr("agentRetries") final String agentRetries,
136                                                       @PluginAttr("name") final String name,
137                                                       @PluginAttr("suppressExceptions") final String suppress,
138                                                       @PluginAttr("mdcExcludes") final String excludes,
139                                                       @PluginAttr("mdcIncludes") final String includes,
140                                                       @PluginAttr("mdcRequired") final String required,
141                                                       @PluginAttr("mdcPrefix") final String mdcPrefix,
142                                                       @PluginAttr("eventPrefix") final String eventPrefix,
143                                                       @PluginAttr("compress") final String compressBody,
144                                                       @PluginAttr("batchSize") final String batchSize,
145                                                       @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
146                                                       @PluginElement("layout") Layout layout,
147                                                       @PluginElement("filters") final Filter filter) {
148    
149            final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
150                (agents == null || agents.length == 0) && properties != null && properties.length > 0;
151            final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
152            final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
153    
154            final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
155            final int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
156            final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
157    
158            if (layout == null) {
159                layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes,
160                    includes, required, null, null, null, null);
161            }
162    
163            if (name == null) {
164                LOGGER.error("No name provided for Appender");
165                return null;
166            }
167    
168            AbstractFlumeManager manager;
169    
170            if (embed) {
171                manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
172            } else {
173                if (agents == null || agents.length == 0) {
174                    LOGGER.debug("No agents provided, using defaults");
175                    agents = new Agent[] {Agent.createAgent(null, null)};
176                }
177                manager = FlumeAvroManager.getManager(name, agents, batchCount);
178            }
179    
180            if (manager == null) {
181                return null;
182            }
183    
184            return new FlumeAppender(name, filter, layout,  handleExceptions, includes,
185                excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
186        }
187    }