001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.logging.log4j.core.Filter;
020    import org.apache.logging.log4j.core.Layout;
021    import org.apache.logging.log4j.core.LogEvent;
022    import org.apache.logging.log4j.core.appender.AbstractAppender;
023    import org.apache.logging.log4j.core.config.Property;
024    import org.apache.logging.log4j.core.config.plugins.Plugin;
025    import org.apache.logging.log4j.core.config.plugins.PluginAttr;
026    import org.apache.logging.log4j.core.config.plugins.PluginElement;
027    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
028    import org.apache.logging.log4j.core.layout.RFC5424Layout;
029    
030    /**
031     * An Appender that uses the Avro protocol to route events to Flume.
032     */
033    @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
034    public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
035    
036        private AbstractFlumeManager manager;
037    
038        private final String mdcIncludes;
039        private final String mdcExcludes;
040        private final String mdcRequired;
041    
042        private final String eventPrefix;
043    
044        private final String mdcPrefix;
045    
046        private final boolean compressBody;
047    
048        private final int reconnectDelay;
049    
050        private final int retries;
051    
052        private final FlumeEventFactory factory;
053    
054        private FlumeAppender(String name, Filter filter, Layout layout, boolean handleException,
055                              String includes, String excludes, String required, String mdcPrefix,
056                              String eventPrefix, boolean compress, int delay, int retries,
057                              FlumeEventFactory factory, AbstractFlumeManager manager) {
058            super(name, filter, layout, handleException);
059            this.manager = manager;
060            this.mdcIncludes = includes;
061            this.mdcExcludes = excludes;
062            this.mdcRequired = required;
063            this.eventPrefix = eventPrefix;
064            this.mdcPrefix = mdcPrefix;
065            this.compressBody = compress;
066            this.reconnectDelay = delay;
067            this.retries = retries;
068            this.factory = factory == null ? this : factory;
069        }
070    
071        /**
072         * Publish the event.
073         * @param event The LogEvent.
074         */
075        public void append(LogEvent event) {
076    
077            FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
078                eventPrefix, compressBody);
079            flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
080            manager.send(flumeEvent, reconnectDelay, retries);
081        }
082    
083        @Override
084        public void stop() {
085            super.stop();
086            manager.release();
087        }
088    
089        /**
090         * Create a Flume event.
091         * @param event The Log4j LogEvent.
092         * @param includes comma separated list of mdc elements to include.
093         * @param excludes comma separated list of mdc elements to exclude.
094         * @param required comma separated list of mdc elements that must be present with a value.
095         * @param mdcPrefix The prefix to add to MDC key names.
096         * @param eventPrefix The prefix to add to event fields.
097         * @param compress If true the body will be compressed.
098         * @return A Flume Event.
099         */
100        public FlumeEvent createEvent(LogEvent event, String includes, String excludes, String required,
101                          String mdcPrefix, String eventPrefix, boolean compress) {
102            return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
103                eventPrefix, compressBody);
104        }
105    
106        /**
107         * Create a Flume Avro Appender.
108         * @param agents An array of Agents.
109         * @param delay The amount of time in milliseconds to wait between retries.
110         * @param agentRetries The number of times to retry an agent before failing to the next agent.
111         * @param name The name of the Appender.
112         * @param suppress If true exceptions will be handled in the appender.
113         * @param excludes A comma separated list of MDC elements to exclude.
114         * @param includes A comma separated list of MDC elements to include.
115         * @param required A comma separated list of MDC elements that are required.
116         * @param mdcPrefix The prefix to add to MDC key names.
117         * @param eventPrefix The prefix to add to event key names.
118         * @param compressBody If true the event body will be compressed.
119         * @param batchSize Number of events to include in a batch. Defaults to 1.
120         * @param factory The factory to use to create Flume events.
121         * @param layout The layout to format the event.
122         * @param filter A Filter to filter events.
123         * @return A Flume Avro Appender.
124         */
125        @PluginFactory
126        public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
127                                                       @PluginElement("properties") Property[] properties,
128                                                       @PluginAttr("embedded") String embedded,
129                                                       @PluginAttr("dataDir") String dataDir,
130                                                       @PluginAttr("reconnectionDelay") String delay,
131                                                       @PluginAttr("agentRetries") String agentRetries,
132                                                       @PluginAttr("name") String name,
133                                                       @PluginAttr("suppressExceptions") String suppress,
134                                                       @PluginAttr("mdcExcludes") String excludes,
135                                                       @PluginAttr("mdcIncludes") String includes,
136                                                       @PluginAttr("mdcRequired") String required,
137                                                       @PluginAttr("mdcPrefix") String mdcPrefix,
138                                                       @PluginAttr("eventPrefix") String eventPrefix,
139                                                       @PluginAttr("compress") String compressBody,
140                                                       @PluginAttr("batchSize") String batchSize,
141                                                       @PluginElement("flumeEventFactory") FlumeEventFactory factory,
142                                                       @PluginElement("layout") Layout layout,
143                                                       @PluginElement("filters") Filter filter) {
144    
145            boolean embed = embedded != null ? Boolean.valueOf(embedded) :
146                (agents == null || agents.length == 0) && properties != null && properties.length > 0;
147            boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
148            boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
149    
150            int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
151            int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
152            int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
153    
154            if (layout == null) {
155                layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes,
156                    includes, required, null, null);
157            }
158    
159            if (name == null) {
160                LOGGER.error("No name provided for Appender");
161                return null;
162            }
163    
164            AbstractFlumeManager manager;
165    
166            if (embed) {
167                manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
168            } else {
169                if (agents == null || agents.length == 0) {
170                    LOGGER.debug("No agents provided, using defaults");
171                    agents = new Agent[] {Agent.createAgent(null, null)};
172                }
173                manager = FlumeAvroManager.getManager(name, agents, batchCount);
174            }
175    
176            if (manager == null) {
177                return null;
178            }
179    
180            return new FlumeAppender(name, filter, layout,  handleExceptions, includes,
181                excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
182        }
183    }