1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.ChannelException;
20 import org.apache.flume.EventDrivenSource;
21 import org.apache.flume.instrumentation.SourceCounter;
22 import org.apache.flume.source.AbstractSource;
23 import org.slf4j.LoggerFactory;
24 import org.slf4j.Logger;
25
26
27
28
29 public class Log4jEventSource extends AbstractSource implements EventDrivenSource {
30
31 private static final Logger LOGGER = LoggerFactory.getLogger(Log4jEventSource.class);
32
33 private final SourceCounter sourceCounter = new SourceCounter("log4j");
34
35 public Log4jEventSource() {
36 setName("Log4jEvent");
37 }
38
39 @Override
40 public synchronized void start() {
41 super.start();
42
43 LOGGER.info("Log4j Source started");
44 }
45
46 @Override
47 public synchronized void stop() {
48 super.stop();
49
50 LOGGER.info("Log4j Source stopped. Metrics {}", sourceCounter);
51 }
52
53
54 public void send(final FlumeEvent event) {
55 sourceCounter.incrementAppendReceivedCount();
56 sourceCounter.incrementEventReceivedCount();
57 try {
58 getChannelProcessor().processEvent(event);
59 } catch (final ChannelException ex) {
60 LOGGER.warn("Unabled to process event {}" + event, ex);
61 throw ex;
62 }
63 sourceCounter.incrementAppendAcceptedCount();
64 sourceCounter.incrementEventAcceptedCount();
65 }
66 }