1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.logging.log4j.core.Filter;
20 import org.apache.logging.log4j.core.Layout;
21 import org.apache.logging.log4j.core.LogEvent;
22 import org.apache.logging.log4j.core.appender.AbstractAppender;
23 import org.apache.logging.log4j.core.config.Property;
24 import org.apache.logging.log4j.core.config.plugins.Plugin;
25 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26 import org.apache.logging.log4j.core.config.plugins.PluginElement;
27 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28 import org.apache.logging.log4j.core.layout.RFC5424Layout;
29
30 import java.util.Locale;
31
32
33
34
35 @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
36 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
37
38 private final AbstractFlumeManager manager;
39
40 private final String mdcIncludes;
41 private final String mdcExcludes;
42 private final String mdcRequired;
43
44 private final String eventPrefix;
45
46 private final String mdcPrefix;
47
48 private final boolean compressBody;
49
50 private final FlumeEventFactory factory;
51
52 private enum ManagerType {
53 AVRO, EMBEDDED, PERSISTENT;
54
55 public static ManagerType getType(String type) {
56 return valueOf(type.toUpperCase(Locale.US));
57 }
58 }
59
60 private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException,
61 final String includes, final String excludes, final String required, final String mdcPrefix,
62 final String eventPrefix, final boolean compress,
63 final FlumeEventFactory factory, final AbstractFlumeManager manager) {
64 super(name, filter, layout, handleException);
65 this.manager = manager;
66 this.mdcIncludes = includes;
67 this.mdcExcludes = excludes;
68 this.mdcRequired = required;
69 this.eventPrefix = eventPrefix;
70 this.mdcPrefix = mdcPrefix;
71 this.compressBody = compress;
72 this.factory = factory == null ? this : factory;
73 }
74
75
76
77
78
79 public void append(final LogEvent event) {
80
81 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
82 eventPrefix, compressBody);
83 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
84 manager.send(flumeEvent);
85 }
86
87 @Override
88 public void stop() {
89 super.stop();
90 manager.release();
91 }
92
93
94
95
96
97
98
99
100
101
102
103
104 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
105 final String required, final String mdcPrefix, final String eventPrefix,
106 final boolean compress) {
107 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
108 eventPrefix, compressBody);
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 @PluginFactory
139 public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
140 @PluginElement("properties") final Property[] properties,
141 @PluginAttr("embedded") final String embedded,
142 @PluginAttr("type") final String type,
143 @PluginAttr("dataDir") final String dataDir,
144 @PluginAttr("connectTimeout") final String connectionTimeout,
145 @PluginAttr("requestTimeout") final String requestTimeout,
146 @PluginAttr("agentRetries") final String agentRetries,
147 @PluginAttr("maxDelay") final String maxDelay,
148 @PluginAttr("name") final String name,
149 @PluginAttr("suppressExceptions") final String suppress,
150 @PluginAttr("mdcExcludes") final String excludes,
151 @PluginAttr("mdcIncludes") final String includes,
152 @PluginAttr("mdcRequired") final String required,
153 @PluginAttr("mdcPrefix") final String mdcPrefix,
154 @PluginAttr("eventPrefix") final String eventPrefix,
155 @PluginAttr("compress") final String compressBody,
156 @PluginAttr("batchSize") final String batchSize,
157 @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
158 @PluginElement("layout") Layout layout,
159 @PluginElement("filters") final Filter filter) {
160
161 final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
162 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
163 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
164 final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
165 ManagerType managerType;
166 if (type != null) {
167 if (embed && embedded != null) {
168 try {
169 managerType = ManagerType.getType(type);
170 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
171 } catch (Exception ex) {
172 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
173 managerType = ManagerType.EMBEDDED;
174 }
175 } else {
176 try {
177 managerType = ManagerType.getType(type);
178 } catch (Exception ex) {
179 LOGGER.warn("Type " + type + " is invalid.");
180 managerType = ManagerType.EMBEDDED;
181 }
182 }
183 } else if (embed) {
184 managerType = ManagerType.EMBEDDED;
185 } else {
186 managerType = ManagerType.AVRO;
187 }
188
189 final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
190 final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
191 final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
192 final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
193 final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
194
195
196 if (layout == null) {
197 layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix, null, null,
198 null, excludes, includes, required, null, null, null, null);
199 }
200
201 if (name == null) {
202 LOGGER.error("No name provided for Appender");
203 return null;
204 }
205
206 AbstractFlumeManager manager;
207
208 switch (managerType) {
209 case EMBEDDED:
210 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
211 break;
212 case AVRO:
213 if (agents == null || agents.length == 0) {
214 LOGGER.debug("No agents provided, using defaults");
215 agents = new Agent[] {Agent.createAgent(null, null)};
216 }
217 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
218 break;
219 case PERSISTENT:
220 if (agents == null || agents.length == 0) {
221 LOGGER.debug("No agents provided, using defaults");
222 agents = new Agent[] {Agent.createAgent(null, null)};
223 }
224 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
225 connectTimeout, reqTimeout, delay, dataDir);
226 break;
227 default:
228 LOGGER.debug("No manager type specified. Defaulting to AVRO");
229 if (agents == null || agents.length == 0) {
230 LOGGER.debug("No agents provided, using defaults");
231 agents = new Agent[] {Agent.createAgent(null, null)};
232 }
233 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
234 }
235
236 if (manager == null) {
237 return null;
238 }
239
240 return new FlumeAppender(name, filter, layout, handleExceptions, includes,
241 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
242 }
243 }