1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.logging.log4j.core.Filter;
20 import org.apache.logging.log4j.core.Layout;
21 import org.apache.logging.log4j.core.LogEvent;
22 import org.apache.logging.log4j.core.appender.AbstractAppender;
23 import org.apache.logging.log4j.core.config.Property;
24 import org.apache.logging.log4j.core.config.plugins.Plugin;
25 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26 import org.apache.logging.log4j.core.config.plugins.PluginElement;
27 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28 import org.apache.logging.log4j.core.layout.RFC5424Layout;
29
30
31
32
33 @Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
34 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
35
36 private final AbstractFlumeManager manager;
37
38 private final String mdcIncludes;
39 private final String mdcExcludes;
40 private final String mdcRequired;
41
42 private final String eventPrefix;
43
44 private final String mdcPrefix;
45
46 private final boolean compressBody;
47
48 private final int reconnectDelay;
49
50 private final int retries;
51
52 private final FlumeEventFactory factory;
53
54 private FlumeAppender(final String name, final Filter filter, final Layout layout, final boolean handleException,
55 final String includes, final String excludes, final String required, final String mdcPrefix,
56 final String eventPrefix, final boolean compress, final int delay, final int retries,
57 final FlumeEventFactory factory, final AbstractFlumeManager manager) {
58 super(name, filter, layout, handleException);
59 this.manager = manager;
60 this.mdcIncludes = includes;
61 this.mdcExcludes = excludes;
62 this.mdcRequired = required;
63 this.eventPrefix = eventPrefix;
64 this.mdcPrefix = mdcPrefix;
65 this.compressBody = compress;
66 this.reconnectDelay = delay;
67 this.retries = retries;
68 this.factory = factory == null ? this : factory;
69 }
70
71
72
73
74
75 public void append(final LogEvent event) {
76
77 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
78 eventPrefix, compressBody);
79 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
80 manager.send(flumeEvent, reconnectDelay, retries);
81 }
82
83 @Override
84 public void stop() {
85 super.stop();
86 manager.release();
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
101 final String required, final String mdcPrefix, final String eventPrefix,
102 final boolean compress) {
103 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
104 eventPrefix, compressBody);
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 @PluginFactory
130 public static FlumeAppender createAppender(@PluginElement("agents") Agent[] agents,
131 @PluginElement("properties") final Property[] properties,
132 @PluginAttr("embedded") final String embedded,
133 @PluginAttr("dataDir") final String dataDir,
134 @PluginAttr("reconnectionDelay") final String delay,
135 @PluginAttr("agentRetries") final String agentRetries,
136 @PluginAttr("name") final String name,
137 @PluginAttr("suppressExceptions") final String suppress,
138 @PluginAttr("mdcExcludes") final String excludes,
139 @PluginAttr("mdcIncludes") final String includes,
140 @PluginAttr("mdcRequired") final String required,
141 @PluginAttr("mdcPrefix") final String mdcPrefix,
142 @PluginAttr("eventPrefix") final String eventPrefix,
143 @PluginAttr("compress") final String compressBody,
144 @PluginAttr("batchSize") final String batchSize,
145 @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
146 @PluginElement("layout") Layout layout,
147 @PluginElement("filters") final Filter filter) {
148
149 final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
150 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
151 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
152 final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
153
154 final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
155 final int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
156 final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
157
158 if (layout == null) {
159 layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes,
160 includes, required, null, null, null, null);
161 }
162
163 if (name == null) {
164 LOGGER.error("No name provided for Appender");
165 return null;
166 }
167
168 AbstractFlumeManager manager;
169
170 if (embed) {
171 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
172 } else {
173 if (agents == null || agents.length == 0) {
174 LOGGER.debug("No agents provided, using defaults");
175 agents = new Agent[] {Agent.createAgent(null, null)};
176 }
177 manager = FlumeAvroManager.getManager(name, agents, batchCount);
178 }
179
180 if (manager == null) {
181 return null;
182 }
183
184 return new FlumeAppender(name, filter, layout, handleExceptions, includes,
185 excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
186 }
187 }