001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.io.Serializable; 020import java.util.Locale; 021 022import org.apache.logging.log4j.core.Filter; 023import org.apache.logging.log4j.core.Layout; 024import org.apache.logging.log4j.core.LogEvent; 025import org.apache.logging.log4j.core.appender.AbstractAppender; 026import org.apache.logging.log4j.core.config.Property; 027import org.apache.logging.log4j.core.config.plugins.Plugin; 028import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 029import org.apache.logging.log4j.core.config.plugins.PluginElement; 030import org.apache.logging.log4j.core.config.plugins.PluginFactory; 031import org.apache.logging.log4j.core.helpers.Booleans; 032import org.apache.logging.log4j.core.helpers.Integers; 033import org.apache.logging.log4j.core.layout.RFC5424Layout; 034 035/** 036 * An Appender that uses the Avro protocol to route events to Flume. 037 */ 038@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true) 039public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory { 040 041 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"}; 042 private static final int DEFAULT_MAX_DELAY = 60000; 043 044 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5; 045 046 private final AbstractFlumeManager manager; 047 048 private final String mdcIncludes; 049 private final String mdcExcludes; 050 private final String mdcRequired; 051 052 private final String eventPrefix; 053 054 private final String mdcPrefix; 055 056 private final boolean compressBody; 057 058 private final FlumeEventFactory factory; 059 060 /** 061 * Which Manager will be used by the appender instance. 062 */ 063 private enum ManagerType { 064 AVRO, EMBEDDED, PERSISTENT; 065 066 public static ManagerType getType(final String type) { 067 return valueOf(type.toUpperCase(Locale.US)); 068 } 069 } 070 071 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, 072 final boolean ignoreExceptions, final String includes, final String excludes, 073 final String required, final String mdcPrefix, final String eventPrefix, 074 final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) { 075 super(name, filter, layout, ignoreExceptions); 076 this.manager = manager; 077 this.mdcIncludes = includes; 078 this.mdcExcludes = excludes; 079 this.mdcRequired = required; 080 this.eventPrefix = eventPrefix; 081 this.mdcPrefix = mdcPrefix; 082 this.compressBody = compress; 083 this.factory = factory == null ? this : factory; 084 } 085 086 /** 087 * Publish the event. 088 * @param event The LogEvent. 089 */ 090 @Override 091 public void append(final LogEvent event) { 092 final String name = event.getLoggerName(); 093 if (name != null) { 094 for (final String pkg : EXCLUDED_PACKAGES) { 095 if (name.startsWith(pkg)) { 096 return; 097 } 098 } 099 } 100 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 101 eventPrefix, compressBody); 102 flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); 103 manager.send(flumeEvent); 104 } 105 106 @Override 107 public void stop() { 108 super.stop(); 109 manager.release(); 110 } 111 112 /** 113 * Create a Flume event. 114 * @param event The Log4j LogEvent. 115 * @param includes comma separated list of mdc elements to include. 116 * @param excludes comma separated list of mdc elements to exclude. 117 * @param required comma separated list of mdc elements that must be present with a value. 118 * @param mdcPrefix The prefix to add to MDC key names. 119 * @param eventPrefix The prefix to add to event fields. 120 * @param compress If true the body will be compressed. 121 * @return A Flume Event. 122 */ 123 @Override 124 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes, 125 final String required, final String mdcPrefix, final String eventPrefix, 126 final boolean compress) { 127 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 128 eventPrefix, compressBody); 129 } 130 131 /** 132 * Create a Flume Avro Appender. 133 * @param agents An array of Agents. 134 * @param properties Properties to pass to the embedded agent. 135 * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used. 136 * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i> 137 * @param type Avro (default), Embedded, or Persistent. 138 * @param dataDir The directory where the Flume FileChannel should write its data. 139 * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is 140 * 1000. 141 * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000. 142 * @param agentRetries The number of times to retry an agent before failing to the next agent. 143 * @param maxDelay The maximum number of seconds to wait for a complete batch. 144 * @param name The name of the Appender. 145 * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise 146 * they are propagated to the caller. 147 * @param excludes A comma separated list of MDC elements to exclude. 148 * @param includes A comma separated list of MDC elements to include. 149 * @param required A comma separated list of MDC elements that are required. 150 * @param mdcPrefix The prefix to add to MDC key names. 151 * @param eventPrefix The prefix to add to event key names. 152 * @param compressBody If true the event body will be compressed. 153 * @param batchSize Number of events to include in a batch. Defaults to 1. 154 * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB. 155 * @param factory The factory to use to create Flume events. 156 * @param layout The layout to format the event. 157 * @param filter A Filter to filter events. 158 * 159 * @return A Flume Avro Appender. 160 */ 161 @PluginFactory 162 public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents, 163 @PluginElement("Properties") final Property[] properties, 164 @PluginAttribute("embedded") final String embedded, 165 @PluginAttribute("type") final String type, 166 @PluginAttribute("dataDir") final String dataDir, 167 @PluginAttribute("connectTimeout") final String connectionTimeout, 168 @PluginAttribute("requestTimeout") final String requestTimeout, 169 @PluginAttribute("agentRetries") final String agentRetries, 170 @PluginAttribute("maxDelay") final String maxDelay, 171 @PluginAttribute("name") final String name, 172 @PluginAttribute("ignoreExceptions") final String ignore, 173 @PluginAttribute("mdcExcludes") final String excludes, 174 @PluginAttribute("mdcIncludes") final String includes, 175 @PluginAttribute("mdcRequired") final String required, 176 @PluginAttribute("mdcPrefix") final String mdcPrefix, 177 @PluginAttribute("eventPrefix") final String eventPrefix, 178 @PluginAttribute("compress") final String compressBody, 179 @PluginAttribute("batchSize") final String batchSize, 180 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries, 181 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory, 182 @PluginElement("Layout") Layout<? extends Serializable> layout, 183 @PluginElement("Filters") final Filter filter) { 184 185 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) : 186 (agents == null || agents.length == 0) && properties != null && properties.length > 0; 187 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true); 188 final boolean compress = Booleans.parseBoolean(compressBody, true); 189 ManagerType managerType; 190 if (type != null) { 191 if (embed && embedded != null) { 192 try { 193 managerType = ManagerType.getType(type); 194 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type); 195 } catch (final Exception ex) { 196 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + 197 " is invalid."); 198 managerType = ManagerType.EMBEDDED; 199 } 200 } else { 201 try { 202 managerType = ManagerType.getType(type); 203 } catch (final Exception ex) { 204 LOGGER.warn("Type " + type + " is invalid."); 205 managerType = ManagerType.EMBEDDED; 206 } 207 } 208 } else if (embed) { 209 managerType = ManagerType.EMBEDDED; 210 } else { 211 managerType = ManagerType.AVRO; 212 } 213 214 final int batchCount = Integers.parseInt(batchSize, 1); 215 final int connectTimeout = Integers.parseInt(connectionTimeout, 0); 216 final int reqTimeout = Integers.parseInt(requestTimeout, 0); 217 final int retries = Integers.parseInt(agentRetries, 0); 218 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT); 219 final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY); 220 221 if (layout == null) { 222 layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix, 223 null, null, null, null, excludes, includes, required, null, null, null, null); 224 } 225 226 if (name == null) { 227 LOGGER.error("No name provided for Appender"); 228 return null; 229 } 230 231 AbstractFlumeManager manager; 232 233 switch (managerType) { 234 case EMBEDDED: 235 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); 236 break; 237 case AVRO: 238 if (agents == null || agents.length == 0) { 239 LOGGER.debug("No agents provided, using defaults"); 240 agents = new Agent[] {Agent.createAgent(null, null)}; 241 } 242 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout); 243 break; 244 case PERSISTENT: 245 if (agents == null || agents.length == 0) { 246 LOGGER.debug("No agents provided, using defaults"); 247 agents = new Agent[] {Agent.createAgent(null, null)}; 248 } 249 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries, 250 connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir); 251 break; 252 default: 253 LOGGER.debug("No manager type specified. Defaulting to AVRO"); 254 if (agents == null || agents.length == 0) { 255 LOGGER.debug("No agents provided, using defaults"); 256 agents = new Agent[] {Agent.createAgent(null, null)}; 257 } 258 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout); 259 } 260 261 if (manager == null) { 262 return null; 263 } 264 265 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes, 266 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager); 267 } 268}