1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.logging.log4j.core.Filter;
20 import org.apache.logging.log4j.core.Layout;
21 import org.apache.logging.log4j.core.LogEvent;
22 import org.apache.logging.log4j.core.appender.AbstractAppender;
23 import org.apache.logging.log4j.core.config.Property;
24 import org.apache.logging.log4j.core.config.plugins.Plugin;
25 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26 import org.apache.logging.log4j.core.config.plugins.PluginElement;
27 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28 import org.apache.logging.log4j.core.layout.RFC5424Layout;
29
30 import java.io.Serializable;
31 import java.util.Locale;
32
33
34
35
36 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
37 public final class FlumeAppender<T extends Serializable> extends AbstractAppender<T> implements FlumeEventFactory {
38
39 private final AbstractFlumeManager manager;
40
41 private final String mdcIncludes;
42 private final String mdcExcludes;
43 private final String mdcRequired;
44
45 private final String eventPrefix;
46
47 private final String mdcPrefix;
48
49 private final boolean compressBody;
50
51 private final FlumeEventFactory factory;
52
53 private enum ManagerType {
54 AVRO, EMBEDDED, PERSISTENT;
55
56 public static ManagerType getType(String type) {
57 return valueOf(type.toUpperCase(Locale.US));
58 }
59 }
60
61 private FlumeAppender(final String name, final Filter filter, final Layout<T> layout, final boolean handleException,
62 final String includes, final String excludes, final String required, final String mdcPrefix,
63 final String eventPrefix, final boolean compress,
64 final FlumeEventFactory factory, final AbstractFlumeManager manager) {
65 super(name, filter, layout, handleException);
66 this.manager = manager;
67 this.mdcIncludes = includes;
68 this.mdcExcludes = excludes;
69 this.mdcRequired = required;
70 this.eventPrefix = eventPrefix;
71 this.mdcPrefix = mdcPrefix;
72 this.compressBody = compress;
73 this.factory = factory == null ? this : factory;
74 }
75
76
77
78
79
80 @Override
81 public void append(final LogEvent event) {
82
83 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
84 eventPrefix, compressBody);
85 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
86 manager.send(flumeEvent);
87 }
88
89 @Override
90 public void stop() {
91 super.stop();
92 manager.release();
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106 @Override
107 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
108 final String required, final String mdcPrefix, final String eventPrefix,
109 final boolean compress) {
110 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
111 eventPrefix, compressBody);
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 @PluginFactory
142 public static <S extends Serializable> FlumeAppender<S> createAppender(@PluginElement("agents") Agent[] agents,
143 @PluginElement("properties") final Property[] properties,
144 @PluginAttr("embedded") final String embedded,
145 @PluginAttr("type") final String type,
146 @PluginAttr("dataDir") final String dataDir,
147 @PluginAttr("connectTimeout") final String connectionTimeout,
148 @PluginAttr("requestTimeout") final String requestTimeout,
149 @PluginAttr("agentRetries") final String agentRetries,
150 @PluginAttr("maxDelay") final String maxDelay,
151 @PluginAttr("name") final String name,
152 @PluginAttr("suppressExceptions") final String suppress,
153 @PluginAttr("mdcExcludes") final String excludes,
154 @PluginAttr("mdcIncludes") final String includes,
155 @PluginAttr("mdcRequired") final String required,
156 @PluginAttr("mdcPrefix") final String mdcPrefix,
157 @PluginAttr("eventPrefix") final String eventPrefix,
158 @PluginAttr("compress") final String compressBody,
159 @PluginAttr("batchSize") final String batchSize,
160 @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
161 @PluginElement("layout") Layout<S> layout,
162 @PluginElement("filters") final Filter filter) {
163
164 final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
165 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
166 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
167 final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
168 ManagerType managerType;
169 if (type != null) {
170 if (embed && embedded != null) {
171 try {
172 managerType = ManagerType.getType(type);
173 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
174 } catch (Exception ex) {
175 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
176 managerType = ManagerType.EMBEDDED;
177 }
178 } else {
179 try {
180 managerType = ManagerType.getType(type);
181 } catch (Exception ex) {
182 LOGGER.warn("Type " + type + " is invalid.");
183 managerType = ManagerType.EMBEDDED;
184 }
185 }
186 } else if (embed) {
187 managerType = ManagerType.EMBEDDED;
188 } else {
189 managerType = ManagerType.AVRO;
190 }
191
192 final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
193 final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
194 final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
195 final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
196 final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
197
198 if (layout == null) {
199 @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
200 Layout<S> l = (Layout<S>)RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix,
201 null, null, null, excludes, includes, required, null, null, null);
202 layout = l;
203 }
204
205 if (name == null) {
206 LOGGER.error("No name provided for Appender");
207 return null;
208 }
209
210 AbstractFlumeManager manager;
211
212 switch (managerType) {
213 case EMBEDDED:
214 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
215 break;
216 case AVRO:
217 if (agents == null || agents.length == 0) {
218 LOGGER.debug("No agents provided, using defaults");
219 agents = new Agent[] {Agent.createAgent(null, null)};
220 }
221 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
222 break;
223 case PERSISTENT:
224 if (agents == null || agents.length == 0) {
225 LOGGER.debug("No agents provided, using defaults");
226 agents = new Agent[] {Agent.createAgent(null, null)};
227 }
228 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
229 connectTimeout, reqTimeout, delay, dataDir);
230 break;
231 default:
232 LOGGER.debug("No manager type specified. Defaulting to AVRO");
233 if (agents == null || agents.length == 0) {
234 LOGGER.debug("No agents provided, using defaults");
235 agents = new Agent[] {Agent.createAgent(null, null)};
236 }
237 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
238 }
239
240 if (manager == null) {
241 return null;
242 }
243
244 return new FlumeAppender<S>(name, filter, layout, handleExceptions, includes,
245 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
246 }
247 }