View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.Event;
20  import org.apache.flume.SourceRunner;
21  import org.apache.flume.node.NodeConfiguration;
22  import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
23  import org.apache.logging.log4j.core.appender.ManagerFactory;
24  import org.apache.logging.log4j.core.config.ConfigurationException;
25  import org.apache.logging.log4j.core.config.Property;
26  import org.apache.logging.log4j.core.helpers.NameUtil;
27  import org.apache.logging.log4j.util.PropertiesUtil;
28  
29  import java.util.Locale;
30  import java.util.Properties;
31  
32  /**
33   *
34   */
35  public class FlumeEmbeddedManager extends AbstractFlumeManager {
36  
37      /** Name for the Flume source */
38      protected static final String SOURCE_NAME = "log4j-source";
39  
40      private static FlumeManagerFactory factory = new FlumeManagerFactory();
41  
42      private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
43  
44      private static final String IN_MEMORY = "InMemory";
45  
46      private final FlumeNode node;
47  
48      private NodeConfiguration conf;
49  
50      private final Log4jEventSource source;
51  
52      private final String shortName;
53  
54  
55      /**
56       * Constructor
57       * @param name The unique name of this manager.
58       * @param node The Flume Node.
59       */
60      protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
61          super(name);
62          this.node = node;
63          this.shortName = shortName;
64          final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
65          if (runner == null || runner.getSource() == null) {
66              throw new IllegalStateException("No Source has been created for Appender " + shortName);
67          }
68          source  = (Log4jEventSource) runner.getSource();
69      }
70  
71      /**
72       * Returns a FlumeEmbeddedManager.
73       * @param name The name of the manager.
74       * @param agents The agents to use.
75       * @param properties Properties for the embedded manager.
76       * @param batchSize The number of events to include in a batch.
77       * @param dataDir The directory where the Flume FileChannel should write to.
78       * @return A FlumeAvroManager.
79       */
80      public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
81                                                    int batchSize, final String dataDir) {
82  
83          if (batchSize <= 0) {
84              batchSize = 1;
85          }
86  
87          if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
88              throw new IllegalArgumentException("Either an Agent or properties are required");
89          } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
90              throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
91          }
92  
93          final StringBuilder sb = new StringBuilder();
94          boolean first = true;
95  
96          if (agents != null && agents.length > 0) {
97              sb.append("FlumeEmbedded[");
98              for (final Agent agent : agents) {
99                  if (!first) {
100                     sb.append(",");
101                 }
102                 sb.append(agent.getHost()).append(":").append(agent.getPort());
103                 first = false;
104             }
105             sb.append("]");
106         } else {
107             String sep = "";
108             sb.append(name).append(":");
109             final StringBuilder props = new StringBuilder();
110             for (final Property prop : properties) {
111                 props.append(sep);
112                 props.append(prop.getName()).append("=").append(prop.getValue());
113                 sep = ",";
114             }
115             sb.append(NameUtil.md5(props.toString()));
116         }
117         return getManager(sb.toString(), factory,
118                 new FactoryData(name, agents, properties, batchSize, dataDir));
119     }
120 
121     @Override
122     public void send(final Event event) {
123         source.send(event);
124     }
125 
126     @Override
127     protected void releaseSub() {
128         node.stop();
129     }
130 
131     /**
132      * Factory data.
133      */
134     private static class FactoryData {
135         private final Agent[] agents;
136         private final Property[] properties;
137         private final int batchSize;
138         private final String dataDir;
139         private final String name;
140 
141         /**
142          * Constructor.
143          * @param name The name of the Appender.
144          * @param agents The agents.
145          * @param properties The Flume configuration properties.
146          * @param batchSize The number of events to include in a batch.
147          * @param dataDir The directory where Flume should write to.
148          */
149         public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
150                            final String dataDir) {
151             this.name = name;
152             this.agents = agents;
153             this.batchSize = batchSize;
154             this.properties = properties;
155             this.dataDir = dataDir;
156         }
157     }
158 
159     /**
160      * Avro Manager Factory.
161      */
162     private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
163         private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
164 
165         /**
166          * Create the FlumeAvroManager.
167          * @param name The name of the entity to manage.
168          * @param data The data required to create the entity.
169          * @return The FlumeAvroManager.
170          */
171         @Override
172         public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
173             try {
174                 final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
175                 final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
176                     data.dataDir);
177                 final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
178                 final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
179 
180                 final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
181 
182                 node.start();
183 
184                 return new FlumeEmbeddedManager(name, data.name, node);
185             } catch (final Exception ex) {
186                 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
187             }
188             return null;
189         }
190 
191         private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
192                                             final int batchSize, String dataDir) {
193             final Properties props = new Properties();
194 
195             if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
196                 LOGGER.error("No Flume configuration provided");
197                 throw new ConfigurationException("No Flume configuration provided");
198             }
199 
200             if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
201                 LOGGER.error("Agents and Flume configuration cannot both be specified");
202                 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
203             }
204 
205             if (agents != null && agents.length > 0) {
206                 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
207                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
208 
209                 if (dataDir != null && dataDir.length() > 0) {
210                     if (dataDir.equals(IN_MEMORY)) {
211                         props.put(name + ".channels", "primary");
212                         props.put(name + ".channels.primary.type", "memory");
213                     } else {
214                         props.put(name + ".channels", "primary");
215                         props.put(name + ".channels.primary.type", "file");
216 
217                         if (!dataDir.endsWith(FiLE_SEP)) {
218                             dataDir = dataDir + FiLE_SEP;
219                         }
220 
221                         props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
222                         props.put(name + ".channels.primary.dataDirs", dataDir + "data");
223                     }
224 
225                 } else {
226                     props.put(name + ".channels", "primary");
227                     props.put(name + ".channels.primary.type", "file");
228                 }
229 
230                 final StringBuilder sb = new StringBuilder();
231                 String leading = "";
232                 int priority = agents.length;
233                 for (int i = 0; i < agents.length; ++i) {
234                     sb.append(leading).append("agent").append(i);
235                     leading = " ";
236                     final String prefix = name + ".sinks.agent" + i;
237                     props.put(prefix + ".channel", "primary");
238                     props.put(prefix + ".type", "avro");
239                     props.put(prefix + ".hostname", agents[i].getHost());
240                     props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
241                     props.put(prefix + ".batch-size", Integer.toString(batchSize));
242                     props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
243                     --priority;
244                 }
245                 props.put(name + ".sinks", sb.toString());
246                 props.put(name + ".sinkgroups", "group1");
247                 props.put(name + ".sinkgroups.group1.sinks", sb.toString());
248                 props.put(name + ".sinkgroups.group1.processor.type", "failover");
249                 final String sourceChannels = "primary";
250                 props.put(name + ".channels", sourceChannels);
251                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
252             } else {
253                 String channels = null;
254                 String[] sinks = null;
255 
256                 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
257                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
258 
259                 for (final Property property : properties) {
260                     final String key = property.getName();
261 
262                     if (key == null || key.length() == 0) {
263                         final String msg = "A property name must be provided";
264                         LOGGER.error(msg);
265                         throw new ConfigurationException(msg);
266                     }
267 
268                     final String upperKey = key.toUpperCase(Locale.ENGLISH);
269 
270                     if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
271                         final String msg =
272                             "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
273                         LOGGER.error(msg);
274                         throw new ConfigurationException(msg);
275                     }
276 
277                     // Prohibit setting the sources as they are set above but allow interceptors to be set
278                     if (upperKey.startsWith("SOURCES.") && !upperKey.startsWith("SOURCES.LOG4J-SOURCE.INTERCEPTORS")) {
279                         final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
280                         LOGGER.error(msg);
281                         throw new ConfigurationException(msg);
282                     }
283 
284                     final String value = property.getValue();
285                     if (value == null || value.length() == 0) {
286                         final String msg = "A value for property " + key + " must be provided";
287                         LOGGER.error(msg);
288                         throw new ConfigurationException(msg);
289                     }
290 
291                     if (upperKey.equals("CHANNELS")) {
292                         channels = value.trim();
293                     } else if (upperKey.equals("SINKS")) {
294                         sinks = value.trim().split(" ");
295                     }
296 
297                     props.put(name + '.' + key, value);
298                 }
299 
300                 String sourceChannels = channels;
301 
302                 if (channels == null) {
303                     sourceChannels = "primary";
304                     props.put(name + ".channels", sourceChannels);
305                 }
306 
307                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
308 
309                 if (sinks == null || sinks.length == 0) {
310                     final String msg = "At least one Sink must be specified";
311                     LOGGER.error(msg);
312                     throw new ConfigurationException(msg);
313                 }
314             }
315             return props;
316         }
317 
318     }
319 
320 }