View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.Channel;
20  import org.apache.flume.ChannelFactory;
21  import org.apache.flume.ChannelSelector;
22  import org.apache.flume.Context;
23  import org.apache.flume.Sink;
24  import org.apache.flume.SinkFactory;
25  import org.apache.flume.SinkProcessor;
26  import org.apache.flume.SinkRunner;
27  import org.apache.flume.Source;
28  import org.apache.flume.SourceFactory;
29  import org.apache.flume.SourceRunner;
30  import org.apache.flume.channel.ChannelProcessor;
31  import org.apache.flume.channel.ChannelSelectorFactory;
32  import org.apache.flume.channel.DefaultChannelFactory;
33  import org.apache.flume.conf.BasicConfigurationConstants;
34  import org.apache.flume.conf.ComponentConfiguration;
35  import org.apache.flume.conf.Configurables;
36  import org.apache.flume.conf.FlumeConfiguration;
37  import org.apache.flume.conf.FlumeConfigurationError;
38  import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
39  import org.apache.flume.conf.file.SimpleNodeConfiguration;
40  import org.apache.flume.conf.sink.SinkConfiguration;
41  import org.apache.flume.conf.sink.SinkGroupConfiguration;
42  import org.apache.flume.conf.source.SourceConfiguration;
43  import org.apache.flume.node.NodeConfiguration;
44  import org.apache.flume.node.nodemanager.NodeConfigurationAware;
45  import org.apache.flume.sink.DefaultSinkFactory;
46  import org.apache.flume.sink.DefaultSinkProcessor;
47  import org.apache.flume.sink.SinkGroup;
48  import org.apache.flume.source.DefaultSourceFactory;
49  import org.apache.logging.log4j.Logger;
50  import org.apache.logging.log4j.core.config.ConfigurationException;
51  import org.apache.logging.log4j.status.StatusLogger;
52  
53  import java.util.ArrayList;
54  import java.util.HashMap;
55  import java.util.List;
56  import java.util.Map;
57  import java.util.Properties;
58  import java.util.Set;
59  import java.util.TreeSet;
60  
61  /**
62   * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible.
63   */
64  
65  public class FlumeConfigurationBuilder {
66  
67      private static final Logger LOGGER = StatusLogger.getLogger();
68  
69      private final ChannelFactory channelFactory = new DefaultChannelFactory();
70      private final SourceFactory sourceFactory = new DefaultSourceFactory();
71      private final SinkFactory sinkFactory = new DefaultSinkFactory();
72  
73      public NodeConfiguration load(final String name, final Properties props,
74                                    final NodeConfigurationAware configurationAware) {
75          final NodeConfiguration conf = new SimpleNodeConfiguration();
76          FlumeConfiguration fconfig;
77          try {
78              fconfig = new FlumeConfiguration(props);
79              final List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
80              if (errors.size() > 0) {
81                  boolean isError = false;
82                  for (final FlumeConfigurationError error : errors) {
83                      final StringBuilder sb = new StringBuilder();
84                      sb.append("Component: ").append(error.getComponentName()).append(" ");
85                      sb.append("Key: ").append(error.getKey()).append(" ");
86                      sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
87                      switch (error.getErrorOrWarning()) {
88                          case ERROR:
89                              isError = true;
90                              LOGGER.error(sb.toString());
91                              break;
92                          case WARNING:
93                              LOGGER.warn(sb.toString());
94                              break;
95                      }
96                  }
97                  if (isError) {
98                      throw new ConfigurationException("Unable to configure Flume due to errors");
99                  }
100             }
101         } catch (final RuntimeException ex) {
102             printProps(props);
103             throw ex;
104         }
105 
106         final FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
107 
108         if (agentConf != null) {
109 
110             loadChannels(agentConf, conf);
111             loadSources(agentConf, conf);
112             loadSinks(agentConf, conf);
113 
114             //configurationAware.startAllComponents(conf);
115         } else {
116             LOGGER.warn("No configuration found for: {}", name);
117         }
118         return conf;
119     }
120 
121     private void printProps(final Properties props) {
122         for (final String key : new TreeSet<String>(props.stringPropertyNames())) {
123             LOGGER.error(key + "=" + props.getProperty(key));
124         }
125     }
126 
127     protected void loadChannels(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
128         LOGGER.info("Creating channels");
129         final Set<String> channels = agentConf.getChannelSet();
130         final Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
131         for (final String chName : channels) {
132             final ComponentConfiguration comp = compMap.get(chName);
133             if (comp != null) {
134                 final Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
135 
136                 Configurables.configure(channel, comp);
137 
138                 conf.getChannels().put(comp.getComponentName(), channel);
139             }
140         }
141 
142         for (final String ch : channels) {
143             final Context context = agentConf.getChannelContext().get(ch);
144             if (context != null) {
145                 final Channel channel = channelFactory.create(ch,
146                     context.getString(BasicConfigurationConstants.CONFIG_TYPE));
147                 Configurables.configure(channel, context);
148                 conf.getChannels().put(ch, channel);
149                 LOGGER.info("created channel " + ch);
150             }
151         }
152     }
153 
154     protected void loadSources(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
155 
156         final Set<String> sources = agentConf.getSourceSet();
157         final Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
158         for (final String sourceName : sources) {
159             final ComponentConfiguration comp = compMap.get(sourceName);
160             if (comp != null) {
161                 final SourceConfiguration config = (SourceConfiguration) comp;
162 
163                 final Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
164 
165                 Configurables.configure(source, config);
166                 final Set<String> channelNames = config.getChannels();
167                 final List<Channel> channels = new ArrayList<Channel>();
168                 for (final String chName : channelNames) {
169                     channels.add(conf.getChannels().get(chName));
170                 }
171 
172                 final ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
173 
174                 final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
175 
176                 final ChannelProcessor channelProcessor = new ChannelProcessor(selector);
177                 Configurables.configure(channelProcessor, config);
178 
179                 source.setChannelProcessor(channelProcessor);
180                 conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
181             }
182         }
183         final Map<String, Context> sourceContexts = agentConf.getSourceContext();
184 
185         for (final String src : sources) {
186             final Context context = sourceContexts.get(src);
187             if (context != null) {
188                 final Source source = sourceFactory.create(src,
189                     context.getString(BasicConfigurationConstants.CONFIG_TYPE));
190                 final List<Channel> channels = new ArrayList<Channel>();
191                 Configurables.configure(source, context);
192                 final String[] channelNames =
193                     context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
194                 for (final String chName : channelNames) {
195                     channels.add(conf.getChannels().get(chName));
196                 }
197 
198                 final Map<String, String> selectorConfig = context.getSubProperties(
199                     BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
200 
201                 final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
202 
203                 final ChannelProcessor channelProcessor = new ChannelProcessor(selector);
204                 Configurables.configure(channelProcessor, context);
205 
206                 source.setChannelProcessor(channelProcessor);
207                 conf.getSourceRunners().put(src, SourceRunner.forSource(source));
208             }
209         }
210     }
211 
212     protected void loadSinks(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
213         final Set<String> sinkNames = agentConf.getSinkSet();
214         final Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
215         final Map<String, Sink> sinks = new HashMap<String, Sink>();
216         for (final String sinkName : sinkNames) {
217             final ComponentConfiguration comp = compMap.get(sinkName);
218             if (comp != null) {
219                 final SinkConfiguration config = (SinkConfiguration) comp;
220                 final Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
221 
222                 Configurables.configure(sink, config);
223 
224                 sink.setChannel(conf.getChannels().get(config.getChannel()));
225                 sinks.put(comp.getComponentName(), sink);
226             }
227         }
228 
229         final Map<String, Context> sinkContexts = agentConf.getSinkContext();
230         for (final String sinkName : sinkNames) {
231             final Context context = sinkContexts.get(sinkName);
232             if (context != null) {
233                 final Sink sink = sinkFactory.create(sinkName,
234                     context.getString(BasicConfigurationConstants.CONFIG_TYPE));
235                 Configurables.configure(sink, context);
236 
237                 sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
238                 sinks.put(sinkName, sink);
239             }
240         }
241 
242         loadSinkGroups(agentConf, sinks, conf);
243     }
244 
245     protected void loadSinkGroups(final FlumeConfiguration.AgentConfiguration agentConf,
246                                   final Map<String, Sink> sinks, final NodeConfiguration conf) {
247         final Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
248         final Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
249         final Map<String, String> usedSinks = new HashMap<String, String>();
250         for (final String groupName : sinkgroupNames) {
251             final ComponentConfiguration comp = compMap.get(groupName);
252             if (comp != null) {
253                 final SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
254                 final List<String> groupSinkList = groupConf.getSinks();
255                 final List<Sink> groupSinks = new ArrayList<Sink>();
256                 for (final String sink : groupSinkList) {
257                     final Sink s = sinks.remove(sink);
258                     if (s == null) {
259                         final String sinkUser = usedSinks.get(sink);
260                         if (sinkUser != null) {
261                             throw new ConfigurationException(String.format(
262                                 "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
263                         } else {
264                             throw new ConfigurationException(String.format(
265                                 "Sink %s of group %s does not exist or is not properly configured", sink,
266                                 groupName));
267                         }
268                     }
269                     groupSinks.add(s);
270                     usedSinks.put(sink, groupName);
271                 }
272                 final SinkGroup group = new SinkGroup(groupSinks);
273                 Configurables.configure(group, groupConf);
274                 conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
275             }
276         }
277         // add any unasigned sinks to solo collectors
278         for (final Map.Entry<String, Sink> entry : sinks.entrySet()) {
279             if (!usedSinks.containsValue(entry.getKey())) {
280                 final SinkProcessor pr = new DefaultSinkProcessor();
281                 final List<Sink> sinkMap = new ArrayList<Sink>();
282                 sinkMap.add(entry.getValue());
283                 pr.setSinks(sinkMap);
284                 Configurables.configure(pr, new Context());
285                 conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
286             }
287         }
288     }
289 }