001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.Channel;
020    import org.apache.flume.ChannelFactory;
021    import org.apache.flume.ChannelSelector;
022    import org.apache.flume.Context;
023    import org.apache.flume.Sink;
024    import org.apache.flume.SinkFactory;
025    import org.apache.flume.SinkProcessor;
026    import org.apache.flume.SinkRunner;
027    import org.apache.flume.Source;
028    import org.apache.flume.SourceFactory;
029    import org.apache.flume.SourceRunner;
030    import org.apache.flume.channel.ChannelProcessor;
031    import org.apache.flume.channel.ChannelSelectorFactory;
032    import org.apache.flume.channel.DefaultChannelFactory;
033    import org.apache.flume.conf.BasicConfigurationConstants;
034    import org.apache.flume.conf.ComponentConfiguration;
035    import org.apache.flume.conf.Configurables;
036    import org.apache.flume.conf.FlumeConfiguration;
037    import org.apache.flume.conf.FlumeConfigurationError;
038    import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
039    import org.apache.flume.conf.file.SimpleNodeConfiguration;
040    import org.apache.flume.conf.sink.SinkConfiguration;
041    import org.apache.flume.conf.sink.SinkGroupConfiguration;
042    import org.apache.flume.conf.source.SourceConfiguration;
043    import org.apache.flume.node.NodeConfiguration;
044    import org.apache.flume.node.nodemanager.NodeConfigurationAware;
045    import org.apache.flume.sink.DefaultSinkFactory;
046    import org.apache.flume.sink.DefaultSinkProcessor;
047    import org.apache.flume.sink.SinkGroup;
048    import org.apache.flume.source.DefaultSourceFactory;
049    import org.apache.logging.log4j.Logger;
050    import org.apache.logging.log4j.core.config.ConfigurationException;
051    import org.apache.logging.log4j.status.StatusLogger;
052    
053    import java.util.ArrayList;
054    import java.util.HashMap;
055    import java.util.List;
056    import java.util.Map;
057    import java.util.Properties;
058    import java.util.Set;
059    import java.util.TreeSet;
060    
061    /**
062     * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible.
063     */
064    
065    public class FlumeConfigurationBuilder {
066    
067        private static final Logger LOGGER = StatusLogger.getLogger();
068    
069        private final ChannelFactory channelFactory = new DefaultChannelFactory();
070        private final SourceFactory sourceFactory = new DefaultSourceFactory();
071        private final SinkFactory sinkFactory = new DefaultSinkFactory();
072    
073        public NodeConfiguration load(final String name, final Properties props,
074                                      final NodeConfigurationAware configurationAware) {
075            final NodeConfiguration conf = new SimpleNodeConfiguration();
076            FlumeConfiguration fconfig;
077            try {
078                fconfig = new FlumeConfiguration(props);
079                final List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
080                if (errors.size() > 0) {
081                    boolean isError = false;
082                    for (final FlumeConfigurationError error : errors) {
083                        final StringBuilder sb = new StringBuilder();
084                        sb.append("Component: ").append(error.getComponentName()).append(" ");
085                        sb.append("Key: ").append(error.getKey()).append(" ");
086                        sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
087                        switch (error.getErrorOrWarning()) {
088                            case ERROR:
089                                isError = true;
090                                LOGGER.error(sb.toString());
091                                break;
092                            case WARNING:
093                                LOGGER.warn(sb.toString());
094                                break;
095                        }
096                    }
097                    if (isError) {
098                        throw new ConfigurationException("Unable to configure Flume due to errors");
099                    }
100                }
101            } catch (final RuntimeException ex) {
102                printProps(props);
103                throw ex;
104            }
105    
106            final FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
107    
108            if (agentConf != null) {
109    
110                loadChannels(agentConf, conf);
111                loadSources(agentConf, conf);
112                loadSinks(agentConf, conf);
113    
114                //configurationAware.startAllComponents(conf);
115            } else {
116                LOGGER.warn("No configuration found for: {}", name);
117            }
118            return conf;
119        }
120    
121        private void printProps(final Properties props) {
122            for (final String key : new TreeSet<String>(props.stringPropertyNames())) {
123                LOGGER.error(key + "=" + props.getProperty(key));
124            }
125        }
126    
127        protected void loadChannels(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
128            LOGGER.info("Creating channels");
129            final Set<String> channels = agentConf.getChannelSet();
130            final Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
131            for (final String chName : channels) {
132                final ComponentConfiguration comp = compMap.get(chName);
133                if (comp != null) {
134                    final Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
135    
136                    Configurables.configure(channel, comp);
137    
138                    conf.getChannels().put(comp.getComponentName(), channel);
139                }
140            }
141    
142            for (final String ch : channels) {
143                final Context context = agentConf.getChannelContext().get(ch);
144                if (context != null) {
145                    final Channel channel = channelFactory.create(ch,
146                        context.getString(BasicConfigurationConstants.CONFIG_TYPE));
147                    Configurables.configure(channel, context);
148                    conf.getChannels().put(ch, channel);
149                    LOGGER.info("created channel " + ch);
150                }
151            }
152        }
153    
154        protected void loadSources(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
155    
156            final Set<String> sources = agentConf.getSourceSet();
157            final Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
158            for (final String sourceName : sources) {
159                final ComponentConfiguration comp = compMap.get(sourceName);
160                if (comp != null) {
161                    final SourceConfiguration config = (SourceConfiguration) comp;
162    
163                    final Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
164    
165                    Configurables.configure(source, config);
166                    final Set<String> channelNames = config.getChannels();
167                    final List<Channel> channels = new ArrayList<Channel>();
168                    for (final String chName : channelNames) {
169                        channels.add(conf.getChannels().get(chName));
170                    }
171    
172                    final ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
173    
174                    final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
175    
176                    final ChannelProcessor channelProcessor = new ChannelProcessor(selector);
177                    Configurables.configure(channelProcessor, config);
178    
179                    source.setChannelProcessor(channelProcessor);
180                    conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
181                }
182            }
183            final Map<String, Context> sourceContexts = agentConf.getSourceContext();
184    
185            for (final String src : sources) {
186                final Context context = sourceContexts.get(src);
187                if (context != null) {
188                    final Source source = sourceFactory.create(src,
189                        context.getString(BasicConfigurationConstants.CONFIG_TYPE));
190                    final List<Channel> channels = new ArrayList<Channel>();
191                    Configurables.configure(source, context);
192                    final String[] channelNames =
193                        context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
194                    for (final String chName : channelNames) {
195                        channels.add(conf.getChannels().get(chName));
196                    }
197    
198                    final Map<String, String> selectorConfig = context.getSubProperties(
199                        BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
200    
201                    final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
202    
203                    final ChannelProcessor channelProcessor = new ChannelProcessor(selector);
204                    Configurables.configure(channelProcessor, context);
205    
206                    source.setChannelProcessor(channelProcessor);
207                    conf.getSourceRunners().put(src, SourceRunner.forSource(source));
208                }
209            }
210        }
211    
212        protected void loadSinks(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) {
213            final Set<String> sinkNames = agentConf.getSinkSet();
214            final Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
215            final Map<String, Sink> sinks = new HashMap<String, Sink>();
216            for (final String sinkName : sinkNames) {
217                final ComponentConfiguration comp = compMap.get(sinkName);
218                if (comp != null) {
219                    final SinkConfiguration config = (SinkConfiguration) comp;
220                    final Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
221    
222                    Configurables.configure(sink, config);
223    
224                    sink.setChannel(conf.getChannels().get(config.getChannel()));
225                    sinks.put(comp.getComponentName(), sink);
226                }
227            }
228    
229            final Map<String, Context> sinkContexts = agentConf.getSinkContext();
230            for (final String sinkName : sinkNames) {
231                final Context context = sinkContexts.get(sinkName);
232                if (context != null) {
233                    final Sink sink = sinkFactory.create(sinkName,
234                        context.getString(BasicConfigurationConstants.CONFIG_TYPE));
235                    Configurables.configure(sink, context);
236    
237                    sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
238                    sinks.put(sinkName, sink);
239                }
240            }
241    
242            loadSinkGroups(agentConf, sinks, conf);
243        }
244    
245        protected void loadSinkGroups(final FlumeConfiguration.AgentConfiguration agentConf,
246                                      final Map<String, Sink> sinks, final NodeConfiguration conf) {
247            final Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
248            final Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
249            final Map<String, String> usedSinks = new HashMap<String, String>();
250            for (final String groupName : sinkgroupNames) {
251                final ComponentConfiguration comp = compMap.get(groupName);
252                if (comp != null) {
253                    final SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
254                    final List<String> groupSinkList = groupConf.getSinks();
255                    final List<Sink> groupSinks = new ArrayList<Sink>();
256                    for (final String sink : groupSinkList) {
257                        final Sink s = sinks.remove(sink);
258                        if (s == null) {
259                            final String sinkUser = usedSinks.get(sink);
260                            if (sinkUser != null) {
261                                throw new ConfigurationException(String.format(
262                                    "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
263                            } else {
264                                throw new ConfigurationException(String.format(
265                                    "Sink %s of group %s does not exist or is not properly configured", sink,
266                                    groupName));
267                            }
268                        }
269                        groupSinks.add(s);
270                        usedSinks.put(sink, groupName);
271                    }
272                    final SinkGroup group = new SinkGroup(groupSinks);
273                    Configurables.configure(group, groupConf);
274                    conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
275                }
276            }
277            // add any unassigned sinks to solo collectors
278            for (final Map.Entry<String, Sink> entry : sinks.entrySet()) {
279                if (!usedSinks.containsValue(entry.getKey())) {
280                    final SinkProcessor pr = new DefaultSinkProcessor();
281                    final List<Sink> sinkMap = new ArrayList<Sink>();
282                    sinkMap.add(entry.getValue());
283                    pr.setSinks(sinkMap);
284                    Configurables.configure(pr, new Context());
285                    conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
286                }
287            }
288        }
289    }