/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.flume.appender;

import org.apache.flume.SourceRunner;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.node.NodeConfiguration;
import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.core.config.Property;

import java.security.MessageDigest;
import java.util.Locale;
import java.util.Properties;

/**
 * Manager that starts a Flume node inside the current process and passes events to the
 * {@link Log4jEventSource} registered in that node under {@link #SOURCE_NAME}.
 */
public class FlumeEmbeddedManager extends AbstractFlumeManager {

    /** Name under which the Log4j source is registered in the generated Flume configuration. */
    protected static final String SOURCE_NAME = "log4j-source";

    /** Factory handed to {@code getManager} to create managers that do not exist yet. */
    private static final ManagerFactory factory = new FlumeManagerFactory();

    /** Platform file separator, used to terminate the configured data directory. */
    private static final String FILE_SEP = System.getProperty("file.separator");

    private final FlumeNode node;

    private final Log4jEventSource source;

    private final String shortName;

    /**
     * Constructor.
     * @param name The unique name of this manager.
     * @param shortName The name of the Appender; only used in the error message below.
     * @param node The Flume Node.
     * @throws IllegalStateException if the node's configuration holds no source named {@link #SOURCE_NAME}.
     */
    protected FlumeEmbeddedManager(String name, String shortName, FlumeNode node) {
        super(name);
        this.node = node;
        this.shortName = shortName;
        SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
        if (runner == null || runner.getSource() == null) {
            throw new IllegalStateException("No Source has been created for Appender " + shortName);
        }
        source = (Log4jEventSource) runner.getSource();
    }

    /**
     * Returns a FlumeEmbeddedManager.
     * <p>
     * Exactly one of {@code agents} and {@code properties} must be non-empty. The manager key is
     * {@code FlumeEmbedded[host:port,...]} when agents are given; otherwise it is the Appender name
     * followed by an MD5 hex digest of the {@code name=value} property list.
     * </p>
     * @param name The name of the Appender.
     * @param agents The agents to use.
     * @param properties The Flume configuration properties.
     * @param batchSize The number of events to include in a batch; values below 1 are treated as 1.
     * @param dataDir The directory where Flume should write to.
     * @return A FlumeEmbeddedManager.
     * @throws IllegalArgumentException if neither or both of agents and properties are supplied.
     */
    public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize,
                                                  String dataDir) {

        if (batchSize <= 0) {
            batchSize = 1;
        }

        boolean hasAgents = agents != null && agents.length > 0;
        boolean hasProperties = properties != null && properties.length > 0;

        if (!hasAgents && !hasProperties) {
            throw new IllegalArgumentException("Either an Agent or properties are required");
        } else if (hasAgents && hasProperties) {
            throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
        }

        StringBuilder sb = new StringBuilder();

        if (hasAgents) {
            String sep = "";
            sb.append("FlumeEmbedded[");
            for (Agent agent : agents) {
                sb.append(sep);
                sb.append(agent.getHost()).append(":").append(agent.getPort());
                sep = ",";
            }
            sb.append("]");
        } else {
            String sep = "";
            sb.append(name).append(":");
            StringBuilder props = new StringBuilder();
            for (Property prop : properties) {
                props.append(sep);
                props.append(prop.getName()).append("=").append(prop.getValue());
                sep = ",";
            }
            try {
                MessageDigest digest = MessageDigest.getInstance("MD5");
                // Hash the property list (not the name prefix) so that different property sets
                // configured under the same Appender name yield different manager keys.
                digest.update(props.toString().getBytes());
                byte[] bytes = digest.digest();
                StringBuilder md5 = new StringBuilder();
                for (byte b : bytes) {
                    String hex = Integer.toHexString(0xff & b);
                    if (hex.length() == 1) {
                        md5.append('0');
                    }
                    md5.append(hex);
                }
                sb.append(md5.toString());
            } catch (Exception ex) {
                // Deliberate best effort: if the digest cannot be computed the raw property list
                // is used as the distinguishing part of the key instead.
                sb.append(props);
            }
        }
        return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
            new FactoryData(name, agents, properties, batchSize, dataDir));
    }

    /**
     * Passes the event to the embedded Log4j source.
     * @param event The event to send.
     * @param delay Not used by this implementation.
     * @param retries Not used by this implementation.
     */
    @Override
    public void send(FlumeEvent event, int delay, int retries) {
        source.send(event);
    }

    /**
     * Stops the embedded Flume node.
     */
    @Override
    protected void releaseSub() {
        node.stop();
    }

    /**
     * Factory data.
     */
    private static class FactoryData {
        private final Agent[] agents;
        private final Property[] properties;
        private final int batchSize;
        private final String dataDir;
        private final String name;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param properties The Flume configuration properties.
         * @param batchSize The number of events to include in a batch.
         * @param dataDir The directory where Flume should write to.
         */
        public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize, String dataDir) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.properties = properties;
            this.dataDir = dataDir;
        }
    }

    /**
     * Embedded Manager Factory.
     */
    private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
        private static final String SOURCE_TYPE = Log4jEventSource.class.getName();

        /**
         * Create the FlumeEmbeddedManager.
         * <p>
         * Builds the Flume properties, loads them into a node configuration, starts the node and
         * waits until it reports one of the {@code START_OR_ERROR} states.
         * </p>
         * @param name The name of the entity to manage.
         * @param data The data required to create the entity.
         * @return The FlumeEmbeddedManager, or null if any step threw an exception (the exception is logged).
         */
        public FlumeEmbeddedManager createManager(String name, FactoryData data) {
            try {
                DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
                Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
                    data.dataDir);
                FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
                NodeConfiguration conf = builder.load(data.name, props, nodeManager);

                FlumeNode node = new FlumeNode(nodeManager, conf);

                node.start();
                LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);

                return new FlumeEmbeddedManager(name, data.name, node);
            } catch (Exception ex) {
                LOGGER.error("Could not create FlumeEmbeddedManager", ex);
            }
            return null;
        }

        /**
         * Builds the Flume configuration properties, all keyed with the Appender name as prefix.
         * @param name The name of the Appender, used as the Flume agent name.
         * @param agents The agents; mutually exclusive with properties.
         * @param properties The Flume configuration properties; mutually exclusive with agents.
         * @param batchSize The number of events to include in a batch.
         * @param dataDir The directory where Flume should write to.
         * @return The Flume configuration.
         * @throws ConfigurationException if the supplied configuration is missing or invalid.
         */
        private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize,
                                            String dataDir) {
            boolean hasAgents = agents != null && agents.length > 0;
            boolean hasProperties = properties != null && properties.length > 0;

            if (!hasAgents && !hasProperties) {
                LOGGER.error("No Flume configuration provided");
                throw new ConfigurationException("No Flume configuration provided");
            }

            if (hasAgents && hasProperties) {
                LOGGER.error("Agents and Flume configuration cannot both be specified");
                throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
            }

            Properties props = new Properties();
            // The Log4j source is always registered, whichever way the rest is configured.
            props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
            props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);

            if (hasAgents) {
                addAgentProperties(props, name, agents, batchSize, dataDir);
            } else {
                addCustomProperties(props, name, properties);
            }
            return props;
        }

        /**
         * Adds a "file" channel and one avro sink per agent, grouped in a failover sink group.
         */
        private void addAgentProperties(Properties props, String name, Agent[] agents, int batchSize,
                                        String dataDir) {
            String channel = "file";
            props.put(name + ".channels", channel);
            props.put(name + ".channels.file.type", "file");
            if (dataDir != null && dataDir.length() > 0) {
                String dir = dataDir.endsWith(FILE_SEP) ? dataDir : dataDir + FILE_SEP;
                props.put(name + ".channels.file.checkpointDir", dir + "checkpoint");
                props.put(name + ".channels.file.dataDirs", dir + "data");
            }

            StringBuilder sinkNames = new StringBuilder();
            String leading = "";
            // Earlier agents receive larger priority values: agents.length for the first, 1 for the last.
            int priority = agents.length;
            for (int i = 0; i < agents.length; ++i) {
                sinkNames.append(leading).append("agent").append(i);
                leading = " ";
                String prefix = name + ".sinks.agent" + i;
                props.put(prefix + ".channel", channel);
                props.put(prefix + ".type", "avro");
                props.put(prefix + ".hostname", agents[i].getHost());
                props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
                props.put(prefix + ".batch-size", Integer.toString(batchSize));
                props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
                --priority;
            }
            props.put(name + ".sinks", sinkNames.toString());
            props.put(name + ".sinkgroups", "group1");
            props.put(name + ".sinkgroups.group1.sinks", sinkNames.toString());
            props.put(name + ".sinkgroups.group1.processor.type", "failover");
            props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", channel);
        }

        /**
         * Validates the user-supplied properties and copies them in with the agent name prepended.
         * The source is attached to the configured channels, or to "file" when none are configured.
         */
        private void addCustomProperties(Properties props, String name, Property[] properties) {
            String channels = null;
            String[] sinks = null;

            for (Property property : properties) {
                String key = property.getName();

                if (key == null || key.length() == 0) {
                    String msg = "A property name must be provided";
                    LOGGER.error(msg);
                    throw new ConfigurationException(msg);
                }

                String upperKey = key.toUpperCase(Locale.ENGLISH);

                // The agent name is prepended below, so keys that already carry it are rejected.
                if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
                    String msg = "Specification of the agent name is not allowed in Flume Appender configuration: "
                        + key;
                    LOGGER.error(msg);
                    throw new ConfigurationException(msg);
                }

                // The source is fixed to the Log4j source registered by createProperties.
                if (upperKey.startsWith("SOURCES.")) {
                    String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
                    LOGGER.error(msg);
                    throw new ConfigurationException(msg);
                }

                String value = property.getValue();
                if (value == null || value.length() == 0) {
                    String msg = "A value for property " + key + " must be provided";
                    LOGGER.error(msg);
                    throw new ConfigurationException(msg);
                }

                if (upperKey.equals("CHANNELS")) {
                    channels = value.trim();
                } else if (upperKey.equals("SINKS")) {
                    sinks = value.trim().split(" ");
                }

                props.put(name + '.' + key, value);
            }

            String sourceChannels = channels;

            if (channels == null) {
                sourceChannels = "file";
                props.put(name + ".channels", sourceChannels);
            }

            props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);

            if (sinks == null || sinks.length == 0) {
                String msg = "At least one Sink must be specified";
                LOGGER.error(msg);
                throw new ConfigurationException(msg);
            }
        }

    }

}