1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.logging.log4j.core.Filter;
20 import org.apache.logging.log4j.core.Layout;
21 import org.apache.logging.log4j.core.LogEvent;
22 import org.apache.logging.log4j.core.appender.AbstractAppender;
23 import org.apache.logging.log4j.core.config.Property;
24 import org.apache.logging.log4j.core.config.plugins.Plugin;
25 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26 import org.apache.logging.log4j.core.config.plugins.PluginElement;
27 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28 import org.apache.logging.log4j.core.layout.RFC5424Layout;
29
30
31
32
33 @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
34 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
35
36 private AbstractFlumeManager manager;
37
38 private final String mdcIncludes;
39 private final String mdcExcludes;
40 private final String mdcRequired;
41
42 private final String eventPrefix;
43
44 private final String mdcPrefix;
45
46 private final boolean compressBody;
47
48 private final int reconnectDelay;
49
50 private final int retries;
51
52 private final FlumeEventFactory factory;
53
54 private FlumeAppender(String name, Filter filter, Layout layout, boolean handleException,
55 String includes, String excludes, String required, String mdcPrefix,
56 String eventPrefix, boolean compress, int delay, int retries,
57 FlumeEventFactory factory, AbstractFlumeManager manager) {
58 super(name, filter, layout, handleException);
59 this.manager = manager;
60 this.mdcIncludes = includes;
61 this.mdcExcludes = excludes;
62 this.mdcRequired = required;
63 this.eventPrefix = eventPrefix;
64 this.mdcPrefix = mdcPrefix;
65 this.compressBody = compress;
66 this.reconnectDelay = delay;
67 this.retries = retries;
68 this.factory = factory == null ? this : factory;
69 }
70
71
72
73
74
75 public void append(LogEvent event) {
76
77 FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
78 eventPrefix, compressBody);
79 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
80 manager.send(flumeEvent, reconnectDelay, retries);
81 }
82
83 @Override
84 public void stop() {
85 super.stop();
86 manager.release();
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100 public FlumeEvent createEvent(LogEvent event, String includes, String excludes, String required,
101 String mdcPrefix, String eventPrefix, boolean compress) {
102 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
103 eventPrefix, compressBody);
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125 @PluginFactory
126 public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
127 @PluginElement("properties") Property[] properties,
128 @PluginAttr("embedded") String embedded,
129 @PluginAttr("dataDir") String dataDir,
130 @PluginAttr("reconnectionDelay") String delay,
131 @PluginAttr("agentRetries") String agentRetries,
132 @PluginAttr("name") String name,
133 @PluginAttr("suppressExceptions") String suppress,
134 @PluginAttr("mdcExcludes") String excludes,
135 @PluginAttr("mdcIncludes") String includes,
136 @PluginAttr("mdcRequired") String required,
137 @PluginAttr("mdcPrefix") String mdcPrefix,
138 @PluginAttr("eventPrefix") String eventPrefix,
139 @PluginAttr("compress") String compressBody,
140 @PluginAttr("batchSize") String batchSize,
141 @PluginElement("flumeEventFactory") FlumeEventFactory factory,
142 @PluginElement("layout") Layout layout,
143 @PluginElement("filters") Filter filter) {
144
145 boolean embed = embedded != null ? Boolean.valueOf(embedded) :
146 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
147 boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
148 boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
149
150 int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
151 int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
152 int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
153
154 if (layout == null) {
155 layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes,
156 includes, required, null, null);
157 }
158
159 if (name == null) {
160 LOGGER.error("No name provided for Appender");
161 return null;
162 }
163
164 AbstractFlumeManager manager;
165
166 if (embed) {
167 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
168 } else {
169 if (agents == null || agents.length == 0) {
170 LOGGER.debug("No agents provided, using defaults");
171 agents = new Agent[] {Agent.createAgent(null, null)};
172 }
173 manager = FlumeAvroManager.getManager(name, agents, batchCount);
174 }
175
176 if (manager == null) {
177 return null;
178 }
179
180 return new FlumeAppender(name, filter, layout, handleExceptions, includes,
181 excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
182 }
183 }