001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.Channel;
020    import org.apache.flume.ChannelFactory;
021    import org.apache.flume.ChannelSelector;
022    import org.apache.flume.Context;
023    import org.apache.flume.Sink;
024    import org.apache.flume.SinkFactory;
025    import org.apache.flume.SinkProcessor;
026    import org.apache.flume.SinkRunner;
027    import org.apache.flume.Source;
028    import org.apache.flume.SourceFactory;
029    import org.apache.flume.SourceRunner;
030    import org.apache.flume.channel.ChannelProcessor;
031    import org.apache.flume.channel.ChannelSelectorFactory;
032    import org.apache.flume.channel.DefaultChannelFactory;
033    import org.apache.flume.conf.BasicConfigurationConstants;
034    import org.apache.flume.conf.ComponentConfiguration;
035    import org.apache.flume.conf.Configurables;
036    import org.apache.flume.conf.FlumeConfiguration;
037    import org.apache.flume.conf.FlumeConfigurationError;
038    import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
039    import org.apache.flume.conf.file.SimpleNodeConfiguration;
040    import org.apache.flume.conf.sink.SinkConfiguration;
041    import org.apache.flume.conf.sink.SinkGroupConfiguration;
042    import org.apache.flume.conf.source.SourceConfiguration;
043    import org.apache.flume.node.NodeConfiguration;
044    import org.apache.flume.node.nodemanager.NodeConfigurationAware;
045    import org.apache.flume.sink.DefaultSinkFactory;
046    import org.apache.flume.sink.DefaultSinkProcessor;
047    import org.apache.flume.sink.SinkGroup;
048    import org.apache.flume.source.DefaultSourceFactory;
049    import org.apache.logging.log4j.Logger;
050    import org.apache.logging.log4j.core.config.ConfigurationException;
051    import org.apache.logging.log4j.status.StatusLogger;
052    
053    import java.util.ArrayList;
054    import java.util.HashMap;
055    import java.util.List;
056    import java.util.Map;
057    import java.util.Properties;
058    import java.util.Set;
059    import java.util.TreeSet;
060    
061    /**
062     * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible.
063     */
064    
065    public class FlumeConfigurationBuilder {
066    
067        private static final Logger LOGGER = StatusLogger.getLogger();
068    
069        private final ChannelFactory channelFactory = new DefaultChannelFactory();
070        private final SourceFactory sourceFactory = new DefaultSourceFactory();
071        private final SinkFactory sinkFactory = new DefaultSinkFactory();
072    
073        public NodeConfiguration load(String name, Properties props, NodeConfigurationAware configurationAware) {
074            NodeConfiguration conf = new SimpleNodeConfiguration();
075            FlumeConfiguration fconfig;
076            try {
077                fconfig = new FlumeConfiguration(props);
078                List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
079                if (errors.size() > 0) {
080                    boolean isError = false;
081                    for (FlumeConfigurationError error : errors) {
082                        StringBuilder sb = new StringBuilder();
083                        sb.append("Component: ").append(error.getComponentName()).append(" ");
084                        sb.append("Key: ").append(error.getKey()).append(" ");
085                        sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
086                        switch (error.getErrorOrWarning()) {
087                            case ERROR:
088                                isError = true;
089                                LOGGER.error(sb.toString());
090                                break;
091                            case WARNING:
092                                LOGGER.warn(sb.toString());
093                                break;
094                        }
095                    }
096                    if (isError) {
097                        throw new ConfigurationException("Unable to configure Flume due to errors");
098                    }
099                }
100            } catch (RuntimeException ex) {
101                printProps(props);
102                throw ex;
103            }
104    
105            FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
106    
107            if (agentConf != null) {
108    
109                loadChannels(agentConf, conf);
110                loadSources(agentConf, conf);
111                loadSinks(agentConf, conf);
112    
113                configurationAware.startAllComponents(conf);
114            } else {
115                LOGGER.warn("No configuration found for: {}", name);
116            }
117            return conf;
118        }
119    
120        private void printProps(Properties props) {
121            for (String key : new TreeSet<String>(props.stringPropertyNames())) {
122                LOGGER.error(key + "=" + props.getProperty(key));
123            }
124        }
125    
126        protected void loadChannels(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
127            LOGGER.info("Creating channels");
128            Set<String> channels = agentConf.getChannelSet();
129            Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
130            for (String chName : channels) {
131                ComponentConfiguration comp = compMap.get(chName);
132                if (comp != null) {
133                    Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
134    
135                    Configurables.configure(channel, comp);
136    
137                    conf.getChannels().put(comp.getComponentName(), channel);
138                }
139            }
140    
141            for (String ch : channels) {
142                Context context = agentConf.getChannelContext().get(ch);
143                if (context != null) {
144                    Channel channel = channelFactory.create(ch, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
145                    Configurables.configure(channel, context);
146                    conf.getChannels().put(ch, channel);
147                    LOGGER.info("created channel " + ch);
148                }
149            }
150        }
151    
152        protected void loadSources(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
153    
154            Set<String> sources = agentConf.getSourceSet();
155            Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
156            for (String sourceName : sources) {
157                ComponentConfiguration comp = compMap.get(sourceName);
158                if (comp != null) {
159                    SourceConfiguration config = (SourceConfiguration) comp;
160    
161                    Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
162    
163                    Configurables.configure(source, config);
164                    Set<String> channelNames = config.getChannels();
165                    List<Channel> channels = new ArrayList<Channel>();
166                    for (String chName : channelNames) {
167                        channels.add(conf.getChannels().get(chName));
168                    }
169    
170                    ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
171    
172                    ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
173    
174                    ChannelProcessor channelProcessor = new ChannelProcessor(selector);
175                    Configurables.configure(channelProcessor, config);
176    
177                    source.setChannelProcessor(channelProcessor);
178                    conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
179                }
180            }
181            Map<String, Context> sourceContexts = agentConf.getSourceContext();
182    
183            for (String src : sources) {
184                Context context = sourceContexts.get(src);
185                if (context != null){
186                    Source source = sourceFactory.create(src, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
187                    List<Channel> channels = new ArrayList<Channel>();
188                    Configurables.configure(source, context);
189                    String[] channelNames = context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
190                    for (String chName : channelNames) {
191                        channels.add(conf.getChannels().get(chName));
192                    }
193    
194                    Map<String, String> selectorConfig = context.getSubProperties(
195                        BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
196    
197                    ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
198    
199                    ChannelProcessor channelProcessor = new ChannelProcessor(selector);
200                    Configurables.configure(channelProcessor, context);
201    
202                    source.setChannelProcessor(channelProcessor);
203                    conf.getSourceRunners().put(src, SourceRunner.forSource(source));
204                }
205            }
206        }
207    
208        protected void loadSinks(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
209            Set<String> sinkNames = agentConf.getSinkSet();
210            Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
211            Map<String, Sink> sinks = new HashMap<String, Sink>();
212            for (String sinkName : sinkNames) {
213                ComponentConfiguration comp = compMap.get(sinkName);
214                if (comp != null) {
215                    SinkConfiguration config = (SinkConfiguration) comp;
216                    Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
217    
218                    Configurables.configure(sink, config);
219    
220                    sink.setChannel(conf.getChannels().get(config.getChannel()));
221                    sinks.put(comp.getComponentName(), sink);
222                }
223            }
224    
225            Map<String, Context> sinkContexts = agentConf.getSinkContext();
226            for (String sinkName : sinkNames) {
227                Context context = sinkContexts.get(sinkName);
228                if (context != null) {
229                    Sink sink = sinkFactory.create(sinkName, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
230                    Configurables.configure(sink, context);
231    
232                    sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
233                    sinks.put(sinkName, sink);
234                }
235            }
236    
237            loadSinkGroups(agentConf, sinks, conf);
238        }
239    
240        protected void loadSinkGroups(FlumeConfiguration.AgentConfiguration agentConf,
241                                      Map<String, Sink> sinks, NodeConfiguration conf) {
242            Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
243            Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
244            Map<String, String> usedSinks = new HashMap<String, String>();
245            for (String groupName : sinkgroupNames) {
246                ComponentConfiguration comp = compMap.get(groupName);
247                if (comp != null) {
248                    SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
249                    List<String> groupSinkList = groupConf.getSinks();
250                    List<Sink> groupSinks = new ArrayList<Sink>();
251                    for (String sink : groupSinkList) {
252                        Sink s = sinks.remove(sink);
253                        if (s == null) {
254                            String sinkUser = usedSinks.get(sink);
255                            if (sinkUser != null) {
256                                throw new ConfigurationException(String.format(
257                                    "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
258                            } else {
259                                throw new ConfigurationException(String.format(
260                                    "Sink %s of group %s does not exist or is not properly configured", sink,
261                                    groupName));
262                            }
263                        }
264                        groupSinks.add(s);
265                        usedSinks.put(sink, groupName);
266                    }
267                    SinkGroup group = new SinkGroup(groupSinks);
268                    Configurables.configure(group, groupConf);
269                    conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
270                }
271            }
272            // add any unasigned sinks to solo collectors
273            for (Map.Entry<String, Sink> entry : sinks.entrySet()) {
274                if (!usedSinks.containsValue(entry.getKey())) {
275                    SinkProcessor pr = new DefaultSinkProcessor();
276                    List<Sink> sinkMap = new ArrayList<Sink>();
277                    sinkMap.add(entry.getValue());
278                    pr.setSinks(sinkMap);
279                    Configurables.configure(pr, new Context());
280                    conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
281                }
282            }
283        }
284    }