001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.logging.log4j.core.Filter;
020    import org.apache.logging.log4j.core.Layout;
021    import org.apache.logging.log4j.core.LogEvent;
022    import org.apache.logging.log4j.core.appender.AbstractAppender;
023    import org.apache.logging.log4j.core.config.Property;
024    import org.apache.logging.log4j.core.config.plugins.Plugin;
025    import org.apache.logging.log4j.core.config.plugins.PluginAttr;
026    import org.apache.logging.log4j.core.config.plugins.PluginElement;
027    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
028    import org.apache.logging.log4j.core.layout.RFC5424Layout;
029    
030    import java.io.Serializable;
031    import java.util.Locale;
032    
033    /**
034     * An Appender that uses the Avro protocol to route events to Flume.
035     */
036    @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
037    public final class FlumeAppender<T extends Serializable> extends AbstractAppender<T> implements FlumeEventFactory {
038    
039        private final AbstractFlumeManager manager;
040    
041        private final String mdcIncludes;
042        private final String mdcExcludes;
043        private final String mdcRequired;
044    
045        private final String eventPrefix;
046    
047        private final String mdcPrefix;
048    
049        private final boolean compressBody;
050    
051        private final FlumeEventFactory factory;
052    
053        private enum ManagerType {
054            AVRO, EMBEDDED, PERSISTENT;
055    
056            public static ManagerType getType(String type) {
057                return valueOf(type.toUpperCase(Locale.US));
058            }
059        }
060    
061        private FlumeAppender(final String name, final Filter filter, final Layout<T> layout, final boolean handleException,
062                              final String includes, final String excludes, final String required, final String mdcPrefix,
063                              final String eventPrefix, final boolean compress,
064                              final FlumeEventFactory factory, final AbstractFlumeManager manager) {
065            super(name, filter, layout, handleException);
066            this.manager = manager;
067            this.mdcIncludes = includes;
068            this.mdcExcludes = excludes;
069            this.mdcRequired = required;
070            this.eventPrefix = eventPrefix;
071            this.mdcPrefix = mdcPrefix;
072            this.compressBody = compress;
073            this.factory = factory == null ? this : factory;
074        }
075    
076        /**
077         * Publish the event.
078         * @param event The LogEvent.
079         */
080        @Override
081        public void append(final LogEvent event) {
082    
083            final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
084                eventPrefix, compressBody);
085            flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
086            manager.send(flumeEvent);
087        }
088    
089        @Override
090        public void stop() {
091            super.stop();
092            manager.release();
093        }
094    
095        /**
096         * Create a Flume event.
097         * @param event The Log4j LogEvent.
098         * @param includes comma separated list of mdc elements to include.
099         * @param excludes comma separated list of mdc elements to exclude.
100         * @param required comma separated list of mdc elements that must be present with a value.
101         * @param mdcPrefix The prefix to add to MDC key names.
102         * @param eventPrefix The prefix to add to event fields.
103         * @param compress If true the body will be compressed.
104         * @return A Flume Event.
105         */
106        @Override
107        public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
108                                      final String required, final String mdcPrefix, final String eventPrefix,
109                                      final boolean compress) {
110            return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
111                eventPrefix, compressBody);
112        }
113    
114        /**
115         * Create a Flume Avro Appender.
116         * @param agents An array of Agents.
117         * @param properties Properties to pass to the embedded agent.
118         * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
119         * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
120         * @param type Avro (default), Embedded, or Persistent.
121         * @param dataDir The directory where the Flume FileChannel should write its data.
122         * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
123         *                          1000.
124         * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
125         * @param agentRetries The number of times to retry an agent before failing to the next agent.
126         * @param maxDelay The maximum number of seconds to wait for a complete batch.
127         * @param name The name of the Appender.
128         * @param suppress If true exceptions will be handled in the appender.
129         * @param excludes A comma separated list of MDC elements to exclude.
130         * @param includes A comma separated list of MDC elements to include.
131         * @param required A comma separated list of MDC elements that are required.
132         * @param mdcPrefix The prefix to add to MDC key names.
133         * @param eventPrefix The prefix to add to event key names.
134         * @param compressBody If true the event body will be compressed.
135         * @param batchSize Number of events to include in a batch. Defaults to 1.
136         * @param factory The factory to use to create Flume events.
137         * @param layout The layout to format the event.
138         * @param filter A Filter to filter events.
139         * @return A Flume Avro Appender.
140         */
141        @PluginFactory
142        public static <S extends Serializable> FlumeAppender<S> createAppender(@PluginElement("agents") Agent[] agents,
143                                                       @PluginElement("properties") final Property[] properties,
144                                                       @PluginAttr("embedded") final String embedded,
145                                                       @PluginAttr("type") final String type,
146                                                       @PluginAttr("dataDir") final String dataDir,
147                                                       @PluginAttr("connectTimeout") final String connectionTimeout,
148                                                       @PluginAttr("requestTimeout") final String requestTimeout,
149                                                       @PluginAttr("agentRetries") final String agentRetries,
150                                                       @PluginAttr("maxDelay") final String maxDelay,
151                                                       @PluginAttr("name") final String name,
152                                                       @PluginAttr("suppressExceptions") final String suppress,
153                                                       @PluginAttr("mdcExcludes") final String excludes,
154                                                       @PluginAttr("mdcIncludes") final String includes,
155                                                       @PluginAttr("mdcRequired") final String required,
156                                                       @PluginAttr("mdcPrefix") final String mdcPrefix,
157                                                       @PluginAttr("eventPrefix") final String eventPrefix,
158                                                       @PluginAttr("compress") final String compressBody,
159                                                       @PluginAttr("batchSize") final String batchSize,
160                                                       @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
161                                                       @PluginElement("layout") Layout<S> layout,
162                                                       @PluginElement("filters") final Filter filter) {
163    
164            final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
165                (agents == null || agents.length == 0) && properties != null && properties.length > 0;
166            final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
167            final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
168            ManagerType managerType;
169            if (type != null) {
170                if (embed && embedded != null) {
171                    try {
172                        managerType = ManagerType.getType(type);
173                        LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
174                    } catch (Exception ex) {
175                        LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
176                        managerType = ManagerType.EMBEDDED;
177                    }
178                } else {
179                    try {
180                        managerType = ManagerType.getType(type);
181                    } catch (Exception ex) {
182                        LOGGER.warn("Type " + type + " is invalid.");
183                        managerType = ManagerType.EMBEDDED;
184                    }
185                }
186            }  else if (embed) {
187               managerType = ManagerType.EMBEDDED;
188            }  else {
189               managerType = ManagerType.AVRO;
190            }
191    
192            final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
193            final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
194            final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
195            final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
196            final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
197    
198            if (layout == null) {
199                @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
200                Layout<S> l = (Layout<S>)RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix,
201                        null, null, null, excludes, includes, required, null, null, null);
202                layout = l;
203            }
204    
205            if (name == null) {
206                LOGGER.error("No name provided for Appender");
207                return null;
208            }
209    
210            AbstractFlumeManager manager;
211    
212            switch (managerType) {
213                case EMBEDDED:
214                    manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
215                    break;
216                case AVRO:
217                    if (agents == null || agents.length == 0) {
218                        LOGGER.debug("No agents provided, using defaults");
219                        agents = new Agent[] {Agent.createAgent(null, null)};
220                    }
221                    manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
222                    break;
223                case PERSISTENT:
224                    if (agents == null || agents.length == 0) {
225                        LOGGER.debug("No agents provided, using defaults");
226                        agents = new Agent[] {Agent.createAgent(null, null)};
227                    }
228                    manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
229                        connectTimeout, reqTimeout, delay, dataDir);
230                    break;
231                default:
232                    LOGGER.debug("No manager type specified. Defaulting to AVRO");
233                    if (agents == null || agents.length == 0) {
234                        LOGGER.debug("No agents provided, using defaults");
235                        agents = new Agent[] {Agent.createAgent(null, null)};
236                    }
237                    manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
238            }
239    
240            if (manager == null) {
241                return null;
242            }
243    
244            return new FlumeAppender<S>(name, filter, layout,  handleExceptions, includes,
245                excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
246        }
247    }