View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.SourceRunner;
20  import org.apache.flume.node.NodeConfiguration;
21  import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
22  import org.apache.logging.log4j.core.appender.ManagerFactory;
23  import org.apache.logging.log4j.core.config.ConfigurationException;
24  import org.apache.logging.log4j.core.config.Property;
25  import org.apache.logging.log4j.core.helpers.NameUtil;
26  import org.apache.logging.log4j.util.PropertiesUtil;
27  
28  import java.util.Locale;
29  import java.util.Properties;
30  
31  /**
32   *
33   */
34  public class FlumeEmbeddedManager extends AbstractFlumeManager {
35  
36      /** Name for the Flume source */
37      protected static final String SOURCE_NAME = "log4j-source";
38  
39      private static ManagerFactory factory = new FlumeManagerFactory();
40  
41      private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
42  
43      private static final String IN_MEMORY = "InMemory";
44  
45      private final FlumeNode node;
46  
47      private NodeConfiguration conf;
48  
49      private final Log4jEventSource source;
50  
51      private final String shortName;
52  
53  
54      /**
55       * Constructor
56       * @param name The unique name of this manager.
57       * @param node The Flume Node.
58       */
59      protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
60          super(name);
61          this.node = node;
62          this.shortName = shortName;
63          final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
64          if (runner == null || runner.getSource() == null) {
65              throw new IllegalStateException("No Source has been created for Appender " + shortName);
66          }
67          source  = (Log4jEventSource) runner.getSource();
68      }
69  
70      /**
71       * Returns a FlumeEmbeddedManager.
72       * @param name The name of the manager.
73       * @param agents The agents to use.
74       * @param properties Properties for the embedded manager.
75       * @param batchSize The number of events to include in a batch.
76       * @param dataDir The directory where the Flume FileChannel should write to.
77       * @return A FlumeAvroManager.
78       */
79      public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
80                                                    int batchSize, final String dataDir) {
81  
82          if (batchSize <= 0) {
83              batchSize = 1;
84          }
85  
86          if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
87              throw new IllegalArgumentException("Either an Agent or properties are required");
88          } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
89              throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
90          }
91  
92          final StringBuilder sb = new StringBuilder();
93          boolean first = true;
94  
95          if (agents != null && agents.length > 0) {
96              sb.append("FlumeEmbedded[");
97              for (final Agent agent : agents) {
98                  if (!first) {
99                      sb.append(",");
100                 }
101                 sb.append(agent.getHost()).append(":").append(agent.getPort());
102                 first = false;
103             }
104             sb.append("]");
105         } else {
106             String sep = "";
107             sb.append(name).append(":");
108             final StringBuilder props = new StringBuilder();
109             for (final Property prop : properties) {
110                 props.append(sep);
111                 props.append(prop.getName()).append("=").append(prop.getValue());
112                 sep = ",";
113             }
114             sb.append(NameUtil.md5(props.toString()));
115         }
116         return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
117             new FactoryData(name, agents, properties, batchSize, dataDir));
118     }
119 
120     @Override
121     public void send(final FlumeEvent event, final int delay, final int retries) {
122         source.send(event);
123     }
124 
125     @Override
126     protected void releaseSub() {
127         node.stop();
128     }
129 
130     /**
131      * Factory data.
132      */
133     private static class FactoryData {
134         private final Agent[] agents;
135         private final Property[] properties;
136         private final int batchSize;
137         private final String dataDir;
138         private final String name;
139 
140         /**
141          * Constructor.
142          * @param name The name of the Appender.
143          * @param agents The agents.
144          * @param properties The Flume configuration properties.
145          * @param batchSize The number of events to include in a batch.
146          * @param dataDir The directory where Flume should write to.
147          */
148         public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
149                            final String dataDir) {
150             this.name = name;
151             this.agents = agents;
152             this.batchSize = batchSize;
153             this.properties = properties;
154             this.dataDir = dataDir;
155         }
156     }
157 
158     /**
159      * Avro Manager Factory.
160      */
161     private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
162         private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
163 
164         /**
165          * Create the FlumeAvroManager.
166          * @param name The name of the entity to manage.
167          * @param data The data required to create the entity.
168          * @return The FlumeAvroManager.
169          */
170         public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
171             try {
172                 final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
173                 final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
174                     data.dataDir);
175                 final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
176                 final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
177 
178                 final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
179 
180                 node.start();
181 
182                 return new FlumeEmbeddedManager(name, data.name, node);
183             } catch (final Exception ex) {
184                 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
185             }
186             return null;
187         }
188 
189         private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
190                                             final int batchSize, String dataDir) {
191             final Properties props = new Properties();
192 
193             if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
194                 LOGGER.error("No Flume configuration provided");
195                 throw new ConfigurationException("No Flume configuration provided");
196             }
197 
198             if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
199                 LOGGER.error("Agents and Flume configuration cannot both be specified");
200                 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
201             }
202 
203             if (agents != null && agents.length > 0) {
204                 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
205                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
206 
207                 if (dataDir != null && dataDir.length() > 0) {
208                     if (dataDir.equals(IN_MEMORY)) {
209                         props.put(name + ".channels", "primary");
210                         props.put(name + ".channels.primary.type", "memory");
211                     } else {
212                         props.put(name + ".channels", "primary");
213                         props.put(name + ".channels.primary.type", "file");
214 
215                         if (!dataDir.endsWith(FiLE_SEP)) {
216                             dataDir = dataDir + FiLE_SEP;
217                         }
218 
219                         props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
220                         props.put(name + ".channels.primary.dataDirs", dataDir + "data");
221                     }
222 
223                 } else {
224                     props.put(name + ".channels", "primary");
225                     props.put(name + ".channels.primary.type", "file");
226                 }
227 
228                 final StringBuilder sb = new StringBuilder();
229                 String leading = "";
230                 int priority = agents.length;
231                 for (int i = 0; i < agents.length; ++i) {
232                     sb.append(leading).append("agent").append(i);
233                     leading = " ";
234                     final String prefix = name + ".sinks.agent" + i;
235                     props.put(prefix + ".channel", "primary");
236                     props.put(prefix + ".type", "avro");
237                     props.put(prefix + ".hostname", agents[i].getHost());
238                     props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
239                     props.put(prefix + ".batch-size", Integer.toString(batchSize));
240                     props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
241                     --priority;
242                 }
243                 props.put(name + ".sinks", sb.toString());
244                 props.put(name + ".sinkgroups", "group1");
245                 props.put(name + ".sinkgroups.group1.sinks", sb.toString());
246                 props.put(name + ".sinkgroups.group1.processor.type", "failover");
247                 final String sourceChannels = "primary";
248                 props.put(name + ".channels", sourceChannels);
249                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
250             } else {
251                 String channels = null;
252                 String[] sinks = null;
253 
254                 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
255                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
256 
257                 for (final Property property : properties) {
258                     final String key = property.getName();
259 
260                     if (key == null || key.length() == 0) {
261                         final String msg = "A property name must be provided";
262                         LOGGER.error(msg);
263                         throw new ConfigurationException(msg);
264                     }
265 
266                     final String upperKey = key.toUpperCase(Locale.ENGLISH);
267 
268                     if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
269                         final String msg =
270                             "Specification of the agent name is allowed in Flume Appender configuration: " + key;
271                         LOGGER.error(msg);
272                         throw new ConfigurationException(msg);
273                     }
274 
275                     if (upperKey.startsWith("SOURCES.")) {
276                         final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
277                         LOGGER.error(msg);
278                         throw new ConfigurationException(msg);
279                     }
280 
281                     final String value = property.getValue();
282                     if (value == null || value.length() == 0) {
283                         final String msg = "A value for property " + key + " must be provided";
284                         LOGGER.error(msg);
285                         throw new ConfigurationException(msg);
286                     }
287 
288                     if (upperKey.equals("CHANNELS")) {
289                         channels = value.trim();
290                     } else if (upperKey.equals("SINKS")) {
291                         sinks = value.trim().split(" ");
292                     }
293 
294                     props.put(name + '.' + key, value);
295                 }
296 
297                 String sourceChannels = channels;
298 
299                 if (channels == null) {
300                     sourceChannels = "primary";
301                     props.put(name + ".channels", sourceChannels);
302                 }
303 
304                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
305 
306                 if (sinks == null || sinks.length == 0) {
307                     final String msg = "At least one Sink must be specified";
308                     LOGGER.error(msg);
309                     throw new ConfigurationException(msg);
310                 }
311             }
312             return props;
313         }
314 
315     }
316 
317 }