001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.logging.log4j.core.Filter; 020 import org.apache.logging.log4j.core.Layout; 021 import org.apache.logging.log4j.core.LogEvent; 022 import org.apache.logging.log4j.core.appender.AbstractAppender; 023 import org.apache.logging.log4j.core.config.Property; 024 import org.apache.logging.log4j.core.config.plugins.Plugin; 025 import org.apache.logging.log4j.core.config.plugins.PluginAttr; 026 import org.apache.logging.log4j.core.config.plugins.PluginElement; 027 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 028 import org.apache.logging.log4j.core.layout.RFC5424Layout; 029 030 import java.util.Locale; 031 032 /** 033 * An Appender that uses the Avro protocol to route events to Flume. 034 */ 035 @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true) 036 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory { 037 038 private final AbstractFlumeManager manager; 039 040 private final String mdcIncludes; 041 private final String mdcExcludes; 042 private final String mdcRequired; 043 044 private final String eventPrefix; 045 046 private final String mdcPrefix; 047 048 private final boolean compressBody; 049 050 private final FlumeEventFactory factory; 051 052 private enum ManagerType { 053 AVRO, EMBEDDED, PERSISTENT; 054 055 public static ManagerType getType(String type) { 056 return valueOf(type.toUpperCase(Locale.US)); 057 } 058 } 059 060 private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException, 061 final String includes, final String excludes, final String required, final String mdcPrefix, 062 final String eventPrefix, final boolean compress, 063 final FlumeEventFactory factory, final AbstractFlumeManager manager) { 064 super(name, filter, layout, handleException); 065 this.manager = manager; 066 this.mdcIncludes = includes; 067 this.mdcExcludes = excludes; 068 this.mdcRequired = required; 069 this.eventPrefix = eventPrefix; 070 this.mdcPrefix = mdcPrefix; 071 this.compressBody = compress; 072 this.factory = factory == null ? this : factory; 073 } 074 075 /** 076 * Publish the event. 077 * @param event The LogEvent. 078 */ 079 public void append(final LogEvent event) { 080 081 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 082 eventPrefix, compressBody); 083 flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); 084 manager.send(flumeEvent); 085 } 086 087 @Override 088 public void stop() { 089 super.stop(); 090 manager.release(); 091 } 092 093 /** 094 * Create a Flume event. 095 * @param event The Log4j LogEvent. 096 * @param includes comma separated list of mdc elements to include. 097 * @param excludes comma separated list of mdc elements to exclude. 098 * @param required comma separated list of mdc elements that must be present with a value. 099 * @param mdcPrefix The prefix to add to MDC key names. 100 * @param eventPrefix The prefix to add to event fields. 101 * @param compress If true the body will be compressed. 102 * @return A Flume Event. 103 */ 104 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes, 105 final String required, final String mdcPrefix, final String eventPrefix, 106 final boolean compress) { 107 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 108 eventPrefix, compressBody); 109 } 110 111 /** 112 * Create a Flume Avro Appender. 113 * @param agents An array of Agents. 114 * @param properties Properties to pass to the embedded agent. 115 * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used. 116 * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i> 117 * @param type Avro (default), Embedded, or Persistent. 118 * @param dataDir The directory where the Flume FileChannel should write its data. 119 * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is 120 * 1000. 121 * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000. 122 * @param agentRetries The number of times to retry an agent before failing to the next agent. 123 * @param maxDelay The maximum number of seconds to wait for a complete batch. 124 * @param name The name of the Appender. 125 * @param suppress If true exceptions will be handled in the appender. 126 * @param excludes A comma separated list of MDC elements to exclude. 127 * @param includes A comma separated list of MDC elements to include. 128 * @param required A comma separated list of MDC elements that are required. 129 * @param mdcPrefix The prefix to add to MDC key names. 130 * @param eventPrefix The prefix to add to event key names. 131 * @param compressBody If true the event body will be compressed. 132 * @param batchSize Number of events to include in a batch. Defaults to 1. 133 * @param factory The factory to use to create Flume events. 134 * @param layout The layout to format the event. 135 * @param filter A Filter to filter events. 136 * @return A Flume Avro Appender. 137 */ 138 @PluginFactory 139 public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents, 140 @PluginElement("properties") final Property[] properties, 141 @PluginAttr("embedded") final String embedded, 142 @PluginAttr("type") final String type, 143 @PluginAttr("dataDir") final String dataDir, 144 @PluginAttr("connectTimeout") final String connectionTimeout, 145 @PluginAttr("requestTimeout") final String requestTimeout, 146 @PluginAttr("agentRetries") final String agentRetries, 147 @PluginAttr("maxDelay") final String maxDelay, 148 @PluginAttr("name") final String name, 149 @PluginAttr("suppressExceptions") final String suppress, 150 @PluginAttr("mdcExcludes") final String excludes, 151 @PluginAttr("mdcIncludes") final String includes, 152 @PluginAttr("mdcRequired") final String required, 153 @PluginAttr("mdcPrefix") final String mdcPrefix, 154 @PluginAttr("eventPrefix") final String eventPrefix, 155 @PluginAttr("compress") final String compressBody, 156 @PluginAttr("batchSize") final String batchSize, 157 @PluginElement("flumeEventFactory") final FlumeEventFactory factory, 158 @PluginElement("layout") Layout layout, 159 @PluginElement("filters") final Filter filter) { 160 161 final boolean embed = embedded != null ? Boolean.valueOf(embedded) : 162 (agents == null || agents.length == 0) && properties != null && properties.length > 0; 163 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress); 164 final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody); 165 ManagerType managerType; 166 if (type != null) { 167 if (embed && embedded != null) { 168 try { 169 managerType = ManagerType.getType(type); 170 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type); 171 } catch (Exception ex) { 172 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid."); 173 managerType = ManagerType.EMBEDDED; 174 } 175 } else { 176 try { 177 managerType = ManagerType.getType(type); 178 } catch (Exception ex) { 179 LOGGER.warn("Type " + type + " is invalid."); 180 managerType = ManagerType.EMBEDDED; 181 } 182 } 183 } else if (embed) { 184 managerType = ManagerType.EMBEDDED; 185 } else { 186 managerType = ManagerType.AVRO; 187 } 188 189 final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize); 190 final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout); 191 final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout); 192 final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries); 193 final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay); 194 195 196 if (layout == null) { 197 layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix, null, null, 198 null, excludes, includes, required, null, null, null, null); 199 } 200 201 if (name == null) { 202 LOGGER.error("No name provided for Appender"); 203 return null; 204 } 205 206 AbstractFlumeManager manager; 207 208 switch (managerType) { 209 case EMBEDDED: 210 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); 211 break; 212 case AVRO: 213 if (agents == null || agents.length == 0) { 214 LOGGER.debug("No agents provided, using defaults"); 215 agents = new Agent[] {Agent.createAgent(null, null)}; 216 } 217 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout); 218 break; 219 case PERSISTENT: 220 if (agents == null || agents.length == 0) { 221 LOGGER.debug("No agents provided, using defaults"); 222 agents = new Agent[] {Agent.createAgent(null, null)}; 223 } 224 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries, 225 connectTimeout, reqTimeout, delay, dataDir); 226 break; 227 default: 228 LOGGER.debug("No manager type specified. Defaulting to AVRO"); 229 if (agents == null || agents.length == 0) { 230 LOGGER.debug("No agents provided, using defaults"); 231 agents = new Agent[] {Agent.createAgent(null, null)}; 232 } 233 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout); 234 } 235 236 if (manager == null) { 237 return null; 238 } 239 240 return new FlumeAppender(name, filter, layout, handleExceptions, includes, 241 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager); 242 } 243 }