View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.logging.log4j.core.Filter;
20  import org.apache.logging.log4j.core.Layout;
21  import org.apache.logging.log4j.core.LogEvent;
22  import org.apache.logging.log4j.core.appender.AbstractAppender;
23  import org.apache.logging.log4j.core.config.Property;
24  import org.apache.logging.log4j.core.config.plugins.Plugin;
25  import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26  import org.apache.logging.log4j.core.config.plugins.PluginElement;
27  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28  import org.apache.logging.log4j.core.layout.RFC5424Layout;
29  
30  import java.io.Serializable;
31  import java.util.Locale;
32  
33  /**
34   * An Appender that uses the Avro protocol to route events to Flume.
35   */
36  @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
37  public final class FlumeAppender<T extends Serializable> extends AbstractAppender<T> implements FlumeEventFactory {
38  
39      private final AbstractFlumeManager manager;
40  
41      private final String mdcIncludes;
42      private final String mdcExcludes;
43      private final String mdcRequired;
44  
45      private final String eventPrefix;
46  
47      private final String mdcPrefix;
48  
49      private final boolean compressBody;
50  
51      private final FlumeEventFactory factory;
52  
53      private enum ManagerType {
54          AVRO, EMBEDDED, PERSISTENT;
55  
56          public static ManagerType getType(String type) {
57              return valueOf(type.toUpperCase(Locale.US));
58          }
59      }
60  
61      private FlumeAppender(final String name, final Filter filter, final Layout<T> layout, final boolean handleException,
62                            final String includes, final String excludes, final String required, final String mdcPrefix,
63                            final String eventPrefix, final boolean compress,
64                            final FlumeEventFactory factory, final AbstractFlumeManager manager) {
65          super(name, filter, layout, handleException);
66          this.manager = manager;
67          this.mdcIncludes = includes;
68          this.mdcExcludes = excludes;
69          this.mdcRequired = required;
70          this.eventPrefix = eventPrefix;
71          this.mdcPrefix = mdcPrefix;
72          this.compressBody = compress;
73          this.factory = factory == null ? this : factory;
74      }
75  
76      /**
77       * Publish the event.
78       * @param event The LogEvent.
79       */
80      @Override
81      public void append(final LogEvent event) {
82  
83          final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
84              eventPrefix, compressBody);
85          flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
86          manager.send(flumeEvent);
87      }
88  
89      @Override
90      public void stop() {
91          super.stop();
92          manager.release();
93      }
94  
95      /**
96       * Create a Flume event.
97       * @param event The Log4j LogEvent.
98       * @param includes comma separated list of mdc elements to include.
99       * @param excludes comma separated list of mdc elements to exclude.
100      * @param required comma separated list of mdc elements that must be present with a value.
101      * @param mdcPrefix The prefix to add to MDC key names.
102      * @param eventPrefix The prefix to add to event fields.
103      * @param compress If true the body will be compressed.
104      * @return A Flume Event.
105      */
106     @Override
107     public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
108                                   final String required, final String mdcPrefix, final String eventPrefix,
109                                   final boolean compress) {
110         return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
111             eventPrefix, compressBody);
112     }
113 
114     /**
115      * Create a Flume Avro Appender.
116      * @param agents An array of Agents.
117      * @param properties Properties to pass to the embedded agent.
118      * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
119      * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
120      * @param type Avro (default), Embedded, or Persistent.
121      * @param dataDir The directory where the Flume FileChannel should write its data.
122      * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
123      *                          1000.
124      * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
125      * @param agentRetries The number of times to retry an agent before failing to the next agent.
126      * @param maxDelay The maximum number of seconds to wait for a complete batch.
127      * @param name The name of the Appender.
128      * @param suppress If true exceptions will be handled in the appender.
129      * @param excludes A comma separated list of MDC elements to exclude.
130      * @param includes A comma separated list of MDC elements to include.
131      * @param required A comma separated list of MDC elements that are required.
132      * @param mdcPrefix The prefix to add to MDC key names.
133      * @param eventPrefix The prefix to add to event key names.
134      * @param compressBody If true the event body will be compressed.
135      * @param batchSize Number of events to include in a batch. Defaults to 1.
136      * @param factory The factory to use to create Flume events.
137      * @param layout The layout to format the event.
138      * @param filter A Filter to filter events.
139      * @return A Flume Avro Appender.
140      */
141     @PluginFactory
142     public static <S extends Serializable> FlumeAppender<S> createAppender(@PluginElement("agents") Agent[] agents,
143                                                    @PluginElement("properties") final Property[] properties,
144                                                    @PluginAttr("embedded") final String embedded,
145                                                    @PluginAttr("type") final String type,
146                                                    @PluginAttr("dataDir") final String dataDir,
147                                                    @PluginAttr("connectTimeout") final String connectionTimeout,
148                                                    @PluginAttr("requestTimeout") final String requestTimeout,
149                                                    @PluginAttr("agentRetries") final String agentRetries,
150                                                    @PluginAttr("maxDelay") final String maxDelay,
151                                                    @PluginAttr("name") final String name,
152                                                    @PluginAttr("suppressExceptions") final String suppress,
153                                                    @PluginAttr("mdcExcludes") final String excludes,
154                                                    @PluginAttr("mdcIncludes") final String includes,
155                                                    @PluginAttr("mdcRequired") final String required,
156                                                    @PluginAttr("mdcPrefix") final String mdcPrefix,
157                                                    @PluginAttr("eventPrefix") final String eventPrefix,
158                                                    @PluginAttr("compress") final String compressBody,
159                                                    @PluginAttr("batchSize") final String batchSize,
160                                                    @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
161                                                    @PluginElement("layout") Layout<S> layout,
162                                                    @PluginElement("filters") final Filter filter) {
163 
164         final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
165             (agents == null || agents.length == 0) && properties != null && properties.length > 0;
166         final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
167         final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
168         ManagerType managerType;
169         if (type != null) {
170             if (embed && embedded != null) {
171                 try {
172                     managerType = ManagerType.getType(type);
173                     LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
174                 } catch (Exception ex) {
175                     LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
176                     managerType = ManagerType.EMBEDDED;
177                 }
178             } else {
179                 try {
180                     managerType = ManagerType.getType(type);
181                 } catch (Exception ex) {
182                     LOGGER.warn("Type " + type + " is invalid.");
183                     managerType = ManagerType.EMBEDDED;
184                 }
185             }
186         }  else if (embed) {
187            managerType = ManagerType.EMBEDDED;
188         }  else {
189            managerType = ManagerType.AVRO;
190         }
191 
192         final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
193         final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
194         final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
195         final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
196         final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
197 
198         if (layout == null) {
199             @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
200             Layout<S> l = (Layout<S>)RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix,
201                     null, null, null, excludes, includes, required, null, null, null);
202             layout = l;
203         }
204 
205         if (name == null) {
206             LOGGER.error("No name provided for Appender");
207             return null;
208         }
209 
210         AbstractFlumeManager manager;
211 
212         switch (managerType) {
213             case EMBEDDED:
214                 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
215                 break;
216             case AVRO:
217                 if (agents == null || agents.length == 0) {
218                     LOGGER.debug("No agents provided, using defaults");
219                     agents = new Agent[] {Agent.createAgent(null, null)};
220                 }
221                 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
222                 break;
223             case PERSISTENT:
224                 if (agents == null || agents.length == 0) {
225                     LOGGER.debug("No agents provided, using defaults");
226                     agents = new Agent[] {Agent.createAgent(null, null)};
227                 }
228                 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
229                     connectTimeout, reqTimeout, delay, dataDir);
230                 break;
231             default:
232                 LOGGER.debug("No manager type specified. Defaulting to AVRO");
233                 if (agents == null || agents.length == 0) {
234                     LOGGER.debug("No agents provided, using defaults");
235                     agents = new Agent[] {Agent.createAgent(null, null)};
236                 }
237                 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
238         }
239 
240         if (manager == null) {
241             return null;
242         }
243 
244         return new FlumeAppender<S>(name, filter, layout,  handleExceptions, includes,
245             excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
246     }
247 }