1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.Event;
20 import org.apache.flume.SourceRunner;
21 import org.apache.flume.node.NodeConfiguration;
22 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
23 import org.apache.logging.log4j.core.appender.ManagerFactory;
24 import org.apache.logging.log4j.core.config.ConfigurationException;
25 import org.apache.logging.log4j.core.config.Property;
26 import org.apache.logging.log4j.core.helpers.NameUtil;
27 import org.apache.logging.log4j.util.PropertiesUtil;
28
29 import java.util.Locale;
30 import java.util.Properties;
31
32
33
34
35 public class FlumeEmbeddedManager extends AbstractFlumeManager {
36
37
38 protected static final String SOURCE_NAME = "log4j-source";
39
40 private static FlumeManagerFactory factory = new FlumeManagerFactory();
41
42 private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
43
44 private static final String IN_MEMORY = "InMemory";
45
46 private final FlumeNode node;
47
48 private NodeConfiguration conf;
49
50 private final Log4jEventSource source;
51
52 private final String shortName;
53
54
55
56
57
58
59
60 protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
61 super(name);
62 this.node = node;
63 this.shortName = shortName;
64 final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
65 if (runner == null || runner.getSource() == null) {
66 throw new IllegalStateException("No Source has been created for Appender " + shortName);
67 }
68 source = (Log4jEventSource) runner.getSource();
69 }
70
71
72
73
74
75
76
77
78
79
80 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
81 int batchSize, final String dataDir) {
82
83 if (batchSize <= 0) {
84 batchSize = 1;
85 }
86
87 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
88 throw new IllegalArgumentException("Either an Agent or properties are required");
89 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
90 throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
91 }
92
93 final StringBuilder sb = new StringBuilder();
94 boolean first = true;
95
96 if (agents != null && agents.length > 0) {
97 sb.append("FlumeEmbedded[");
98 for (final Agent agent : agents) {
99 if (!first) {
100 sb.append(",");
101 }
102 sb.append(agent.getHost()).append(":").append(agent.getPort());
103 first = false;
104 }
105 sb.append("]");
106 } else {
107 String sep = "";
108 sb.append(name).append(":");
109 final StringBuilder props = new StringBuilder();
110 for (final Property prop : properties) {
111 props.append(sep);
112 props.append(prop.getName()).append("=").append(prop.getValue());
113 sep = ",";
114 }
115 sb.append(NameUtil.md5(props.toString()));
116 }
117 return getManager(sb.toString(), factory,
118 new FactoryData(name, agents, properties, batchSize, dataDir));
119 }
120
121 @Override
122 public void send(final Event event) {
123 source.send(event);
124 }
125
126 @Override
127 protected void releaseSub() {
128 node.stop();
129 }
130
131
132
133
134 private static class FactoryData {
135 private final Agent[] agents;
136 private final Property[] properties;
137 private final int batchSize;
138 private final String dataDir;
139 private final String name;
140
141
142
143
144
145
146
147
148
149 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
150 final String dataDir) {
151 this.name = name;
152 this.agents = agents;
153 this.batchSize = batchSize;
154 this.properties = properties;
155 this.dataDir = dataDir;
156 }
157 }
158
159
160
161
162 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
163 private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
164
165
166
167
168
169
170
171 @Override
172 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
173 try {
174 final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
175 final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
176 data.dataDir);
177 final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
178 final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
179
180 final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
181
182 node.start();
183
184 return new FlumeEmbeddedManager(name, data.name, node);
185 } catch (final Exception ex) {
186 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
187 }
188 return null;
189 }
190
191 private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
192 final int batchSize, String dataDir) {
193 final Properties props = new Properties();
194
195 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
196 LOGGER.error("No Flume configuration provided");
197 throw new ConfigurationException("No Flume configuration provided");
198 }
199
200 if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
201 LOGGER.error("Agents and Flume configuration cannot both be specified");
202 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
203 }
204
205 if (agents != null && agents.length > 0) {
206 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
207 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
208
209 if (dataDir != null && dataDir.length() > 0) {
210 if (dataDir.equals(IN_MEMORY)) {
211 props.put(name + ".channels", "primary");
212 props.put(name + ".channels.primary.type", "memory");
213 } else {
214 props.put(name + ".channels", "primary");
215 props.put(name + ".channels.primary.type", "file");
216
217 if (!dataDir.endsWith(FiLE_SEP)) {
218 dataDir = dataDir + FiLE_SEP;
219 }
220
221 props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
222 props.put(name + ".channels.primary.dataDirs", dataDir + "data");
223 }
224
225 } else {
226 props.put(name + ".channels", "primary");
227 props.put(name + ".channels.primary.type", "file");
228 }
229
230 final StringBuilder sb = new StringBuilder();
231 String leading = "";
232 int priority = agents.length;
233 for (int i = 0; i < agents.length; ++i) {
234 sb.append(leading).append("agent").append(i);
235 leading = " ";
236 final String prefix = name + ".sinks.agent" + i;
237 props.put(prefix + ".channel", "primary");
238 props.put(prefix + ".type", "avro");
239 props.put(prefix + ".hostname", agents[i].getHost());
240 props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
241 props.put(prefix + ".batch-size", Integer.toString(batchSize));
242 props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
243 --priority;
244 }
245 props.put(name + ".sinks", sb.toString());
246 props.put(name + ".sinkgroups", "group1");
247 props.put(name + ".sinkgroups.group1.sinks", sb.toString());
248 props.put(name + ".sinkgroups.group1.processor.type", "failover");
249 final String sourceChannels = "primary";
250 props.put(name + ".channels", sourceChannels);
251 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
252 } else {
253 String channels = null;
254 String[] sinks = null;
255
256 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
257 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
258
259 for (final Property property : properties) {
260 final String key = property.getName();
261
262 if (key == null || key.length() == 0) {
263 final String msg = "A property name must be provided";
264 LOGGER.error(msg);
265 throw new ConfigurationException(msg);
266 }
267
268 final String upperKey = key.toUpperCase(Locale.ENGLISH);
269
270 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
271 final String msg =
272 "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
273 LOGGER.error(msg);
274 throw new ConfigurationException(msg);
275 }
276
277
278 if (upperKey.startsWith("SOURCES.") && !upperKey.startsWith("SOURCES.LOG4J-SOURCE.INTERCEPTORS")) {
279 final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
280 LOGGER.error(msg);
281 throw new ConfigurationException(msg);
282 }
283
284 final String value = property.getValue();
285 if (value == null || value.length() == 0) {
286 final String msg = "A value for property " + key + " must be provided";
287 LOGGER.error(msg);
288 throw new ConfigurationException(msg);
289 }
290
291 if (upperKey.equals("CHANNELS")) {
292 channels = value.trim();
293 } else if (upperKey.equals("SINKS")) {
294 sinks = value.trim().split(" ");
295 }
296
297 props.put(name + '.' + key, value);
298 }
299
300 String sourceChannels = channels;
301
302 if (channels == null) {
303 sourceChannels = "primary";
304 props.put(name + ".channels", sourceChannels);
305 }
306
307 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
308
309 if (sinks == null || sinks.length == 0) {
310 final String msg = "At least one Sink must be specified";
311 LOGGER.error(msg);
312 throw new ConfigurationException(msg);
313 }
314 }
315 return props;
316 }
317
318 }
319
320 }