View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.logging.log4j.core.Filter;
20  import org.apache.logging.log4j.core.Layout;
21  import org.apache.logging.log4j.core.LogEvent;
22  import org.apache.logging.log4j.core.appender.AbstractAppender;
23  import org.apache.logging.log4j.core.config.Property;
24  import org.apache.logging.log4j.core.config.plugins.Plugin;
25  import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26  import org.apache.logging.log4j.core.config.plugins.PluginElement;
27  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28  import org.apache.logging.log4j.core.layout.RFC5424Layout;
29  
30  /**
31   * An Appender that uses the Avro protocol to route events to Flume.
32   */
33  @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
34  public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
35  
36      private final AbstractFlumeManager manager;
37  
38      private final String mdcIncludes;
39      private final String mdcExcludes;
40      private final String mdcRequired;
41  
42      private final String eventPrefix;
43  
44      private final String mdcPrefix;
45  
46      private final boolean compressBody;
47  
48      private final int reconnectDelay;
49  
50      private final int retries;
51  
52      private final FlumeEventFactory factory;
53  
54      private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException,
55                            final String includes, final String excludes, final String required, final String mdcPrefix,
56                            final String eventPrefix, final boolean compress, final int delay, final int retries,
57                            final FlumeEventFactory factory, final AbstractFlumeManager manager) {
58          super(name, filter, layout, handleException);
59          this.manager = manager;
60          this.mdcIncludes = includes;
61          this.mdcExcludes = excludes;
62          this.mdcRequired = required;
63          this.eventPrefix = eventPrefix;
64          this.mdcPrefix = mdcPrefix;
65          this.compressBody = compress;
66          this.reconnectDelay = delay;
67          this.retries = retries;
68          this.factory = factory == null ? this : factory;
69      }
70  
71      /**
72       * Publish the event.
73       * @param event The LogEvent.
74       */
75      public void append(final LogEvent event) {
76  
77          final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
78              eventPrefix, compressBody);
79          flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
80          manager.send(flumeEvent, reconnectDelay, retries);
81      }
82  
83      @Override
84      public void stop() {
85          super.stop();
86          manager.release();
87      }
88  
89      /**
90       * Create a Flume event.
91       * @param event The Log4j LogEvent.
92       * @param includes comma separated list of mdc elements to include.
93       * @param excludes comma separated list of mdc elements to exclude.
94       * @param required comma separated list of mdc elements that must be present with a value.
95       * @param mdcPrefix The prefix to add to MDC key names.
96       * @param eventPrefix The prefix to add to event fields.
97       * @param compress If true the body will be compressed.
98       * @return A Flume Event.
99       */
100     public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
101                                   final String required, final String mdcPrefix, final String eventPrefix,
102                                   final boolean compress) {
103         return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
104             eventPrefix, compressBody);
105     }
106 
107     /**
108      * Create a Flume Avro Appender.
109      * @param agents An array of Agents.
110      * @param properties Properties to pass to the embedded agent.
111      * @param embedded true if the embedded agent manager should be used. otherwise the Avro mangaer will be used.
112      * @param dataDir The directory where the Flume FileChannel should write its data.
113      * @param delay The amount of time in milliseconds to wait between retries.
114      * @param agentRetries The number of times to retry an agent before failing to the next agent.
115      * @param name The name of the Appender.
116      * @param suppress If true exceptions will be handled in the appender.
117      * @param excludes A comma separated list of MDC elements to exclude.
118      * @param includes A comma separated list of MDC elements to include.
119      * @param required A comma separated list of MDC elements that are required.
120      * @param mdcPrefix The prefix to add to MDC key names.
121      * @param eventPrefix The prefix to add to event key names.
122      * @param compressBody If true the event body will be compressed.
123      * @param batchSize Number of events to include in a batch. Defaults to 1.
124      * @param factory The factory to use to create Flume events.
125      * @param layout The layout to format the event.
126      * @param filter A Filter to filter events.
127      * @return A Flume Avro Appender.
128      */
129     @PluginFactory
130     public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
131                                                    @PluginElement("properties") final Property[] properties,
132                                                    @PluginAttr("embedded") final String embedded,
133                                                    @PluginAttr("dataDir") final String dataDir,
134                                                    @PluginAttr("reconnectionDelay") final String delay,
135                                                    @PluginAttr("agentRetries") final String agentRetries,
136                                                    @PluginAttr("name") final String name,
137                                                    @PluginAttr("suppressExceptions") final String suppress,
138                                                    @PluginAttr("mdcExcludes") final String excludes,
139                                                    @PluginAttr("mdcIncludes") final String includes,
140                                                    @PluginAttr("mdcRequired") final String required,
141                                                    @PluginAttr("mdcPrefix") final String mdcPrefix,
142                                                    @PluginAttr("eventPrefix") final String eventPrefix,
143                                                    @PluginAttr("compress") final String compressBody,
144                                                    @PluginAttr("batchSize") final String batchSize,
145                                                    @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
146                                                    @PluginElement("layout") Layout layout,
147                                                    @PluginElement("filters") final Filter filter) {
148 
149         final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
150             (agents == null || agents.length == 0) && properties != null && properties.length > 0;
151         final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
152         final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
153 
154         final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
155         final int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
156         final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
157 
158         if (layout == null) {
159             layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes,
160                 includes, required, null, null, null, null);
161         }
162 
163         if (name == null) {
164             LOGGER.error("No name provided for Appender");
165             return null;
166         }
167 
168         AbstractFlumeManager manager;
169 
170         if (embed) {
171             manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
172         } else {
173             if (agents == null || agents.length == 0) {
174                 LOGGER.debug("No agents provided, using defaults");
175                 agents = new Agent[] {Agent.createAgent(null, null)};
176             }
177             manager = FlumeAvroManager.getManager(name, agents, batchCount);
178         }
179 
180         if (manager == null) {
181             return null;
182         }
183 
184         return new FlumeAppender(name, filter, layout,  handleExceptions, includes,
185             excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
186     }
187 }