001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.SourceRunner; 020 import org.apache.flume.node.NodeConfiguration; 021 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager; 022 import org.apache.logging.log4j.core.appender.ManagerFactory; 023 import org.apache.logging.log4j.core.config.ConfigurationException; 024 import org.apache.logging.log4j.core.config.Property; 025 import org.apache.logging.log4j.core.helpers.NameUtil; 026 import org.apache.logging.log4j.util.PropertiesUtil; 027 028 import java.util.Locale; 029 import java.util.Properties; 030 031 /** 032 * 033 */ 034 public class FlumeEmbeddedManager extends AbstractFlumeManager { 035 036 /** Name for the Flume source */ 037 protected static final String SOURCE_NAME = "log4j-source"; 038 039 private static ManagerFactory factory = new FlumeManagerFactory(); 040 041 private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator"); 042 043 private static final String IN_MEMORY = "InMemory"; 044 045 private final FlumeNode node; 046 047 private NodeConfiguration conf; 048 049 private final Log4jEventSource source; 050 051 private final String shortName; 052 053 054 /** 055 * Constructor 056 * @param name The unique name of this manager. 057 * @param node The Flume Node. 058 */ 059 protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) { 060 super(name); 061 this.node = node; 062 this.shortName = shortName; 063 final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME); 064 if (runner == null || runner.getSource() == null) { 065 throw new IllegalStateException("No Source has been created for Appender " + shortName); 066 } 067 source = (Log4jEventSource) runner.getSource(); 068 } 069 070 /** 071 * Returns a FlumeEmbeddedManager. 072 * @param name The name of the manager. 073 * @param agents The agents to use. 074 * @param properties Properties for the embedded manager. 075 * @param batchSize The number of events to include in a batch. 076 * @param dataDir The directory where the Flume FileChannel should write to. 077 * @return A FlumeAvroManager. 078 */ 079 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties, 080 int batchSize, final String dataDir) { 081 082 if (batchSize <= 0) { 083 batchSize = 1; 084 } 085 086 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { 087 throw new IllegalArgumentException("Either an Agent or properties are required"); 088 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) { 089 throw new IllegalArgumentException("Cannot configure both Agents and Properties."); 090 } 091 092 final StringBuilder sb = new StringBuilder(); 093 boolean first = true; 094 095 if (agents != null && agents.length > 0) { 096 sb.append("FlumeEmbedded["); 097 for (final Agent agent : agents) { 098 if (!first) { 099 sb.append(","); 100 } 101 sb.append(agent.getHost()).append(":").append(agent.getPort()); 102 first = false; 103 } 104 sb.append("]"); 105 } else { 106 String sep = ""; 107 sb.append(name).append(":"); 108 final StringBuilder props = new StringBuilder(); 109 for (final Property prop : properties) { 110 props.append(sep); 111 props.append(prop.getName()).append("=").append(prop.getValue()); 112 sep = ","; 113 } 114 sb.append(NameUtil.md5(props.toString())); 115 } 116 return (FlumeEmbeddedManager) getManager(sb.toString(), factory, 117 new FactoryData(name, agents, properties, batchSize, dataDir)); 118 } 119 120 @Override 121 public void send(final FlumeEvent event, final int delay, final int retries) { 122 source.send(event); 123 } 124 125 @Override 126 protected void releaseSub() { 127 node.stop(); 128 } 129 130 /** 131 * Factory data. 132 */ 133 private static class FactoryData { 134 private final Agent[] agents; 135 private final Property[] properties; 136 private final int batchSize; 137 private final String dataDir; 138 private final String name; 139 140 /** 141 * Constructor. 142 * @param name The name of the Appender. 143 * @param agents The agents. 144 * @param properties The Flume configuration properties. 145 * @param batchSize The number of events to include in a batch. 146 * @param dataDir The directory where Flume should write to. 147 */ 148 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize, 149 final String dataDir) { 150 this.name = name; 151 this.agents = agents; 152 this.batchSize = batchSize; 153 this.properties = properties; 154 this.dataDir = dataDir; 155 } 156 } 157 158 /** 159 * Avro Manager Factory. 160 */ 161 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> { 162 private static final String SOURCE_TYPE = Log4jEventSource.class.getName(); 163 164 /** 165 * Create the FlumeAvroManager. 166 * @param name The name of the entity to manage. 167 * @param data The data required to create the entity. 168 * @return The FlumeAvroManager. 169 */ 170 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) { 171 try { 172 final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager(); 173 final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize, 174 data.dataDir); 175 final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder(); 176 final NodeConfiguration conf = builder.load(data.name, props, nodeManager); 177 178 final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf); 179 180 node.start(); 181 182 return new FlumeEmbeddedManager(name, data.name, node); 183 } catch (final Exception ex) { 184 LOGGER.error("Could not create FlumeEmbeddedManager", ex); 185 } 186 return null; 187 } 188 189 private Properties createProperties(final String name, final Agent[] agents, final Property[] properties, 190 final int batchSize, String dataDir) { 191 final Properties props = new Properties(); 192 193 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) { 194 LOGGER.error("No Flume configuration provided"); 195 throw new ConfigurationException("No Flume configuration provided"); 196 } 197 198 if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) { 199 LOGGER.error("Agents and Flume configuration cannot both be specified"); 200 throw new ConfigurationException("Agents and Flume configuration cannot both be specified"); 201 } 202 203 if (agents != null && agents.length > 0) { 204 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME); 205 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE); 206 207 if (dataDir != null && dataDir.length() > 0) { 208 if (dataDir.equals(IN_MEMORY)) { 209 props.put(name + ".channels", "primary"); 210 props.put(name + ".channels.primary.type", "memory"); 211 } else { 212 props.put(name + ".channels", "primary"); 213 props.put(name + ".channels.primary.type", "file"); 214 215 if (!dataDir.endsWith(FiLE_SEP)) { 216 dataDir = dataDir + FiLE_SEP; 217 } 218 219 props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint"); 220 props.put(name + ".channels.primary.dataDirs", dataDir + "data"); 221 } 222 223 } else { 224 props.put(name + ".channels", "primary"); 225 props.put(name + ".channels.primary.type", "file"); 226 } 227 228 final StringBuilder sb = new StringBuilder(); 229 String leading = ""; 230 int priority = agents.length; 231 for (int i = 0; i < agents.length; ++i) { 232 sb.append(leading).append("agent").append(i); 233 leading = " "; 234 final String prefix = name + ".sinks.agent" + i; 235 props.put(prefix + ".channel", "primary"); 236 props.put(prefix + ".type", "avro"); 237 props.put(prefix + ".hostname", agents[i].getHost()); 238 props.put(prefix + ".port", Integer.toString(agents[i].getPort())); 239 props.put(prefix + ".batch-size", Integer.toString(batchSize)); 240 props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority)); 241 --priority; 242 } 243 props.put(name + ".sinks", sb.toString()); 244 props.put(name + ".sinkgroups", "group1"); 245 props.put(name + ".sinkgroups.group1.sinks", sb.toString()); 246 props.put(name + ".sinkgroups.group1.processor.type", "failover"); 247 final String sourceChannels = "primary"; 248 props.put(name + ".channels", sourceChannels); 249 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels); 250 } else { 251 String channels = null; 252 String[] sinks = null; 253 254 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME); 255 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE); 256 257 for (final Property property : properties) { 258 final String key = property.getName(); 259 260 if (key == null || key.length() == 0) { 261 final String msg = "A property name must be provided"; 262 LOGGER.error(msg); 263 throw new ConfigurationException(msg); 264 } 265 266 final String upperKey = key.toUpperCase(Locale.ENGLISH); 267 268 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) { 269 final String msg = 270 "Specification of the agent name is allowed in Flume Appender configuration: " + key; 271 LOGGER.error(msg); 272 throw new ConfigurationException(msg); 273 } 274 275 if (upperKey.startsWith("SOURCES.")) { 276 final String msg = "Specification of Sources is not allowed in Flume Appender: " + key; 277 LOGGER.error(msg); 278 throw new ConfigurationException(msg); 279 } 280 281 final String value = property.getValue(); 282 if (value == null || value.length() == 0) { 283 final String msg = "A value for property " + key + " must be provided"; 284 LOGGER.error(msg); 285 throw new ConfigurationException(msg); 286 } 287 288 if (upperKey.equals("CHANNELS")) { 289 channels = value.trim(); 290 } else if (upperKey.equals("SINKS")) { 291 sinks = value.trim().split(" "); 292 } 293 294 props.put(name + '.' + key, value); 295 } 296 297 String sourceChannels = channels; 298 299 if (channels == null) { 300 sourceChannels = "primary"; 301 props.put(name + ".channels", sourceChannels); 302 } 303 304 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels); 305 306 if (sinks == null || sinks.length == 0) { 307 final String msg = "At least one Sink must be specified"; 308 LOGGER.error(msg); 309 throw new ConfigurationException(msg); 310 } 311 } 312 return props; 313 } 314 315 } 316 317 }