/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flume.appender;

import org.apache.flume.Event;
import org.apache.flume.SourceRunner;
import org.apache.flume.node.NodeConfiguration;
import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.helpers.NameUtil;
import org.apache.logging.log4j.util.PropertiesUtil;

import java.util.Locale;
import java.util.Properties;

/**
 * Flume manager that runs a Flume node inside the application and hands every event
 * to the {@link Log4jEventSource} registered in that node under {@link #SOURCE_NAME}.
 */
public class FlumeEmbeddedManager extends AbstractFlumeManager {

    /** Name for the Flume source */
    protected static final String SOURCE_NAME = "log4j-source";

    private static final ManagerFactory<FlumeEmbeddedManager, FactoryData> FACTORY = new FlumeManagerFactory();

    private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");

    /** Value of {@code dataDir} that selects a memory channel instead of a file channel. */
    private static final String IN_MEMORY = "InMemory";

    private final FlumeNode node;

    private final Log4jEventSource source;

    private final String shortName;


    /**
     * Constructor
     * @param name The unique name of this manager.
     * @param shortName The name of the Appender; used in the error message when no source is found.
     * @param node The Flume Node.
     * @throws IllegalStateException if the node's configuration has no source runner (or no source)
     *         registered under {@link #SOURCE_NAME}.
     */
    protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
        super(name);
        this.node = node;
        this.shortName = shortName;
        final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
        if (runner == null || runner.getSource() == null) {
            throw new IllegalStateException("No Source has been created for Appender " + shortName);
        }
        source = (Log4jEventSource) runner.getSource();
    }

    /**
     * Returns a FlumeEmbeddedManager.
     * <p>
     * Exactly one of {@code agents} and {@code properties} must be non-empty. The manager key is
     * built from the agents' host:port pairs, or from the name plus an MD5 of the properties.
     * </p>
     * @param name The name of the manager.
     * @param agents The agents to use.
     * @param properties Properties for the embedded manager.
     * @param batchSize The number of events to include in a batch; values less than 1 are treated as 1.
     * @param dataDir The directory where the Flume FileChannel should write to.
     * @return A FlumeEmbeddedManager.
     * @throws IllegalArgumentException if neither or both of agents and properties are supplied.
     */
    public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
                                                  int batchSize, final String dataDir) {

        if (batchSize <= 0) {
            batchSize = 1;
        }

        final boolean hasAgents = agents != null && agents.length > 0;
        final boolean hasProperties = properties != null && properties.length > 0;

        if (!hasAgents && !hasProperties) {
            throw new IllegalArgumentException("Either an Agent or properties are required");
        }
        if (hasAgents && hasProperties) {
            throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
        }

        final StringBuilder sb = new StringBuilder();
        String sep = "";

        if (hasAgents) {
            sb.append("FlumeEmbedded[");
            for (final Agent agent : agents) {
                sb.append(sep).append(agent.getHost()).append(":").append(agent.getPort());
                sep = ",";
            }
            sb.append("]");
        } else {
            sb.append(name).append(":");
            final StringBuilder props = new StringBuilder();
            for (final Property prop : properties) {
                props.append(sep);
                props.append(prop.getName()).append("=").append(prop.getValue());
                sep = ",";
            }
            // Hash the property list so the manager key stays short.
            sb.append(NameUtil.md5(props.toString()));
        }

        // The cast keeps the return type explicit however the inherited getManager is declared.
        return (FlumeEmbeddedManager) getManager(sb.toString(), FACTORY,
            new FactoryData(name, agents, properties, batchSize, dataDir));
    }

    /**
     * Passes the event to the embedded Flume source.
     * @param event The event to send.
     */
    @Override
    public void send(final Event event) {
        source.send(event);
    }

    /**
     * Stops the embedded Flume node.
     */
    @Override
    protected void releaseSub() {
        node.stop();
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final Agent[] agents;
        private final Property[] properties;
        private final int batchSize;
        private final String dataDir;
        private final String name;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param properties The Flume configuration properties.
         * @param batchSize The number of events to include in a batch.
         * @param dataDir The directory where Flume should write to.
         */
        public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
                           final String dataDir) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.properties = properties;
            this.dataDir = dataDir;
        }
    }

    /**
     * Embedded Manager Factory.
     */
    private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
        private static final String SOURCE_TYPE = Log4jEventSource.class.getName();

        /**
         * Create the FlumeEmbeddedManager: build the Flume properties, load the node configuration,
         * start the node and wrap it in a manager.
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumeEmbeddedManager, or null if it could not be created (the failure is logged).
         */
        public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
            try {
                final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
                final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
                    data.dataDir);
                final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
                final NodeConfiguration conf = builder.load(data.name, props, nodeManager);

                final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);

                node.start();

                try {
                    return new FlumeEmbeddedManager(name, data.name, node);
                } catch (final RuntimeException ex) {
                    // The node is already running at this point; stop it so that a manager that
                    // could not be constructed does not leave a started node behind.
                    stopQuietly(node);
                    throw ex;
                }
            } catch (final Exception ex) {
                LOGGER.error("Could not create FlumeEmbeddedManager", ex);
            }
            return null;
        }

        /**
         * Stops the node, logging (rather than propagating) any failure so the original
         * creation error is the one reported to the caller.
         * @param node The node to stop.
         */
        private void stopQuietly(final FlumeNode node) {
            try {
                node.stop();
            } catch (final RuntimeException ex) {
                LOGGER.error("Could not stop Flume node after FlumeEmbeddedManager creation failed", ex);
            }
        }

        /**
         * Builds the Flume configuration properties, every key prefixed with the appender name.
         * @param name The name of the Appender; used as the key prefix.
         * @param agents The agents; mutually exclusive with properties.
         * @param properties The Flume configuration properties; mutually exclusive with agents.
         * @param batchSize The number of events to include in a batch.
         * @param dataDir The directory for the file channel, or "InMemory" for a memory channel.
         * @return The Flume properties.
         * @throws ConfigurationException if neither or both of agents and properties are supplied, or
         *         if a supplied property is invalid.
         */
        private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
                                            final int batchSize, final String dataDir) {
            final boolean hasAgents = agents != null && agents.length > 0;
            final boolean hasProperties = properties != null && properties.length > 0;

            if (!hasAgents && !hasProperties) {
                throw configurationError("No Flume configuration provided");
            }

            if (hasAgents && hasProperties) {
                throw configurationError("Agents and Flume configuration cannot both be specified");
            }

            final Properties props = new Properties();

            // The source is always the log4j source. It is registered before any user supplied
            // properties are copied, so the relative order of the puts is the same as before.
            props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
            props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);

            if (hasAgents) {
                addAgentConfiguration(props, name, agents, batchSize, dataDir);
            } else {
                addPropertyConfiguration(props, name, properties);
            }
            return props;
        }

        /**
         * Adds a single "primary" channel and one avro sink per agent, grouped in a failover sink group.
         * @param props The properties to add to.
         * @param name The key prefix.
         * @param agents The agents; must be non-empty.
         * @param batchSize The number of events to include in a batch.
         * @param dataDir The directory for the file channel, or "InMemory" for a memory channel.
         */
        private void addAgentConfiguration(final Properties props, final String name, final Agent[] agents,
                                           final int batchSize, final String dataDir) {
            final String channelPrefix = name + ".channels.primary";

            if (IN_MEMORY.equals(dataDir)) {
                props.put(channelPrefix + ".type", "memory");
            } else {
                props.put(channelPrefix + ".type", "file");
                // Without a data directory no directory keys are set for the file channel.
                if (dataDir != null && dataDir.length() > 0) {
                    final String dir = dataDir.endsWith(FILE_SEP) ? dataDir : dataDir + FILE_SEP;
                    props.put(channelPrefix + ".checkpointDir", dir + "checkpoint");
                    props.put(channelPrefix + ".dataDirs", dir + "data");
                }
            }

            final StringBuilder sinkNames = new StringBuilder();
            // Agents are given descending priorities in the order they were configured.
            int priority = agents.length;
            for (int i = 0; i < agents.length; ++i) {
                final String sinkName = "agent" + i;
                if (i > 0) {
                    sinkNames.append(" ");
                }
                sinkNames.append(sinkName);
                final String prefix = name + ".sinks." + sinkName;
                props.put(prefix + ".channel", "primary");
                props.put(prefix + ".type", "avro");
                props.put(prefix + ".hostname", agents[i].getHost());
                props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
                props.put(prefix + ".batch-size", Integer.toString(batchSize));
                props.put(name + ".sinkgroups.group1.processor.priority." + sinkName, Integer.toString(priority));
                --priority;
            }

            props.put(name + ".sinks", sinkNames.toString());
            props.put(name + ".sinkgroups", "group1");
            props.put(name + ".sinkgroups.group1.sinks", sinkNames.toString());
            props.put(name + ".sinkgroups.group1.processor.type", "failover");
            props.put(name + ".channels", "primary");
            props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", "primary");
        }

        /**
         * Copies the user supplied properties under the appender name after validating them, and
         * connects the log4j source to the configured channels ("primary" when none were given).
         * @param props The properties to add to.
         * @param name The key prefix.
         * @param properties The user supplied properties; must be non-empty.
         * @throws ConfigurationException if a property has no name or value, starts with the agent
         *         name, configures a source, or if no sinks are specified.
         */
        private void addPropertyConfiguration(final Properties props, final String name,
                                              final Property[] properties) {
            String channels = null;
            String[] sinks = null;

            for (final Property property : properties) {
                final String key = property.getName();

                if (key == null || key.length() == 0) {
                    throw configurationError("A property name must be provided");
                }

                final String upperKey = key.toUpperCase(Locale.ENGLISH);

                // Keys are prefixed with the agent name below, so they must not already carry it.
                if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
                    throw configurationError(
                        "Specification of the agent name is not allowed in Flume Appender configuration: " + key);
                }

                if (upperKey.startsWith("SOURCES.")) {
                    throw configurationError("Specification of Sources is not allowed in Flume Appender: " + key);
                }

                final String value = property.getValue();
                if (value == null || value.length() == 0) {
                    throw configurationError("A value for property " + key + " must be provided");
                }

                if (upperKey.equals("CHANNELS")) {
                    channels = value.trim();
                } else if (upperKey.equals("SINKS")) {
                    sinks = value.trim().split(" ");
                }

                props.put(name + '.' + key, value);
            }

            String sourceChannels = channels;

            if (channels == null) {
                sourceChannels = "primary";
                props.put(name + ".channels", sourceChannels);
            }

            props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);

            if (sinks == null || sinks.length == 0) {
                throw configurationError("At least one Sink must be specified");
            }
        }

        /**
         * Logs the message as an error and returns the matching exception for the caller to throw.
         * @param msg The error message.
         * @return A ConfigurationException carrying the message.
         */
        private ConfigurationException configurationError(final String msg) {
            LOGGER.error(msg);
            return new ConfigurationException(msg);
        }

    }

}