001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.logging.log4j.core.Filter; 020 import org.apache.logging.log4j.core.Layout; 021 import org.apache.logging.log4j.core.LogEvent; 022 import org.apache.logging.log4j.core.appender.AbstractAppender; 023 import org.apache.logging.log4j.core.config.Property; 024 import org.apache.logging.log4j.core.config.plugins.Plugin; 025 import org.apache.logging.log4j.core.config.plugins.PluginAttr; 026 import org.apache.logging.log4j.core.config.plugins.PluginElement; 027 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 028 import org.apache.logging.log4j.core.layout.RFC5424Layout; 029 030 import java.io.Serializable; 031 import java.util.Locale; 032 033 /** 034 * An Appender that uses the Avro protocol to route events to Flume. 035 */ 036 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true) 037 public final class FlumeAppender<T extends Serializable> extends AbstractAppender<T> implements FlumeEventFactory { 038 039 private final AbstractFlumeManager manager; 040 041 private final String mdcIncludes; 042 private final String mdcExcludes; 043 private final String mdcRequired; 044 045 private final String eventPrefix; 046 047 private final String mdcPrefix; 048 049 private final boolean compressBody; 050 051 private final FlumeEventFactory factory; 052 053 private enum ManagerType { 054 AVRO, EMBEDDED, PERSISTENT; 055 056 public static ManagerType getType(String type) { 057 return valueOf(type.toUpperCase(Locale.US)); 058 } 059 } 060 061 private FlumeAppender(final String name, final Filter filter, final Layout<T> layout, final boolean handleException, 062 final String includes, final String excludes, final String required, final String mdcPrefix, 063 final String eventPrefix, final boolean compress, 064 final FlumeEventFactory factory, final AbstractFlumeManager manager) { 065 super(name, filter, layout, handleException); 066 this.manager = manager; 067 this.mdcIncludes = includes; 068 this.mdcExcludes = excludes; 069 this.mdcRequired = required; 070 this.eventPrefix = eventPrefix; 071 this.mdcPrefix = mdcPrefix; 072 this.compressBody = compress; 073 this.factory = factory == null ? this : factory; 074 } 075 076 /** 077 * Publish the event. 078 * @param event The LogEvent. 079 */ 080 @Override 081 public void append(final LogEvent event) { 082 083 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 084 eventPrefix, compressBody); 085 flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); 086 manager.send(flumeEvent); 087 } 088 089 @Override 090 public void stop() { 091 super.stop(); 092 manager.release(); 093 } 094 095 /** 096 * Create a Flume event. 097 * @param event The Log4j LogEvent. 098 * @param includes comma separated list of mdc elements to include. 099 * @param excludes comma separated list of mdc elements to exclude. 100 * @param required comma separated list of mdc elements that must be present with a value. 101 * @param mdcPrefix The prefix to add to MDC key names. 102 * @param eventPrefix The prefix to add to event fields. 103 * @param compress If true the body will be compressed. 104 * @return A Flume Event. 105 */ 106 @Override 107 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes, 108 final String required, final String mdcPrefix, final String eventPrefix, 109 final boolean compress) { 110 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 111 eventPrefix, compressBody); 112 } 113 114 /** 115 * Create a Flume Avro Appender. 116 * @param agents An array of Agents. 117 * @param properties Properties to pass to the embedded agent. 118 * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used. 119 * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i> 120 * @param type Avro (default), Embedded, or Persistent. 121 * @param dataDir The directory where the Flume FileChannel should write its data. 122 * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is 123 * 1000. 124 * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000. 125 * @param agentRetries The number of times to retry an agent before failing to the next agent. 126 * @param maxDelay The maximum number of seconds to wait for a complete batch. 127 * @param name The name of the Appender. 128 * @param suppress If true exceptions will be handled in the appender. 129 * @param excludes A comma separated list of MDC elements to exclude. 130 * @param includes A comma separated list of MDC elements to include. 131 * @param required A comma separated list of MDC elements that are required. 132 * @param mdcPrefix The prefix to add to MDC key names. 133 * @param eventPrefix The prefix to add to event key names. 134 * @param compressBody If true the event body will be compressed. 135 * @param batchSize Number of events to include in a batch. Defaults to 1. 136 * @param factory The factory to use to create Flume events. 137 * @param layout The layout to format the event. 138 * @param filter A Filter to filter events. 139 * @return A Flume Avro Appender. 140 */ 141 @PluginFactory 142 public static <S extends Serializable> FlumeAppender<S> createAppender(@PluginElement("agents") Agent[] agents, 143 @PluginElement("properties") final Property[] properties, 144 @PluginAttr("embedded") final String embedded, 145 @PluginAttr("type") final String type, 146 @PluginAttr("dataDir") final String dataDir, 147 @PluginAttr("connectTimeout") final String connectionTimeout, 148 @PluginAttr("requestTimeout") final String requestTimeout, 149 @PluginAttr("agentRetries") final String agentRetries, 150 @PluginAttr("maxDelay") final String maxDelay, 151 @PluginAttr("name") final String name, 152 @PluginAttr("suppressExceptions") final String suppress, 153 @PluginAttr("mdcExcludes") final String excludes, 154 @PluginAttr("mdcIncludes") final String includes, 155 @PluginAttr("mdcRequired") final String required, 156 @PluginAttr("mdcPrefix") final String mdcPrefix, 157 @PluginAttr("eventPrefix") final String eventPrefix, 158 @PluginAttr("compress") final String compressBody, 159 @PluginAttr("batchSize") final String batchSize, 160 @PluginElement("flumeEventFactory") final FlumeEventFactory factory, 161 @PluginElement("layout") Layout<S> layout, 162 @PluginElement("filters") final Filter filter) { 163 164 final boolean embed = embedded != null ? Boolean.valueOf(embedded) : 165 (agents == null || agents.length == 0) && properties != null && properties.length > 0; 166 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress); 167 final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody); 168 ManagerType managerType; 169 if (type != null) { 170 if (embed && embedded != null) { 171 try { 172 managerType = ManagerType.getType(type); 173 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type); 174 } catch (Exception ex) { 175 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid."); 176 managerType = ManagerType.EMBEDDED; 177 } 178 } else { 179 try { 180 managerType = ManagerType.getType(type); 181 } catch (Exception ex) { 182 LOGGER.warn("Type " + type + " is invalid."); 183 managerType = ManagerType.EMBEDDED; 184 } 185 } 186 } else if (embed) { 187 managerType = ManagerType.EMBEDDED; 188 } else { 189 managerType = ManagerType.AVRO; 190 } 191 192 final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize); 193 final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout); 194 final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout); 195 final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries); 196 final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay); 197 198 if (layout == null) { 199 @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"}) 200 Layout<S> l = (Layout<S>)RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix, 201 null, null, null, excludes, includes, required, null, null, null); 202 layout = l; 203 } 204 205 if (name == null) { 206 LOGGER.error("No name provided for Appender"); 207 return null; 208 } 209 210 AbstractFlumeManager manager; 211 212 switch (managerType) { 213 case EMBEDDED: 214 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); 215 break; 216 case AVRO: 217 if (agents == null || agents.length == 0) { 218 LOGGER.debug("No agents provided, using defaults"); 219 agents = new Agent[] {Agent.createAgent(null, null)}; 220 } 221 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout); 222 break; 223 case PERSISTENT: 224 if (agents == null || agents.length == 0) { 225 LOGGER.debug("No agents provided, using defaults"); 226 agents = new Agent[] {Agent.createAgent(null, null)}; 227 } 228 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries, 229 connectTimeout, reqTimeout, delay, dataDir); 230 break; 231 default: 232 LOGGER.debug("No manager type specified. Defaulting to AVRO"); 233 if (agents == null || agents.length == 0) { 234 LOGGER.debug("No agents provided, using defaults"); 235 agents = new Agent[] {Agent.createAgent(null, null)}; 236 } 237 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout); 238 } 239 240 if (manager == null) { 241 return null; 242 } 243 244 return new FlumeAppender<S>(name, filter, layout, handleExceptions, includes, 245 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager); 246 } 247 }