001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.Channel; 020 import org.apache.flume.ChannelFactory; 021 import org.apache.flume.ChannelSelector; 022 import org.apache.flume.Context; 023 import org.apache.flume.Sink; 024 import org.apache.flume.SinkFactory; 025 import org.apache.flume.SinkProcessor; 026 import org.apache.flume.SinkRunner; 027 import org.apache.flume.Source; 028 import org.apache.flume.SourceFactory; 029 import org.apache.flume.SourceRunner; 030 import org.apache.flume.channel.ChannelProcessor; 031 import org.apache.flume.channel.ChannelSelectorFactory; 032 import org.apache.flume.channel.DefaultChannelFactory; 033 import org.apache.flume.conf.BasicConfigurationConstants; 034 import org.apache.flume.conf.ComponentConfiguration; 035 import org.apache.flume.conf.Configurables; 036 import org.apache.flume.conf.FlumeConfiguration; 037 import org.apache.flume.conf.FlumeConfigurationError; 038 import org.apache.flume.conf.channel.ChannelSelectorConfiguration; 039 import org.apache.flume.conf.file.SimpleNodeConfiguration; 040 import org.apache.flume.conf.sink.SinkConfiguration; 041 import org.apache.flume.conf.sink.SinkGroupConfiguration; 042 import org.apache.flume.conf.source.SourceConfiguration; 043 import org.apache.flume.node.NodeConfiguration; 044 import org.apache.flume.node.nodemanager.NodeConfigurationAware; 045 import org.apache.flume.sink.DefaultSinkFactory; 046 import org.apache.flume.sink.DefaultSinkProcessor; 047 import org.apache.flume.sink.SinkGroup; 048 import org.apache.flume.source.DefaultSourceFactory; 049 import org.apache.logging.log4j.Logger; 050 import org.apache.logging.log4j.core.config.ConfigurationException; 051 import org.apache.logging.log4j.status.StatusLogger; 052 053 import java.util.ArrayList; 054 import java.util.HashMap; 055 import java.util.List; 056 import java.util.Map; 057 import java.util.Properties; 058 import java.util.Set; 059 import java.util.TreeSet; 060 061 /** 062 * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible. 063 */ 064 065 public class FlumeConfigurationBuilder { 066 067 private static final Logger LOGGER = StatusLogger.getLogger(); 068 069 private final ChannelFactory channelFactory = new DefaultChannelFactory(); 070 private final SourceFactory sourceFactory = new DefaultSourceFactory(); 071 private final SinkFactory sinkFactory = new DefaultSinkFactory(); 072 073 public NodeConfiguration load(final String name, final Properties props, 074 final NodeConfigurationAware configurationAware) { 075 final NodeConfiguration conf = new SimpleNodeConfiguration(); 076 FlumeConfiguration fconfig; 077 try { 078 fconfig = new FlumeConfiguration(props); 079 final List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors(); 080 if (errors.size() > 0) { 081 boolean isError = false; 082 for (final FlumeConfigurationError error : errors) { 083 final StringBuilder sb = new StringBuilder(); 084 sb.append("Component: ").append(error.getComponentName()).append(" "); 085 sb.append("Key: ").append(error.getKey()).append(" "); 086 sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError()); 087 switch (error.getErrorOrWarning()) { 088 case ERROR: 089 isError = true; 090 LOGGER.error(sb.toString()); 091 break; 092 case WARNING: 093 LOGGER.warn(sb.toString()); 094 break; 095 } 096 } 097 if (isError) { 098 throw new ConfigurationException("Unable to configure Flume due to errors"); 099 } 100 } 101 } catch (final RuntimeException ex) { 102 printProps(props); 103 throw ex; 104 } 105 106 final FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name); 107 108 if (agentConf != null) { 109 110 loadChannels(agentConf, conf); 111 loadSources(agentConf, conf); 112 loadSinks(agentConf, conf); 113 114 //configurationAware.startAllComponents(conf); 115 } else { 116 LOGGER.warn("No configuration found for: {}", name); 117 } 118 return conf; 119 } 120 121 private void printProps(final Properties props) { 122 for (final String key : new TreeSet<String>(props.stringPropertyNames())) { 123 LOGGER.error(key + "=" + props.getProperty(key)); 124 } 125 } 126 127 protected void loadChannels(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) { 128 LOGGER.info("Creating channels"); 129 final Set<String> channels = agentConf.getChannelSet(); 130 final Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap(); 131 for (final String chName : channels) { 132 final ComponentConfiguration comp = compMap.get(chName); 133 if (comp != null) { 134 final Channel channel = channelFactory.create(comp.getComponentName(), comp.getType()); 135 136 Configurables.configure(channel, comp); 137 138 conf.getChannels().put(comp.getComponentName(), channel); 139 } 140 } 141 142 for (final String ch : channels) { 143 final Context context = agentConf.getChannelContext().get(ch); 144 if (context != null) { 145 final Channel channel = channelFactory.create(ch, 146 context.getString(BasicConfigurationConstants.CONFIG_TYPE)); 147 Configurables.configure(channel, context); 148 conf.getChannels().put(ch, channel); 149 LOGGER.info("created channel " + ch); 150 } 151 } 152 } 153 154 protected void loadSources(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) { 155 156 final Set<String> sources = agentConf.getSourceSet(); 157 final Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap(); 158 for (final String sourceName : sources) { 159 final ComponentConfiguration comp = compMap.get(sourceName); 160 if (comp != null) { 161 final SourceConfiguration config = (SourceConfiguration) comp; 162 163 final Source source = sourceFactory.create(comp.getComponentName(), comp.getType()); 164 165 Configurables.configure(source, config); 166 final Set<String> channelNames = config.getChannels(); 167 final List<Channel> channels = new ArrayList<Channel>(); 168 for (final String chName : channelNames) { 169 channels.add(conf.getChannels().get(chName)); 170 } 171 172 final ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration(); 173 174 final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig); 175 176 final ChannelProcessor channelProcessor = new ChannelProcessor(selector); 177 Configurables.configure(channelProcessor, config); 178 179 source.setChannelProcessor(channelProcessor); 180 conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source)); 181 } 182 } 183 final Map<String, Context> sourceContexts = agentConf.getSourceContext(); 184 185 for (final String src : sources) { 186 final Context context = sourceContexts.get(src); 187 if (context != null) { 188 final Source source = sourceFactory.create(src, 189 context.getString(BasicConfigurationConstants.CONFIG_TYPE)); 190 final List<Channel> channels = new ArrayList<Channel>(); 191 Configurables.configure(source, context); 192 final String[] channelNames = 193 context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+"); 194 for (final String chName : channelNames) { 195 channels.add(conf.getChannels().get(chName)); 196 } 197 198 final Map<String, String> selectorConfig = context.getSubProperties( 199 BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX); 200 201 final ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig); 202 203 final ChannelProcessor channelProcessor = new ChannelProcessor(selector); 204 Configurables.configure(channelProcessor, context); 205 206 source.setChannelProcessor(channelProcessor); 207 conf.getSourceRunners().put(src, SourceRunner.forSource(source)); 208 } 209 } 210 } 211 212 protected void loadSinks(final FlumeConfiguration.AgentConfiguration agentConf, final NodeConfiguration conf) { 213 final Set<String> sinkNames = agentConf.getSinkSet(); 214 final Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap(); 215 final Map<String, Sink> sinks = new HashMap<String, Sink>(); 216 for (final String sinkName : sinkNames) { 217 final ComponentConfiguration comp = compMap.get(sinkName); 218 if (comp != null) { 219 final SinkConfiguration config = (SinkConfiguration) comp; 220 final Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType()); 221 222 Configurables.configure(sink, config); 223 224 sink.setChannel(conf.getChannels().get(config.getChannel())); 225 sinks.put(comp.getComponentName(), sink); 226 } 227 } 228 229 final Map<String, Context> sinkContexts = agentConf.getSinkContext(); 230 for (final String sinkName : sinkNames) { 231 final Context context = sinkContexts.get(sinkName); 232 if (context != null) { 233 final Sink sink = sinkFactory.create(sinkName, 234 context.getString(BasicConfigurationConstants.CONFIG_TYPE)); 235 Configurables.configure(sink, context); 236 237 sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL))); 238 sinks.put(sinkName, sink); 239 } 240 } 241 242 loadSinkGroups(agentConf, sinks, conf); 243 } 244 245 protected void loadSinkGroups(final FlumeConfiguration.AgentConfiguration agentConf, 246 final Map<String, Sink> sinks, final NodeConfiguration conf) { 247 final Set<String> sinkgroupNames = agentConf.getSinkgroupSet(); 248 final Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap(); 249 final Map<String, String> usedSinks = new HashMap<String, String>(); 250 for (final String groupName : sinkgroupNames) { 251 final ComponentConfiguration comp = compMap.get(groupName); 252 if (comp != null) { 253 final SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp; 254 final List<String> groupSinkList = groupConf.getSinks(); 255 final List<Sink> groupSinks = new ArrayList<Sink>(); 256 for (final String sink : groupSinkList) { 257 final Sink s = sinks.remove(sink); 258 if (s == null) { 259 final String sinkUser = usedSinks.get(sink); 260 if (sinkUser != null) { 261 throw new ConfigurationException(String.format( 262 "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser)); 263 } else { 264 throw new ConfigurationException(String.format( 265 "Sink %s of group %s does not exist or is not properly configured", sink, 266 groupName)); 267 } 268 } 269 groupSinks.add(s); 270 usedSinks.put(sink, groupName); 271 } 272 final SinkGroup group = new SinkGroup(groupSinks); 273 Configurables.configure(group, groupConf); 274 conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor())); 275 } 276 } 277 // add any unassigned sinks to solo collectors 278 for (final Map.Entry<String, Sink> entry : sinks.entrySet()) { 279 if (!usedSinks.containsValue(entry.getKey())) { 280 final SinkProcessor pr = new DefaultSinkProcessor(); 281 final List<Sink> sinkMap = new ArrayList<Sink>(); 282 sinkMap.add(entry.getValue()); 283 pr.setSinks(sinkMap); 284 Configurables.configure(pr, new Context()); 285 conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr)); 286 } 287 } 288 } 289 }