1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.Serializable;
20 import java.util.Locale;
21
22 import org.apache.logging.log4j.core.Filter;
23 import org.apache.logging.log4j.core.Layout;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.appender.AbstractAppender;
26 import org.apache.logging.log4j.core.config.Property;
27 import org.apache.logging.log4j.core.config.plugins.Plugin;
28 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
29 import org.apache.logging.log4j.core.config.plugins.PluginElement;
30 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
31 import org.apache.logging.log4j.core.helpers.Booleans;
32 import org.apache.logging.log4j.core.helpers.Integers;
33 import org.apache.logging.log4j.core.layout.RFC5424Layout;
34
35
36
37
38 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
39 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
40
41 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
42 private static final int DEFAULT_MAX_DELAY = 60000;
43
44 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
45
46 private final AbstractFlumeManager manager;
47
48 private final String mdcIncludes;
49 private final String mdcExcludes;
50 private final String mdcRequired;
51
52 private final String eventPrefix;
53
54 private final String mdcPrefix;
55
56 private final boolean compressBody;
57
58 private final FlumeEventFactory factory;
59
60
61
62
63 private enum ManagerType {
64 AVRO, EMBEDDED, PERSISTENT;
65
66 public static ManagerType getType(final String type) {
67 return valueOf(type.toUpperCase(Locale.US));
68 }
69 }
70
71 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
72 final boolean ignoreExceptions, final String includes, final String excludes,
73 final String required, final String mdcPrefix, final String eventPrefix,
74 final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
75 super(name, filter, layout, ignoreExceptions);
76 this.manager = manager;
77 this.mdcIncludes = includes;
78 this.mdcExcludes = excludes;
79 this.mdcRequired = required;
80 this.eventPrefix = eventPrefix;
81 this.mdcPrefix = mdcPrefix;
82 this.compressBody = compress;
83 this.factory = factory == null ? this : factory;
84 }
85
86
87
88
89
90 @Override
91 public void append(final LogEvent event) {
92 final String name = event.getLoggerName();
93 if (name != null) {
94 for (final String pkg : EXCLUDED_PACKAGES) {
95 if (name.startsWith(pkg)) {
96 return;
97 }
98 }
99 }
100 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
101 eventPrefix, compressBody);
102 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
103 manager.send(flumeEvent);
104 }
105
106 @Override
107 public void stop() {
108 super.stop();
109 manager.release();
110 }
111
112
113
114
115
116
117
118
119
120
121
122
123 @Override
124 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
125 final String required, final String mdcPrefix, final String eventPrefix,
126 final boolean compress) {
127 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
128 eventPrefix, compressBody);
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 @PluginFactory
162 public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
163 @PluginElement("Properties") final Property[] properties,
164 @PluginAttribute("embedded") final String embedded,
165 @PluginAttribute("type") final String type,
166 @PluginAttribute("dataDir") final String dataDir,
167 @PluginAttribute("connectTimeout") final String connectionTimeout,
168 @PluginAttribute("requestTimeout") final String requestTimeout,
169 @PluginAttribute("agentRetries") final String agentRetries,
170 @PluginAttribute("maxDelay") final String maxDelay,
171 @PluginAttribute("name") final String name,
172 @PluginAttribute("ignoreExceptions") final String ignore,
173 @PluginAttribute("mdcExcludes") final String excludes,
174 @PluginAttribute("mdcIncludes") final String includes,
175 @PluginAttribute("mdcRequired") final String required,
176 @PluginAttribute("mdcPrefix") final String mdcPrefix,
177 @PluginAttribute("eventPrefix") final String eventPrefix,
178 @PluginAttribute("compress") final String compressBody,
179 @PluginAttribute("batchSize") final String batchSize,
180 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
181 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
182 @PluginElement("Layout") Layout<? extends Serializable> layout,
183 @PluginElement("Filters") final Filter filter) {
184
185 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
186 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
187 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
188 final boolean compress = Booleans.parseBoolean(compressBody, true);
189 ManagerType managerType;
190 if (type != null) {
191 if (embed && embedded != null) {
192 try {
193 managerType = ManagerType.getType(type);
194 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
195 } catch (final Exception ex) {
196 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
197 " is invalid.");
198 managerType = ManagerType.EMBEDDED;
199 }
200 } else {
201 try {
202 managerType = ManagerType.getType(type);
203 } catch (final Exception ex) {
204 LOGGER.warn("Type " + type + " is invalid.");
205 managerType = ManagerType.EMBEDDED;
206 }
207 }
208 } else if (embed) {
209 managerType = ManagerType.EMBEDDED;
210 } else {
211 managerType = ManagerType.AVRO;
212 }
213
214 final int batchCount = Integers.parseInt(batchSize, 1);
215 final int connectTimeout = Integers.parseInt(connectionTimeout, 0);
216 final int reqTimeout = Integers.parseInt(requestTimeout, 0);
217 final int retries = Integers.parseInt(agentRetries, 0);
218 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
219 final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY);
220
221 if (layout == null) {
222 layout = RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix,
223 null, null, null, null, excludes, includes, required, null, null, null, null);
224 }
225
226 if (name == null) {
227 LOGGER.error("No name provided for Appender");
228 return null;
229 }
230
231 AbstractFlumeManager manager;
232
233 switch (managerType) {
234 case EMBEDDED:
235 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
236 break;
237 case AVRO:
238 if (agents == null || agents.length == 0) {
239 LOGGER.debug("No agents provided, using defaults");
240 agents = new Agent[] {Agent.createAgent(null, null)};
241 }
242 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
243 break;
244 case PERSISTENT:
245 if (agents == null || agents.length == 0) {
246 LOGGER.debug("No agents provided, using defaults");
247 agents = new Agent[] {Agent.createAgent(null, null)};
248 }
249 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
250 connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir);
251 break;
252 default:
253 LOGGER.debug("No manager type specified. Defaulting to AVRO");
254 if (agents == null || agents.length == 0) {
255 LOGGER.debug("No agents provided, using defaults");
256 agents = new Agent[] {Agent.createAgent(null, null)};
257 }
258 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
259 }
260
261 if (manager == null) {
262 return null;
263 }
264
265 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes,
266 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
267 }
268 }