001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.Event; 020 import org.apache.flume.SourceRunner; 021 import org.apache.flume.node.NodeConfiguration; 022 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager; 023 import org.apache.logging.log4j.core.appender.ManagerFactory; 024 import org.apache.logging.log4j.core.config.ConfigurationException; 025 import org.apache.logging.log4j.core.config.Property; 026 import org.apache.logging.log4j.core.helpers.NameUtil; 027 import org.apache.logging.log4j.util.PropertiesUtil; 028 029 import java.util.Locale; 030 import java.util.Properties; 031 032 /** 033 * 034 */ 035 public class FlumeEmbeddedManager extends AbstractFlumeManager { 036 037 /** Name for the Flume source */ 038 protected static final String SOURCE_NAME = "log4j-source"; 039 040 private static FlumeManagerFactory factory = new FlumeManagerFactory(); 041 042 private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator"); 043 044 private static final String IN_MEMORY = "InMemory"; 045 046 private final FlumeNode node; 047 048 private NodeConfiguration conf; 049 050 private final Log4jEventSource source; 051 052 private final String shortName; 053 054 055 /** 056 * Constructor 057 * @param name The unique name of this manager. 058 * @param node The Flume Node. 059 */ 060 protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) { 061 super(name); 062 this.node = node; 063 this.shortName = shortName; 064 final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME); 065 if (runner == null || runner.getSource() == null) { 066 throw new IllegalStateException("No Source has been created for Appender " + shortName); 067 } 068 source = (Log4jEventSource) runner.getSource(); 069 } 070 071 /** 072 * Returns a FlumeEmbeddedManager. 073 * @param name The name of the manager. 074 * @param agents The agents to use. 075 * @param properties Properties for the embedded manager. 076 * @param batchSize The number of events to include in a batch. 077 * @param dataDir The directory where the Flume FileChannel should write to. 078 * @return A FlumeAvroManager. 079 */ 080 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties, 081 int batchSize, final String dataDir) { 082 083 if (batchSize <= 0) { 084 batchSize = 1; 085 } 086 087 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { 088 throw new IllegalArgumentException("Either an Agent or properties are required"); 089 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) { 090 throw new IllegalArgumentException("Cannot configure both Agents and Properties."); 091 } 092 093 final StringBuilder sb = new StringBuilder(); 094 boolean first = true; 095 096 if (agents != null && agents.length > 0) { 097 sb.append("FlumeEmbedded["); 098 for (final Agent agent : agents) { 099 if (!first) { 100 sb.append(","); 101 } 102 sb.append(agent.getHost()).append(":").append(agent.getPort()); 103 first = false; 104 } 105 sb.append("]"); 106 } else { 107 String sep = ""; 108 sb.append(name).append(":"); 109 final StringBuilder props = new StringBuilder(); 110 for (final Property prop : properties) { 111 props.append(sep); 112 props.append(prop.getName()).append("=").append(prop.getValue()); 113 sep = ","; 114 } 115 sb.append(NameUtil.md5(props.toString())); 116 } 117 return getManager(sb.toString(), factory, 118 new FactoryData(name, agents, properties, batchSize, dataDir)); 119 } 120 121 @Override 122 public void send(final Event event) { 123 source.send(event); 124 } 125 126 @Override 127 protected void releaseSub() { 128 node.stop(); 129 } 130 131 /** 132 * Factory data. 133 */ 134 private static class FactoryData { 135 private final Agent[] agents; 136 private final Property[] properties; 137 private final int batchSize; 138 private final String dataDir; 139 private final String name; 140 141 /** 142 * Constructor. 143 * @param name The name of the Appender. 144 * @param agents The agents. 145 * @param properties The Flume configuration properties. 146 * @param batchSize The number of events to include in a batch. 147 * @param dataDir The directory where Flume should write to. 148 */ 149 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize, 150 final String dataDir) { 151 this.name = name; 152 this.agents = agents; 153 this.batchSize = batchSize; 154 this.properties = properties; 155 this.dataDir = dataDir; 156 } 157 } 158 159 /** 160 * Avro Manager Factory. 161 */ 162 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> { 163 private static final String SOURCE_TYPE = Log4jEventSource.class.getName(); 164 165 /** 166 * Create the FlumeAvroManager. 167 * @param name The name of the entity to manage. 168 * @param data The data required to create the entity. 169 * @return The FlumeAvroManager. 170 */ 171 @Override 172 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) { 173 try { 174 final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager(); 175 final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize, 176 data.dataDir); 177 final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder(); 178 final NodeConfiguration conf = builder.load(data.name, props, nodeManager); 179 180 final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf); 181 182 node.start(); 183 184 return new FlumeEmbeddedManager(name, data.name, node); 185 } catch (final Exception ex) { 186 LOGGER.error("Could not create FlumeEmbeddedManager", ex); 187 } 188 return null; 189 } 190 191 private Properties createProperties(final String name, final Agent[] agents, final Property[] properties, 192 final int batchSize, String dataDir) { 193 final Properties props = new Properties(); 194 195 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { 196 LOGGER.error("No Flume configuration provided"); 197 throw new ConfigurationException("No Flume configuration provided"); 198 } 199 200 if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) { 201 LOGGER.error("Agents and Flume configuration cannot both be specified"); 202 throw new ConfigurationException("Agents and Flume configuration cannot both be specified"); 203 } 204 205 if (agents != null && agents.length > 0) { 206 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME); 207 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE); 208 209 if (dataDir != null && dataDir.length() > 0) { 210 if (dataDir.equals(IN_MEMORY)) { 211 props.put(name + ".channels", "primary"); 212 props.put(name + ".channels.primary.type", "memory"); 213 } else { 214 props.put(name + ".channels", "primary"); 215 props.put(name + ".channels.primary.type", "file"); 216 217 if (!dataDir.endsWith(FiLE_SEP)) { 218 dataDir = dataDir + FiLE_SEP; 219 } 220 221 props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint"); 222 props.put(name + ".channels.primary.dataDirs", dataDir + "data"); 223 } 224 225 } else { 226 props.put(name + ".channels", "primary"); 227 props.put(name + ".channels.primary.type", "file"); 228 } 229 230 final StringBuilder sb = new StringBuilder(); 231 String leading = ""; 232 int priority = agents.length; 233 for (int i = 0; i < agents.length; ++i) { 234 sb.append(leading).append("agent").append(i); 235 leading = " "; 236 final String prefix = name + ".sinks.agent" + i; 237 props.put(prefix + ".channel", "primary"); 238 props.put(prefix + ".type", "avro"); 239 props.put(prefix + ".hostname", agents[i].getHost()); 240 props.put(prefix + ".port", Integer.toString(agents[i].getPort())); 241 props.put(prefix + ".batch-size", Integer.toString(batchSize)); 242 props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority)); 243 --priority; 244 } 245 props.put(name + ".sinks", sb.toString()); 246 props.put(name + ".sinkgroups", "group1"); 247 props.put(name + ".sinkgroups.group1.sinks", sb.toString()); 248 props.put(name + ".sinkgroups.group1.processor.type", "failover"); 249 final String sourceChannels = "primary"; 250 props.put(name + ".channels", sourceChannels); 251 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels); 252 } else { 253 String channels = null; 254 String[] sinks = null; 255 256 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME); 257 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE); 258 259 for (final Property property : properties) { 260 final String key = property.getName(); 261 262 if (key == null || key.length() == 0) { 263 final String msg = "A property name must be provided"; 264 LOGGER.error(msg); 265 throw new ConfigurationException(msg); 266 } 267 268 final String upperKey = key.toUpperCase(Locale.ENGLISH); 269 270 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) { 271 final String msg = 272 "Specification of the agent name is not allowed in Flume Appender configuration: " + key; 273 LOGGER.error(msg); 274 throw new ConfigurationException(msg); 275 } 276 277 // Prohibit setting the sources as they are set above but allow interceptors to be set 278 if (upperKey.startsWith("SOURCES.") && !upperKey.startsWith("SOURCES.LOG4J-SOURCE.INTERCEPTORS")) { 279 final String msg = "Specification of Sources is not allowed in Flume Appender: " + key; 280 LOGGER.error(msg); 281 throw new ConfigurationException(msg); 282 } 283 284 final String value = property.getValue(); 285 if (value == null || value.length() == 0) { 286 final String msg = "A value for property " + key + " must be provided"; 287 LOGGER.error(msg); 288 throw new ConfigurationException(msg); 289 } 290 291 if (upperKey.equals("CHANNELS")) { 292 channels = value.trim(); 293 } else if (upperKey.equals("SINKS")) { 294 sinks = value.trim().split(" "); 295 } 296 297 props.put(name + '.' + key, value); 298 } 299 300 String sourceChannels = channels; 301 302 if (channels == null) { 303 sourceChannels = "primary"; 304 props.put(name + ".channels", sourceChannels); 305 } 306 307 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels); 308 309 if (sinks == null || sinks.length == 0) { 310 final String msg = "At least one Sink must be specified"; 311 LOGGER.error(msg); 312 throw new ConfigurationException(msg); 313 } 314 } 315 return props; 316 } 317 318 } 319 320 }