001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.logging.log4j.core.Filter; 020 import org.apache.logging.log4j.core.Layout; 021 import org.apache.logging.log4j.core.LogEvent; 022 import org.apache.logging.log4j.core.appender.AbstractAppender; 023 import org.apache.logging.log4j.core.config.Property; 024 import org.apache.logging.log4j.core.config.plugins.Plugin; 025 import org.apache.logging.log4j.core.config.plugins.PluginAttr; 026 import org.apache.logging.log4j.core.config.plugins.PluginElement; 027 import org.apache.logging.log4j.core.config.plugins.PluginFactory; 028 import org.apache.logging.log4j.core.layout.RFC5424Layout; 029 030 /** 031 * An Appender that uses the Avro protocol to route events to Flume. 032 */ 033 @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true) 034 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory { 035 036 private AbstractFlumeManager manager; 037 038 private final String mdcIncludes; 039 private final String mdcExcludes; 040 private final String mdcRequired; 041 042 private final String eventPrefix; 043 044 private final String mdcPrefix; 045 046 private final boolean compressBody; 047 048 private final int reconnectDelay; 049 050 private final int retries; 051 052 private final FlumeEventFactory factory; 053 054 private FlumeAppender(String name, Filter filter, Layout layout, boolean handleException, 055 String includes, String excludes, String required, String mdcPrefix, 056 String eventPrefix, boolean compress, int delay, int retries, 057 FlumeEventFactory factory, AbstractFlumeManager manager) { 058 super(name, filter, layout, handleException); 059 this.manager = manager; 060 this.mdcIncludes = includes; 061 this.mdcExcludes = excludes; 062 this.mdcRequired = required; 063 this.eventPrefix = eventPrefix; 064 this.mdcPrefix = mdcPrefix; 065 this.compressBody = compress; 066 this.reconnectDelay = delay; 067 this.retries = retries; 068 this.factory = factory == null ? this : factory; 069 } 070 071 /** 072 * Publish the event. 073 * @param event The LogEvent. 074 */ 075 public void append(LogEvent event) { 076 077 FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 078 eventPrefix, compressBody); 079 flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); 080 manager.send(flumeEvent, reconnectDelay, retries); 081 } 082 083 @Override 084 public void stop() { 085 super.stop(); 086 manager.release(); 087 } 088 089 /** 090 * Create a Flume event. 091 * @param event The Log4j LogEvent. 092 * @param includes comma separated list of mdc elements to include. 093 * @param excludes comma separated list of mdc elements to exclude. 094 * @param required comma separated list of mdc elements that must be present with a value. 095 * @param mdcPrefix The prefix to add to MDC key names. 096 * @param eventPrefix The prefix to add to event fields. 097 * @param compress If true the body will be compressed. 098 * @return A Flume Event. 099 */ 100 public FlumeEvent createEvent(LogEvent event, String includes, String excludes, String required, 101 String mdcPrefix, String eventPrefix, boolean compress) { 102 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, 103 eventPrefix, compressBody); 104 } 105 106 /** 107 * Create a Flume Avro Appender. 108 * @param agents An array of Agents. 109 * @param delay The amount of time in milliseconds to wait between retries. 110 * @param agentRetries The number of times to retry an agent before failing to the next agent. 111 * @param name The name of the Appender. 112 * @param suppress If true exceptions will be handled in the appender. 113 * @param excludes A comma separated list of MDC elements to exclude. 114 * @param includes A comma separated list of MDC elements to include. 115 * @param required A comma separated list of MDC elements that are required. 116 * @param mdcPrefix The prefix to add to MDC key names. 117 * @param eventPrefix The prefix to add to event key names. 118 * @param compressBody If true the event body will be compressed. 119 * @param batchSize Number of events to include in a batch. Defaults to 1. 120 * @param factory The factory to use to create Flume events. 121 * @param layout The layout to format the event. 122 * @param filter A Filter to filter events. 123 * @return A Flume Avro Appender. 124 */ 125 @PluginFactory 126 public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents, 127 @PluginElement("properties") Property[] properties, 128 @PluginAttr("embedded") String embedded, 129 @PluginAttr("dataDir") String dataDir, 130 @PluginAttr("reconnectionDelay") String delay, 131 @PluginAttr("agentRetries") String agentRetries, 132 @PluginAttr("name") String name, 133 @PluginAttr("suppressExceptions") String suppress, 134 @PluginAttr("mdcExcludes") String excludes, 135 @PluginAttr("mdcIncludes") String includes, 136 @PluginAttr("mdcRequired") String required, 137 @PluginAttr("mdcPrefix") String mdcPrefix, 138 @PluginAttr("eventPrefix") String eventPrefix, 139 @PluginAttr("compress") String compressBody, 140 @PluginAttr("batchSize") String batchSize, 141 @PluginElement("flumeEventFactory") FlumeEventFactory factory, 142 @PluginElement("layout") Layout layout, 143 @PluginElement("filters") Filter filter) { 144 145 boolean embed = embedded != null ? Boolean.valueOf(embedded) : 146 (agents == null || agents.length == 0) && properties != null && properties.length > 0; 147 boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress); 148 boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody); 149 150 int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize); 151 int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay); 152 int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries); 153 154 if (layout == null) { 155 layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes, 156 includes, required, null, null); 157 } 158 159 if (name == null) { 160 LOGGER.error("No name provided for Appender"); 161 return null; 162 } 163 164 AbstractFlumeManager manager; 165 166 if (embed) { 167 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); 168 } else { 169 if (agents == null || agents.length == 0) { 170 LOGGER.debug("No agents provided, using defaults"); 171 agents = new Agent[] {Agent.createAgent(null, null)}; 172 } 173 manager = FlumeAvroManager.getManager(name, agents, batchCount); 174 } 175 176 if (manager == null) { 177 return null; 178 } 179 180 return new FlumeAppender(name, filter, layout, handleExceptions, includes, 181 excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager); 182 } 183 }