1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.Channel;
20 import org.apache.flume.ChannelFactory;
21 import org.apache.flume.ChannelSelector;
22 import org.apache.flume.Context;
23 import org.apache.flume.Sink;
24 import org.apache.flume.SinkFactory;
25 import org.apache.flume.SinkProcessor;
26 import org.apache.flume.SinkRunner;
27 import org.apache.flume.Source;
28 import org.apache.flume.SourceFactory;
29 import org.apache.flume.SourceRunner;
30 import org.apache.flume.channel.ChannelProcessor;
31 import org.apache.flume.channel.ChannelSelectorFactory;
32 import org.apache.flume.channel.DefaultChannelFactory;
33 import org.apache.flume.conf.BasicConfigurationConstants;
34 import org.apache.flume.conf.ComponentConfiguration;
35 import org.apache.flume.conf.Configurables;
36 import org.apache.flume.conf.FlumeConfiguration;
37 import org.apache.flume.conf.FlumeConfigurationError;
38 import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
39 import org.apache.flume.conf.file.SimpleNodeConfiguration;
40 import org.apache.flume.conf.sink.SinkConfiguration;
41 import org.apache.flume.conf.sink.SinkGroupConfiguration;
42 import org.apache.flume.conf.source.SourceConfiguration;
43 import org.apache.flume.node.NodeConfiguration;
44 import org.apache.flume.node.nodemanager.NodeConfigurationAware;
45 import org.apache.flume.sink.DefaultSinkFactory;
46 import org.apache.flume.sink.DefaultSinkProcessor;
47 import org.apache.flume.sink.SinkGroup;
48 import org.apache.flume.source.DefaultSourceFactory;
49 import org.apache.logging.log4j.Logger;
50 import org.apache.logging.log4j.core.config.ConfigurationException;
51 import org.apache.logging.log4j.status.StatusLogger;
52
53 import java.util.ArrayList;
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Properties;
58 import java.util.Set;
59 import java.util.TreeSet;
60
61
62
63
64
65 public class FlumeConfigurationBuilder {
66
67 private static final Logger LOGGER = StatusLogger.getLogger();
68
69 private final ChannelFactory channelFactory = new DefaultChannelFactory();
70 private final SourceFactory sourceFactory = new DefaultSourceFactory();
71 private final SinkFactory sinkFactory = new DefaultSinkFactory();
72
73 public NodeConfiguration load(final String name, final Properties props,
74 final NodeConfigurationAware configurationAware) {
75 final NodeConfiguration conf = new SimpleNodeConfiguration();
76 FlumeConfiguration fconfig;
77 try {
78 fconfig = new FlumeConfiguration(props);
79 final List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
80 if (errors.size() > 0) {
81 boolean isError = false;
82 for (final FlumeConfigurationError error : errors) {
83 final StringBuilder sb = new StringBuilder();
84 sb.append("Component: ").append(error.getComponentName()).append(" ");
85 sb.append("Key: ").append(error.getKey()).append(" ");
86 sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
87 switch (error.getErrorOrWarning()) {
88 case ERROR:
89 isError = true;
90 LOGGER.error(sb.toString());
91 break;
92 case WARNING:
93 LOGGER.warn(sb.toString());
94 break;
95 }
96 }
97 if (isError) {
98 throw new ConfigurationException("Unable to configure Flume due to errors");
99 }
100 }
101 } catch (final RuntimeException ex) {
102 printProps(props);
103 throw ex;
104 }
105
106 final FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
107
108 if (agentConf != null) {
109
110 loadChannels(agentConf, conf);
111 loadSources(agentConf, conf);
112 loadSinks(agentConf, conf);
113
114
115 } else {
116 LOGGER.warn("No configuration found for: {}", name);
117 }
118 return conf;
119 }
120
121 private void printProps(final Properties props) {
122 for (final String key : new TreeSet<String>(props.stringPropertyNames())) {
123 LOGGER.error(key + "=" + props.getProperty(key));
124 }
125 }
126
127 protected void loadChannels(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
128 LOGGER.info("Creating channels");
129 final Set<String> channels = agentConf.getChannelSet();
130 final Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
131 for (final String chName : channels) {
132 final ComponentConfiguration comp = compMap.get(chName);
133 if (comp != null) {
134 final Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
135
136 Configurables.configure(channel, comp);
137
138 conf.getChannels().put(comp.getComponentName(), channel);
139 }
140 }
141
142 for (final String ch : channels) {
143 final Context context = agentConf.getChannelContext().get(ch);
144 if (context != null) {
145 final Channel channel = channelFactory.create(ch,
146 context.getString(BasicConfigurationConstants.CONFIG_TYPE));
147 Configurables.configure(channel, context);
148 conf.getChannels().put(ch, channel);
149 LOGGER.info("created channel " + ch);
150 }
151 }
152 }
153
154 protected void loadSources(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
155
156 final Set<String> sources = agentConf.getSourceSet();
157 final Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
158 for (final String sourceName : sources) {
159 final ComponentConfiguration comp = compMap.get(sourceName);
160 if (comp != null) {
161 final SourceConfiguration config = (SourceConfiguration) comp;
162
163 final Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
164
165 Configurables.configure(source, config);
166 final Set<String> channelNames = config.getChannels();
167 final List<Channel> channels = new ArrayList<Channel>();
168 for (final String chName : channelNames) {
169 channels.add(conf.getChannels().get(chName));
170 }
171
172 final ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
173
174 final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
175
176 final ChannelProcessor channelProcessor = new ChannelProcessor(selector);
177 Configurables.configure(channelProcessor, config);
178
179 source.setChannelProcessor(channelProcessor);
180 conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
181 }
182 }
183 final Map<String, Context> sourceContexts = agentConf.getSourceContext();
184
185 for (final String src : sources) {
186 final Context context = sourceContexts.get(src);
187 if (context != null) {
188 final Source source = sourceFactory.create(src,
189 context.getString(BasicConfigurationConstants.CONFIG_TYPE));
190 final List<Channel> channels = new ArrayList<Channel>();
191 Configurables.configure(source, context);
192 final String[] channelNames =
193 context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
194 for (final String chName : channelNames) {
195 channels.add(conf.getChannels().get(chName));
196 }
197
198 final Map<String, String> selectorConfig = context.getSubProperties(
199 BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
200
201 final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
202
203 final ChannelProcessor channelProcessor = new ChannelProcessor(selector);
204 Configurables.configure(channelProcessor, context);
205
206 source.setChannelProcessor(channelProcessor);
207 conf.getSourceRunners().put(src, SourceRunner.forSource(source));
208 }
209 }
210 }
211
212 protected void loadSinks(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
213 final Set<String> sinkNames = agentConf.getSinkSet();
214 final Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
215 final Map<String, Sink> sinks = new HashMap<String, Sink>();
216 for (final String sinkName : sinkNames) {
217 final ComponentConfiguration comp = compMap.get(sinkName);
218 if (comp != null) {
219 final SinkConfiguration config = (SinkConfiguration) comp;
220 final Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
221
222 Configurables.configure(sink, config);
223
224 sink.setChannel(conf.getChannels().get(config.getChannel()));
225 sinks.put(comp.getComponentName(), sink);
226 }
227 }
228
229 final Map<String, Context> sinkContexts = agentConf.getSinkContext();
230 for (final String sinkName : sinkNames) {
231 final Context context = sinkContexts.get(sinkName);
232 if (context != null) {
233 final Sink sink = sinkFactory.create(sinkName,
234 context.getString(BasicConfigurationConstants.CONFIG_TYPE));
235 Configurables.configure(sink, context);
236
237 sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
238 sinks.put(sinkName, sink);
239 }
240 }
241
242 loadSinkGroups(agentConf, sinks, conf);
243 }
244
245 protected void loadSinkGroups(final FlumeConfiguration.AgentConfiguration agentConf,
246 final Map<String, Sink> sinks, final NodeConfiguration conf) {
247 final Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
248 final Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
249 final Map<String, String> usedSinks = new HashMap<String, String>();
250 for (final String groupName : sinkgroupNames) {
251 final ComponentConfiguration comp = compMap.get(groupName);
252 if (comp != null) {
253 final SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
254 final List<String> groupSinkList = groupConf.getSinks();
255 final List<Sink> groupSinks = new ArrayList<Sink>();
256 for (final String sink : groupSinkList) {
257 final Sink s = sinks.remove(sink);
258 if (s == null) {
259 final String sinkUser = usedSinks.get(sink);
260 if (sinkUser != null) {
261 throw new ConfigurationException(String.format(
262 "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
263 } else {
264 throw new ConfigurationException(String.format(
265 "Sink %s of group %s does not exist or is not properly configured", sink,
266 groupName));
267 }
268 }
269 groupSinks.add(s);
270 usedSinks.put(sink, groupName);
271 }
272 final SinkGroup group = new SinkGroup(groupSinks);
273 Configurables.configure(group, groupConf);
274 conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
275 }
276 }
277
278 for (final Map.Entry<String, Sink> entry : sinks.entrySet()) {
279 if (!usedSinks.containsValue(entry.getKey())) {
280 final SinkProcessor pr = new DefaultSinkProcessor();
281 final List<Sink> sinkMap = new ArrayList<Sink>();
282 sinkMap.add(entry.getValue());
283 pr.setSinks(sinkMap);
284 Configurables.configure(pr, new Context());
285 conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
286 }
287 }
288 }
289 }