1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.SourceRunner;
20 import org.apache.flume.node.NodeConfiguration;
21 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
22 import org.apache.logging.log4j.core.appender.ManagerFactory;
23 import org.apache.logging.log4j.core.config.ConfigurationException;
24 import org.apache.logging.log4j.core.config.Property;
25 import org.apache.logging.log4j.core.helpers.NameUtil;
26 import org.apache.logging.log4j.util.PropertiesUtil;
27
28 import java.util.Locale;
29 import java.util.Properties;
30
31
32
33
34 public class FlumeEmbeddedManager extends AbstractFlumeManager {
35
36
37 protected static final String SOURCE_NAME = "log4j-source";
38
39 private static ManagerFactory factory = new FlumeManagerFactory();
40
41 private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
42
43 private static final String IN_MEMORY = "InMemory";
44
45 private final FlumeNode node;
46
47 private NodeConfiguration conf;
48
49 private final Log4jEventSource source;
50
51 private final String shortName;
52
53
54
55
56
57
58
59 protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
60 super(name);
61 this.node = node;
62 this.shortName = shortName;
63 final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
64 if (runner == null || runner.getSource() == null) {
65 throw new IllegalStateException("No Source has been created for Appender " + shortName);
66 }
67 source = (Log4jEventSource) runner.getSource();
68 }
69
70
71
72
73
74
75
76
77
78
79 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
80 int batchSize, final String dataDir) {
81
82 if (batchSize <= 0) {
83 batchSize = 1;
84 }
85
86 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
87 throw new IllegalArgumentException("Either an Agent or properties are required");
88 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
89 throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
90 }
91
92 final StringBuilder sb = new StringBuilder();
93 boolean first = true;
94
95 if (agents != null && agents.length > 0) {
96 sb.append("FlumeEmbedded[");
97 for (final Agent agent : agents) {
98 if (!first) {
99 sb.append(",");
100 }
101 sb.append(agent.getHost()).append(":").append(agent.getPort());
102 first = false;
103 }
104 sb.append("]");
105 } else {
106 String sep = "";
107 sb.append(name).append(":");
108 final StringBuilder props = new StringBuilder();
109 for (final Property prop : properties) {
110 props.append(sep);
111 props.append(prop.getName()).append("=").append(prop.getValue());
112 sep = ",";
113 }
114 sb.append(NameUtil.md5(props.toString()));
115 }
116 return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
117 new FactoryData(name, agents, properties, batchSize, dataDir));
118 }
119
120 @Override
121 public void send(final FlumeEvent event, final int delay, final int retries) {
122 source.send(event);
123 }
124
125 @Override
126 protected void releaseSub() {
127 node.stop();
128 }
129
130
131
132
133 private static class FactoryData {
134 private final Agent[] agents;
135 private final Property[] properties;
136 private final int batchSize;
137 private final String dataDir;
138 private final String name;
139
140
141
142
143
144
145
146
147
148 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
149 final String dataDir) {
150 this.name = name;
151 this.agents = agents;
152 this.batchSize = batchSize;
153 this.properties = properties;
154 this.dataDir = dataDir;
155 }
156 }
157
158
159
160
161 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
162 private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
163
164
165
166
167
168
169
170 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
171 try {
172 final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
173 final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
174 data.dataDir);
175 final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
176 final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
177
178 final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
179
180 node.start();
181
182 return new FlumeEmbeddedManager(name, data.name, node);
183 } catch (final Exception ex) {
184 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
185 }
186 return null;
187 }
188
189 private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
190 final int batchSize, String dataDir) {
191 final Properties props = new Properties();
192
193 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
194 LOGGER.error("No Flume configuration provided");
195 throw new ConfigurationException("No Flume configuration provided");
196 }
197
198 if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
199 LOGGER.error("Agents and Flume configuration cannot both be specified");
200 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
201 }
202
203 if (agents != null && agents.length > 0) {
204 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
205 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
206
207 if (dataDir != null && dataDir.length() > 0) {
208 if (dataDir.equals(IN_MEMORY)) {
209 props.put(name + ".channels", "primary");
210 props.put(name + ".channels.primary.type", "memory");
211 } else {
212 props.put(name + ".channels", "primary");
213 props.put(name + ".channels.primary.type", "file");
214
215 if (!dataDir.endsWith(FiLE_SEP)) {
216 dataDir = dataDir + FiLE_SEP;
217 }
218
219 props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
220 props.put(name + ".channels.primary.dataDirs", dataDir + "data");
221 }
222
223 } else {
224 props.put(name + ".channels", "primary");
225 props.put(name + ".channels.primary.type", "file");
226 }
227
228 final StringBuilder sb = new StringBuilder();
229 String leading = "";
230 int priority = agents.length;
231 for (int i = 0; i < agents.length; ++i) {
232 sb.append(leading).append("agent").append(i);
233 leading = " ";
234 final String prefix = name + ".sinks.agent" + i;
235 props.put(prefix + ".channel", "primary");
236 props.put(prefix + ".type", "avro");
237 props.put(prefix + ".hostname", agents[i].getHost());
238 props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
239 props.put(prefix + ".batch-size", Integer.toString(batchSize));
240 props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
241 --priority;
242 }
243 props.put(name + ".sinks", sb.toString());
244 props.put(name + ".sinkgroups", "group1");
245 props.put(name + ".sinkgroups.group1.sinks", sb.toString());
246 props.put(name + ".sinkgroups.group1.processor.type", "failover");
247 final String sourceChannels = "primary";
248 props.put(name + ".channels", sourceChannels);
249 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
250 } else {
251 String channels = null;
252 String[] sinks = null;
253
254 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
255 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
256
257 for (final Property property : properties) {
258 final String key = property.getName();
259
260 if (key == null || key.length() == 0) {
261 final String msg = "A property name must be provided";
262 LOGGER.error(msg);
263 throw new ConfigurationException(msg);
264 }
265
266 final String upperKey = key.toUpperCase(Locale.ENGLISH);
267
268 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
269 final String msg =
270 "Specification of the agent name is allowed in Flume Appender configuration: " + key;
271 LOGGER.error(msg);
272 throw new ConfigurationException(msg);
273 }
274
275 if (upperKey.startsWith("SOURCES.")) {
276 final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
277 LOGGER.error(msg);
278 throw new ConfigurationException(msg);
279 }
280
281 final String value = property.getValue();
282 if (value == null || value.length() == 0) {
283 final String msg = "A value for property " + key + " must be provided";
284 LOGGER.error(msg);
285 throw new ConfigurationException(msg);
286 }
287
288 if (upperKey.equals("CHANNELS")) {
289 channels = value.trim();
290 } else if (upperKey.equals("SINKS")) {
291 sinks = value.trim().split(" ");
292 }
293
294 props.put(name + '.' + key, value);
295 }
296
297 String sourceChannels = channels;
298
299 if (channels == null) {
300 sourceChannels = "primary";
301 props.put(name + ".channels", sourceChannels);
302 }
303
304 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
305
306 if (sinks == null || sinks.length == 0) {
307 final String msg = "At least one Sink must be specified";
308 LOGGER.error(msg);
309 throw new ConfigurationException(msg);
310 }
311 }
312 return props;
313 }
314
315 }
316
317 }