1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.event.SimpleEvent;
20 import org.apache.logging.log4j.Level;
21 import org.apache.logging.log4j.LoggingException;
22 import org.apache.logging.log4j.Marker;
23 import org.apache.logging.log4j.ThreadContext;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.helpers.UUIDUtil;
26 import org.apache.logging.log4j.message.MapMessage;
27 import org.apache.logging.log4j.message.Message;
28 import org.apache.logging.log4j.message.StructuredDataId;
29 import org.apache.logging.log4j.message.StructuredDataMessage;
30
31 import java.io.ByteArrayOutputStream;
32 import java.io.IOException;
33 import java.util.Arrays;
34 import java.util.HashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.zip.GZIPOutputStream;
38
39
40
41
42 public class FlumeEvent extends SimpleEvent implements LogEvent {
43
44 private static final String DEFAULT_MDC_PREFIX = "mdc:";
45
46 private static final String DEFAULT_EVENT_PREFIX = "";
47
48 private static final String EVENT_TYPE = "eventType";
49
50 private static final String EVENT_ID = "eventId";
51
52 private static final String GUID = "guId";
53
54 private static final String TIMESTAMP = "timeStamp";;
55
56 private final LogEvent event;
57
58 private final Map<String, String> ctx = new HashMap<String, String>();
59
60 private final boolean compress;
61
62
63
64
65
66
67
68
69
70
71
72 public FlumeEvent(LogEvent event, String includes, String excludes, String required,
73 String mdcPrefix, String eventPrefix, boolean compress) {
74 this.event = event;
75 this.compress = compress;
76 Map<String, String> headers = getHeaders();
77 headers.put(TIMESTAMP, Long.toString(event.getMillis()));
78 if (mdcPrefix == null) {
79 mdcPrefix = DEFAULT_MDC_PREFIX;
80 }
81 if (eventPrefix == null) {
82 eventPrefix = DEFAULT_EVENT_PREFIX;
83 }
84 Map<String, String> mdc = event.getContextMap();
85 if (includes != null) {
86 String[] array = includes.split(",");
87 if (array.length > 0) {
88 for (String str : array) {
89 if (mdc.containsKey(str)) {
90 ctx.put(str, mdc.get(str));
91 }
92 }
93 }
94 } else if (excludes != null) {
95 String[] array = excludes.split(",");
96 if (array.length > 0) {
97 List<String> list = Arrays.asList(array);
98 for (Map.Entry<String, String> entry : mdc.entrySet()) {
99 if (!list.contains(entry.getKey())) {
100 ctx.put(entry.getKey(), entry.getValue());
101 }
102 }
103 }
104 } else {
105 ctx.putAll(mdc);
106 }
107
108 if (required != null) {
109 String[] array = required.split(",");
110 if (array.length > 0) {
111 for (String str : array) {
112 if (!mdc.containsKey(str)) {
113 throw new LoggingException("Required key " + str + " is missing from the MDC");
114 }
115 }
116 }
117 }
118 Message message = event.getMessage();
119 if (message instanceof MapMessage) {
120 if (message instanceof StructuredDataMessage) {
121 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
122 }
123 addMapData(eventPrefix, headers, (MapMessage) message);
124 }
125
126 addContextData(mdcPrefix, headers, ctx);
127
128 addGuid(headers);
129 }
130
131 protected void addStructuredData(String prefix, Map<String, String> fields, StructuredDataMessage msg) {
132 fields.put(prefix + EVENT_TYPE, msg.getType());
133 StructuredDataId id = msg.getId();
134 fields.put(prefix + EVENT_ID, id.getName());
135 }
136
137 protected void addMapData(String prefix, Map<String, String> fields, MapMessage msg) {
138 Map<String, String> data = msg.getData();
139 for (Map.Entry<String, String> entry : data.entrySet()) {
140 fields.put(prefix + entry.getKey(), entry.getValue());
141 }
142 }
143
144 protected void addContextData(String prefix, Map<String, String> fields, Map<String, String> context) {
145 for (Map.Entry<String, String> entry : context.entrySet()) {
146 fields.put(prefix + entry.getKey(), entry.getValue());
147 }
148 }
149
150 protected void addGuid(Map<String, String> fields) {
151 fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString());
152 }
153
154
155
156
157
158 @Override
159 public void setBody(byte[] body) {
160 if (body == null || body.length == 0) {
161 super.setBody(new byte[0]);
162 return;
163 }
164 if (compress) {
165 ByteArrayOutputStream baos = new ByteArrayOutputStream();
166 try {
167 GZIPOutputStream os = new GZIPOutputStream(baos);
168 os.write(body);
169 os.close();
170 } catch (IOException ioe) {
171 throw new LoggingException("Unable to compress message", ioe);
172 }
173 super.setBody(baos.toByteArray());
174 } else {
175 super.setBody(body);
176 }
177 }
178
179
180
181
182
183 public String getFQCN() {
184 return event.getFQCN();
185 }
186
187
188
189
190
191 public Level getLevel() {
192 return event.getLevel();
193 }
194
195
196
197
198
199 public String getLoggerName() {
200 return event.getLoggerName();
201 }
202
203
204
205
206
207 public StackTraceElement getSource() {
208 return event.getSource();
209 }
210
211
212
213
214
215 public Message getMessage() {
216 return event.getMessage();
217 }
218
219
220
221
222
223 public Marker getMarker() {
224 return event.getMarker();
225 }
226
227
228
229
230
231 public String getThreadName() {
232 return event.getThreadName();
233 }
234
235
236
237
238
239 public long getMillis() {
240 return event.getMillis();
241 }
242
243
244
245
246
247 public Throwable getThrown() {
248 return event.getThrown();
249 }
250
251
252
253
254
255 public Map<String, String> getContextMap() {
256 return ctx;
257 }
258
259
260
261
262
263 public ThreadContext.ContextStack getContextStack() {
264 return event.getContextStack();
265 }
266 }