001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.Channel; 020 import org.apache.flume.ChannelFactory; 021 import org.apache.flume.ChannelSelector; 022 import org.apache.flume.Context; 023 import org.apache.flume.Sink; 024 import org.apache.flume.SinkFactory; 025 import org.apache.flume.SinkProcessor; 026 import org.apache.flume.SinkRunner; 027 import org.apache.flume.Source; 028 import org.apache.flume.SourceFactory; 029 import org.apache.flume.SourceRunner; 030 import org.apache.flume.channel.ChannelProcessor; 031 import org.apache.flume.channel.ChannelSelectorFactory; 032 import org.apache.flume.channel.DefaultChannelFactory; 033 import org.apache.flume.conf.BasicConfigurationConstants; 034 import org.apache.flume.conf.ComponentConfiguration; 035 import org.apache.flume.conf.Configurables; 036 import org.apache.flume.conf.FlumeConfiguration; 037 import org.apache.flume.conf.FlumeConfigurationError; 038 import org.apache.flume.conf.channel.ChannelSelectorConfiguration; 039 import org.apache.flume.conf.file.SimpleNodeConfiguration; 040 import org.apache.flume.conf.sink.SinkConfiguration; 041 import org.apache.flume.conf.sink.SinkGroupConfiguration; 042 import org.apache.flume.conf.source.SourceConfiguration; 043 import org.apache.flume.node.NodeConfiguration; 044 import org.apache.flume.node.nodemanager.NodeConfigurationAware; 045 import org.apache.flume.sink.DefaultSinkFactory; 046 import org.apache.flume.sink.DefaultSinkProcessor; 047 import org.apache.flume.sink.SinkGroup; 048 import org.apache.flume.source.DefaultSourceFactory; 049 import org.apache.logging.log4j.Logger; 050 import org.apache.logging.log4j.core.config.ConfigurationException; 051 import org.apache.logging.log4j.status.StatusLogger; 052 053 import java.util.ArrayList; 054 import java.util.HashMap; 055 import java.util.List; 056 import java.util.Map; 057 import java.util.Properties; 058 import java.util.Set; 059 import java.util.TreeSet; 060 061 /** 062 * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible. 063 */ 064 065 public class FlumeConfigurationBuilder { 066 067 private static final Logger LOGGER = StatusLogger.getLogger(); 068 069 private final ChannelFactory channelFactory = new DefaultChannelFactory(); 070 private final SourceFactory sourceFactory = new DefaultSourceFactory(); 071 private final SinkFactory sinkFactory = new DefaultSinkFactory(); 072 073 public NodeConfiguration load(String name, Properties props, NodeConfigurationAware configurationAware) { 074 NodeConfiguration conf = new SimpleNodeConfiguration(); 075 FlumeConfiguration fconfig; 076 try { 077 fconfig = new FlumeConfiguration(props); 078 List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors(); 079 if (errors.size() > 0) { 080 boolean isError = false; 081 for (FlumeConfigurationError error : errors) { 082 StringBuilder sb = new StringBuilder(); 083 sb.append("Component: ").append(error.getComponentName()).append(" "); 084 sb.append("Key: ").append(error.getKey()).append(" "); 085 sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError()); 086 switch (error.getErrorOrWarning()) { 087 case ERROR: 088 isError = true; 089 LOGGER.error(sb.toString()); 090 break; 091 case WARNING: 092 LOGGER.warn(sb.toString()); 093 break; 094 } 095 } 096 if (isError) { 097 throw new ConfigurationException("Unable to configure Flume due to errors"); 098 } 099 } 100 } catch (RuntimeException ex) { 101 printProps(props); 102 throw ex; 103 } 104 105 FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name); 106 107 if (agentConf != null) { 108 109 loadChannels(agentConf, conf); 110 loadSources(agentConf, conf); 111 loadSinks(agentConf, conf); 112 113 configurationAware.startAllComponents(conf); 114 } else { 115 LOGGER.warn("No configuration found for: {}", name); 116 } 117 return conf; 118 } 119 120 private void printProps(Properties props) { 121 for (String key : new TreeSet<String>(props.stringPropertyNames())) { 122 LOGGER.error(key + "=" + props.getProperty(key)); 123 } 124 } 125 126 protected void loadChannels(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) { 127 LOGGER.info("Creating channels"); 128 Set<String> channels = agentConf.getChannelSet(); 129 Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap(); 130 for (String chName : channels) { 131 ComponentConfiguration comp = compMap.get(chName); 132 if (comp != null) { 133 Channel channel = channelFactory.create(comp.getComponentName(), comp.getType()); 134 135 Configurables.configure(channel, comp); 136 137 conf.getChannels().put(comp.getComponentName(), channel); 138 } 139 } 140 141 for (String ch : channels) { 142 Context context = agentConf.getChannelContext().get(ch); 143 if (context != null) { 144 Channel channel = channelFactory.create(ch, context.getString(BasicConfigurationConstants.CONFIG_TYPE)); 145 Configurables.configure(channel, context); 146 conf.getChannels().put(ch, channel); 147 LOGGER.info("created channel " + ch); 148 } 149 } 150 } 151 152 protected void loadSources(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) { 153 154 Set<String> sources = agentConf.getSourceSet(); 155 Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap(); 156 for (String sourceName : sources) { 157 ComponentConfiguration comp = compMap.get(sourceName); 158 if (comp != null) { 159 SourceConfiguration config = (SourceConfiguration) comp; 160 161 Source source = sourceFactory.create(comp.getComponentName(), comp.getType()); 162 163 Configurables.configure(source, config); 164 Set<String> channelNames = config.getChannels(); 165 List<Channel> channels = new ArrayList<Channel>(); 166 for (String chName : channelNames) { 167 channels.add(conf.getChannels().get(chName)); 168 } 169 170 ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration(); 171 172 ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig); 173 174 ChannelProcessor channelProcessor = new ChannelProcessor(selector); 175 Configurables.configure(channelProcessor, config); 176 177 source.setChannelProcessor(channelProcessor); 178 conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source)); 179 } 180 } 181 Map<String, Context> sourceContexts = agentConf.getSourceContext(); 182 183 for (String src : sources) { 184 Context context = sourceContexts.get(src); 185 if (context != null){ 186 Source source = sourceFactory.create(src, context.getString(BasicConfigurationConstants.CONFIG_TYPE)); 187 List<Channel> channels = new ArrayList<Channel>(); 188 Configurables.configure(source, context); 189 String[] channelNames = context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+"); 190 for (String chName : channelNames) { 191 channels.add(conf.getChannels().get(chName)); 192 } 193 194 Map<String, String> selectorConfig = context.getSubProperties( 195 BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX); 196 197 ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig); 198 199 ChannelProcessor channelProcessor = new ChannelProcessor(selector); 200 Configurables.configure(channelProcessor, context); 201 202 source.setChannelProcessor(channelProcessor); 203 conf.getSourceRunners().put(src, SourceRunner.forSource(source)); 204 } 205 } 206 } 207 208 protected void loadSinks(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) { 209 Set<String> sinkNames = agentConf.getSinkSet(); 210 Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap(); 211 Map<String, Sink> sinks = new HashMap<String, Sink>(); 212 for (String sinkName : sinkNames) { 213 ComponentConfiguration comp = compMap.get(sinkName); 214 if (comp != null) { 215 SinkConfiguration config = (SinkConfiguration) comp; 216 Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType()); 217 218 Configurables.configure(sink, config); 219 220 sink.setChannel(conf.getChannels().get(config.getChannel())); 221 sinks.put(comp.getComponentName(), sink); 222 } 223 } 224 225 Map<String, Context> sinkContexts = agentConf.getSinkContext(); 226 for (String sinkName : sinkNames) { 227 Context context = sinkContexts.get(sinkName); 228 if (context != null) { 229 Sink sink = sinkFactory.create(sinkName, context.getString(BasicConfigurationConstants.CONFIG_TYPE)); 230 Configurables.configure(sink, context); 231 232 sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL))); 233 sinks.put(sinkName, sink); 234 } 235 } 236 237 loadSinkGroups(agentConf, sinks, conf); 238 } 239 240 protected void loadSinkGroups(FlumeConfiguration.AgentConfiguration agentConf, 241 Map<String, Sink> sinks, NodeConfiguration conf) { 242 Set<String> sinkgroupNames = agentConf.getSinkgroupSet(); 243 Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap(); 244 Map<String, String> usedSinks = new HashMap<String, String>(); 245 for (String groupName : sinkgroupNames) { 246 ComponentConfiguration comp = compMap.get(groupName); 247 if (comp != null) { 248 SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp; 249 List<String> groupSinkList = groupConf.getSinks(); 250 List<Sink> groupSinks = new ArrayList<Sink>(); 251 for (String sink : groupSinkList) { 252 Sink s = sinks.remove(sink); 253 if (s == null) { 254 String sinkUser = usedSinks.get(sink); 255 if (sinkUser != null) { 256 throw new ConfigurationException(String.format( 257 "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser)); 258 } else { 259 throw new ConfigurationException(String.format( 260 "Sink %s of group %s does not exist or is not properly configured", sink, 261 groupName)); 262 } 263 } 264 groupSinks.add(s); 265 usedSinks.put(sink, groupName); 266 } 267 SinkGroup group = new SinkGroup(groupSinks); 268 Configurables.configure(group, groupConf); 269 conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor())); 270 } 271 } 272 // add any unasigned sinks to solo collectors 273 for (Map.Entry<String, Sink> entry : sinks.entrySet()) { 274 if (!usedSinks.containsValue(entry.getKey())) { 275 SinkProcessor pr = new DefaultSinkProcessor(); 276 List<Sink> sinkMap = new ArrayList<Sink>(); 277 sinkMap.add(entry.getValue()); 278 pr.setSinks(sinkMap); 279 Configurables.configure(pr, new Context()); 280 conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr)); 281 } 282 } 283 } 284 }