View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.logging.log4j.core.Filter;
20  import org.apache.logging.log4j.core.Layout;
21  import org.apache.logging.log4j.core.LogEvent;
22  import org.apache.logging.log4j.core.appender.AbstractAppender;
23  import org.apache.logging.log4j.core.config.Property;
24  import org.apache.logging.log4j.core.config.plugins.Plugin;
25  import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26  import org.apache.logging.log4j.core.config.plugins.PluginElement;
27  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28  import org.apache.logging.log4j.core.layout.RFC5424Layout;
29  
30  import java.util.Locale;
31  
32  /**
33   * An Appender that uses the Avro protocol to route events to Flume.
34   */
35  @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
36  public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
37  
38      private final AbstractFlumeManager manager;
39  
40      private final String mdcIncludes;
41      private final String mdcExcludes;
42      private final String mdcRequired;
43  
44      private final String eventPrefix;
45  
46      private final String mdcPrefix;
47  
48      private final boolean compressBody;
49  
50      private final FlumeEventFactory factory;
51  
52      private enum ManagerType {
53          AVRO, EMBEDDED, PERSISTENT;
54  
55          public static ManagerType getType(String type) {
56              return valueOf(type.toUpperCase(Locale.US));
57          }
58      }
59  
60      private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException,
61                            final String includes, final String excludes, final String required, final String mdcPrefix,
62                            final String eventPrefix, final boolean compress,
63                            final FlumeEventFactory factory, final AbstractFlumeManager manager) {
64          super(name, filter, layout, handleException);
65          this.manager = manager;
66          this.mdcIncludes = includes;
67          this.mdcExcludes = excludes;
68          this.mdcRequired = required;
69          this.eventPrefix = eventPrefix;
70          this.mdcPrefix = mdcPrefix;
71          this.compressBody = compress;
72          this.factory = factory == null ? this : factory;
73      }
74  
75      /**
76       * Publish the event.
77       * @param event The LogEvent.
78       */
79      public void append(final LogEvent event) {
80  
81          final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
82              eventPrefix, compressBody);
83          flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
84          manager.send(flumeEvent);
85      }
86  
87      @Override
88      public void stop() {
89          super.stop();
90          manager.release();
91      }
92  
93      /**
94       * Create a Flume event.
95       * @param event The Log4j LogEvent.
96       * @param includes comma separated list of mdc elements to include.
97       * @param excludes comma separated list of mdc elements to exclude.
98       * @param required comma separated list of mdc elements that must be present with a value.
99       * @param mdcPrefix The prefix to add to MDC key names.
100      * @param eventPrefix The prefix to add to event fields.
101      * @param compress If true the body will be compressed.
102      * @return A Flume Event.
103      */
104     public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
105                                   final String required, final String mdcPrefix, final String eventPrefix,
106                                   final boolean compress) {
107         return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
108             eventPrefix, compressBody);
109     }
110 
111     /**
112      * Create a Flume Avro Appender.
113      * @param agents An array of Agents.
114      * @param properties Properties to pass to the embedded agent.
115      * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
116      * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
117      * @param type Avro (default), Embedded, or Persistent.
118      * @param dataDir The directory where the Flume FileChannel should write its data.
119      * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
120      *                          1000.
121      * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
122      * @param agentRetries The number of times to retry an agent before failing to the next agent.
123      * @param maxDelay The maximum number of seconds to wait for a complete batch.
124      * @param name The name of the Appender.
125      * @param suppress If true exceptions will be handled in the appender.
126      * @param excludes A comma separated list of MDC elements to exclude.
127      * @param includes A comma separated list of MDC elements to include.
128      * @param required A comma separated list of MDC elements that are required.
129      * @param mdcPrefix The prefix to add to MDC key names.
130      * @param eventPrefix The prefix to add to event key names.
131      * @param compressBody If true the event body will be compressed.
132      * @param batchSize Number of events to include in a batch. Defaults to 1.
133      * @param factory The factory to use to create Flume events.
134      * @param layout The layout to format the event.
135      * @param filter A Filter to filter events.
136      * @return A Flume Avro Appender.
137      */
138     @PluginFactory
139     public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
140                                                    @PluginElement("properties") final Property[] properties,
141                                                    @PluginAttr("embedded") final String embedded,
142                                                    @PluginAttr("type") final String type,
143                                                    @PluginAttr("dataDir") final String dataDir,
144                                                    @PluginAttr("connectTimeout") final String connectionTimeout,
145                                                    @PluginAttr("requestTimeout") final String requestTimeout,
146                                                    @PluginAttr("agentRetries") final String agentRetries,
147                                                    @PluginAttr("maxDelay") final String maxDelay,
148                                                    @PluginAttr("name") final String name,
149                                                    @PluginAttr("suppressExceptions") final String suppress,
150                                                    @PluginAttr("mdcExcludes") final String excludes,
151                                                    @PluginAttr("mdcIncludes") final String includes,
152                                                    @PluginAttr("mdcRequired") final String required,
153                                                    @PluginAttr("mdcPrefix") final String mdcPrefix,
154                                                    @PluginAttr("eventPrefix") final String eventPrefix,
155                                                    @PluginAttr("compress") final String compressBody,
156                                                    @PluginAttr("batchSize") final String batchSize,
157                                                    @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
158                                                    @PluginElement("layout") Layout layout,
159                                                    @PluginElement("filters") final Filter filter) {
160 
161         final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
162             (agents == null || agents.length == 0) && properties != null && properties.length > 0;
163         final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
164         final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
165         ManagerType managerType;
166         if (type != null) {
167             if (embed && embedded != null) {
168                 try {
169                     managerType = ManagerType.getType(type);
170                     LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
171                 } catch (Exception ex) {
172                     LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
173                     managerType = ManagerType.EMBEDDED;
174                 }
175             } else {
176                 try {
177                     managerType = ManagerType.getType(type);
178                 } catch (Exception ex) {
179                     LOGGER.warn("Type " + type + " is invalid.");
180                     managerType = ManagerType.EMBEDDED;
181                 }
182             }
183         }  else if (embed) {
184            managerType = ManagerType.EMBEDDED;
185         }  else {
186            managerType = ManagerType.AVRO;
187         }
188 
189         final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
190         final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
191         final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
192         final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
193         final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
194 
195 
196         if (layout == null) {
197             layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix, null, null,
198                 null, excludes, includes, required, null, null, null, null);
199         }
200 
201         if (name == null) {
202             LOGGER.error("No name provided for Appender");
203             return null;
204         }
205 
206         AbstractFlumeManager manager;
207 
208         switch (managerType) {
209             case EMBEDDED:
210                 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
211                 break;
212             case AVRO:
213                 if (agents == null || agents.length == 0) {
214                     LOGGER.debug("No agents provided, using defaults");
215                     agents = new Agent[] {Agent.createAgent(null, null)};
216                 }
217                 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
218                 break;
219             case PERSISTENT:
220                 if (agents == null || agents.length == 0) {
221                     LOGGER.debug("No agents provided, using defaults");
222                     agents = new Agent[] {Agent.createAgent(null, null)};
223                 }
224                 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
225                     connectTimeout, reqTimeout, delay, dataDir);
226                 break;
227             default:
228                 LOGGER.debug("No manager type specified. Defaulting to AVRO");
229                 if (agents == null || agents.length == 0) {
230                     LOGGER.debug("No agents provided, using defaults");
231                     agents = new Agent[] {Agent.createAgent(null, null)};
232                 }
233                 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
234         }
235 
236         if (manager == null) {
237             return null;
238         }
239 
240         return new FlumeAppender(name, filter, layout,  handleExceptions, includes,
241             excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
242     }
243 }