001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.logging.log4j.core.Filter; 020 import org.apache.logging.log4j.core.Layout; 021 import org.apache.logging.log4j.core.LogEvent; 022 import org.apache.logging.log4j.core.appender.AbstractAppender; 023 import org.apache.logging.log4j.core.config.Property; 024 import org.apache.logging.log4j.core.config.plugins.Plugin; 025 import org.apache.logging.log4j.core.config.plugins.PluginAttr; 026 import org.apache.logging.log4j.core.config.plugins.PluginElement; 027 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 028 import org.apache.logging.log4j.core.layout.RFC5424Layout; 029 030 /** 031 * An Appender that uses the Avro protocol to route events to Flume. 032 */ 033 @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true) 034 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory { 035 036 private final AbstractFlumeManager manager; 037 038 private final String mdcIncludes; 039 private final String mdcExcludes; 040 private final String mdcRequired; 041 042 private final String eventPrefix; 043 044 private final String mdcPrefix; 045 046 private final boolean compressBody; 047 048 private final int reconnectDelay; 049 050 private final int retries; 051 052 private final FlumeEventFactory factory; 053 054 private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException, 055 final String includes, final String excludes, final String required, final String mdcPrefix, 056 final String eventPrefix, final boolean compress, final int delay, final int retries, 057 final FlumeEventFactory factory, final AbstractFlumeManager manager) { 058 super(name, filter, layout, handleException); 059 this.manager = manager; 060 this.mdcIncludes = includes; 061 this.mdcExcludes = excludes; 062 this.mdcRequired = required; 063 this.eventPrefix = eventPrefix; 064 this.mdcPrefix = mdcPrefix; 065 this.compressBody = compress; 066 this.reconnectDelay = delay; 067 this.retries = retries; 068 this.factory = factory == null ? this : factory; 069 } 070 071 /** 072 * Publish the event. 073 * @param event The LogEvent. 074 */ 075 public void append(final LogEvent event) { 076 077 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 078 eventPrefix, compressBody); 079 flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); 080 manager.send(flumeEvent, reconnectDelay, retries); 081 } 082 083 @Override 084 public void stop() { 085 super.stop(); 086 manager.release(); 087 } 088 089 /** 090 * Create a Flume event. 091 * @param event The Log4j LogEvent. 092 * @param includes comma separated list of mdc elements to include. 093 * @param excludes comma separated list of mdc elements to exclude. 094 * @param required comma separated list of mdc elements that must be present with a value. 095 * @param mdcPrefix The prefix to add to MDC key names. 096 * @param eventPrefix The prefix to add to event fields. 097 * @param compress If true the body will be compressed. 098 * @return A Flume Event. 099 */ 100 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes, 101 final String required, final String mdcPrefix, final String eventPrefix, 102 final boolean compress) { 103 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 104 eventPrefix, compressBody); 105 } 106 107 /** 108 * Create a Flume Avro Appender. 109 * @param agents An array of Agents. 110 * @param properties Properties to pass to the embedded agent. 111 * @param embedded true if the embedded agent manager should be used. otherwise the Avro mangaer will be used. 112 * @param dataDir The directory where the Flume FileChannel should write its data. 113 * @param delay The amount of time in milliseconds to wait between retries. 114 * @param agentRetries The number of times to retry an agent before failing to the next agent. 115 * @param name The name of the Appender. 116 * @param suppress If true exceptions will be handled in the appender. 117 * @param excludes A comma separated list of MDC elements to exclude. 118 * @param includes A comma separated list of MDC elements to include. 119 * @param required A comma separated list of MDC elements that are required. 120 * @param mdcPrefix The prefix to add to MDC key names. 121 * @param eventPrefix The prefix to add to event key names. 122 * @param compressBody If true the event body will be compressed. 123 * @param batchSize Number of events to include in a batch. Defaults to 1. 124 * @param factory The factory to use to create Flume events. 125 * @param layout The layout to format the event. 126 * @param filter A Filter to filter events. 127 * @return A Flume Avro Appender. 128 */ 129 @PluginFactory 130 public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents, 131 @PluginElement("properties") final Property[] properties, 132 @PluginAttr("embedded") final String embedded, 133 @PluginAttr("dataDir") final String dataDir, 134 @PluginAttr("reconnectionDelay") final String delay, 135 @PluginAttr("agentRetries") final String agentRetries, 136 @PluginAttr("name") final String name, 137 @PluginAttr("suppressExceptions") final String suppress, 138 @PluginAttr("mdcExcludes") final String excludes, 139 @PluginAttr("mdcIncludes") final String includes, 140 @PluginAttr("mdcRequired") final String required, 141 @PluginAttr("mdcPrefix") final String mdcPrefix, 142 @PluginAttr("eventPrefix") final String eventPrefix, 143 @PluginAttr("compress") final String compressBody, 144 @PluginAttr("batchSize") final String batchSize, 145 @PluginElement("flumeEventFactory") final FlumeEventFactory factory, 146 @PluginElement("layout") Layout layout, 147 @PluginElement("filters") final Filter filter) { 148 149 final boolean embed = embedded != null ? Boolean.valueOf(embedded) : 150 (agents == null || agents.length == 0) && properties != null && properties.length > 0; 151 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress); 152 final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody); 153 154 final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize); 155 final int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay); 156 final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries); 157 158 if (layout == null) { 159 layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes, 160 includes, required, null, null, null, null); 161 } 162 163 if (name == null) { 164 LOGGER.error("No name provided for Appender"); 165 return null; 166 } 167 168 AbstractFlumeManager manager; 169 170 if (embed) { 171 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); 172 } else { 173 if (agents == null || agents.length == 0) { 174 LOGGER.debug("No agents provided, using defaults"); 175 agents = new Agent[] {Agent.createAgent(null, null)}; 176 } 177 manager = FlumeAvroManager.getManager(name, agents, batchCount); 178 } 179 180 if (manager == null) { 181 return null; 182 } 183 184 return new FlumeAppender(name, filter, layout, handleExceptions, includes, 185 excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager); 186 } 187 }