1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.zip.GZIPOutputStream;
26
27 import org.apache.flume.event.SimpleEvent;
28 import org.apache.logging.log4j.Level;
29 import org.apache.logging.log4j.LoggingException;
30 import org.apache.logging.log4j.Marker;
31 import org.apache.logging.log4j.ThreadContext;
32 import org.apache.logging.log4j.core.LogEvent;
33 import org.apache.logging.log4j.core.impl.ThrowableProxy;
34 import org.apache.logging.log4j.core.util.Patterns;
35 import org.apache.logging.log4j.core.util.UuidUtil;
36 import org.apache.logging.log4j.message.MapMessage;
37 import org.apache.logging.log4j.message.Message;
38 import org.apache.logging.log4j.message.StructuredDataId;
39 import org.apache.logging.log4j.message.StructuredDataMessage;
40 import org.apache.logging.log4j.util.Strings;
41
42
43
44
45 public class FlumeEvent extends SimpleEvent implements LogEvent {
46
47 static final String GUID = "guId";
48
49
50
51 private static final long serialVersionUID = -8988674608627854140L;
52
53 private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
54
55 private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
56
57 private static final String EVENT_TYPE = "eventType";
58
59 private static final String EVENT_ID = "eventId";
60
61 private static final String TIMESTAMP = "timeStamp";
62
63 private final LogEvent event;
64
65 private final Map<String, String> contextMap = new HashMap<>();
66
67 private final boolean compress;
68
69
70
71
72
73
74
75
76
77
78
79 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
80 String mdcPrefix, String eventPrefix, final boolean compress) {
81 this.event = event;
82 this.compress = compress;
83 final Map<String, String> headers = getHeaders();
84 headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
85 if (mdcPrefix == null) {
86 mdcPrefix = DEFAULT_MDC_PREFIX;
87 }
88 if (eventPrefix == null) {
89 eventPrefix = DEFAULT_EVENT_PREFIX;
90 }
91 final Map<String, String> mdc = event.getContextMap();
92 if (includes != null) {
93 final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
94 if (array.length > 0) {
95 for (String str : array) {
96 str = str.trim();
97 if (mdc.containsKey(str)) {
98 contextMap.put(str, mdc.get(str));
99 }
100 }
101 }
102 } else if (excludes != null) {
103 final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
104 if (array.length > 0) {
105 final List<String> list = new ArrayList<>(array.length);
106 for (final String value : array) {
107 list.add(value.trim());
108 }
109 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
110 if (!list.contains(entry.getKey())) {
111 contextMap.put(entry.getKey(), entry.getValue());
112 }
113 }
114 }
115 } else {
116 contextMap.putAll(mdc);
117 }
118
119 if (required != null) {
120 final String[] array = required.split(Patterns.COMMA_SEPARATOR);
121 if (array.length > 0) {
122 for (String str : array) {
123 str = str.trim();
124 if (!mdc.containsKey(str)) {
125 throw new LoggingException("Required key " + str + " is missing from the MDC");
126 }
127 }
128 }
129 }
130 final String guid = UuidUtil.getTimeBasedUuid().toString();
131 final Message message = event.getMessage();
132 if (message instanceof MapMessage) {
133
134 ((MapMessage) message).put(GUID, guid);
135 if (message instanceof StructuredDataMessage) {
136 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
137 }
138 addMapData(eventPrefix, headers, (MapMessage) message);
139 } else {
140 headers.put(GUID, guid);
141 }
142
143 addContextData(mdcPrefix, headers, contextMap);
144 }
145
146 protected void addStructuredData(final String prefix, final Map<String, String> fields,
147 final StructuredDataMessage msg) {
148 fields.put(prefix + EVENT_TYPE, msg.getType());
149 final StructuredDataId id = msg.getId();
150 fields.put(prefix + EVENT_ID, id.getName());
151 }
152
153 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
154 final Map<String, String> data = msg.getData();
155 for (final Map.Entry<String, String> entry : data.entrySet()) {
156 fields.put(prefix + entry.getKey(), entry.getValue());
157 }
158 }
159
160 protected void addContextData(final String prefix, final Map<String, String> fields,
161 final Map<String, String> context) {
162 final Map<String, String> map = new HashMap<>();
163 for (final Map.Entry<String, String> entry : context.entrySet()) {
164 if (entry.getKey() != null && entry.getValue() != null) {
165 fields.put(prefix + entry.getKey(), entry.getValue());
166 map.put(prefix + entry.getKey(), entry.getValue());
167 }
168 }
169 context.clear();
170 context.putAll(map);
171 }
172
173
174
175
176
177 @Override
178 public void setBody(final byte[] body) {
179 if (body == null || body.length == 0) {
180 super.setBody(new byte[0]);
181 return;
182 }
183 if (compress) {
184 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185 try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
186 os.write(body);
187 } catch (final IOException ioe) {
188 throw new LoggingException("Unable to compress message", ioe);
189 }
190 super.setBody(baos.toByteArray());
191 } else {
192 super.setBody(body);
193 }
194 }
195
196
197
198
199
200 @Override
201 public String getLoggerFqcn() {
202 return event.getLoggerFqcn();
203 }
204
205
206
207
208
209 @Override
210 public Level getLevel() {
211 return event.getLevel();
212 }
213
214
215
216
217
218 @Override
219 public String getLoggerName() {
220 return event.getLoggerName();
221 }
222
223
224
225
226
227 @Override
228 public StackTraceElement getSource() {
229 return event.getSource();
230 }
231
232
233
234
235
236 @Override
237 public Message getMessage() {
238 return event.getMessage();
239 }
240
241
242
243
244
245 @Override
246 public Marker getMarker() {
247 return event.getMarker();
248 }
249
250
251
252
253
254 @Override
255 public String getThreadName() {
256 return event.getThreadName();
257 }
258
259
260
261
262
263 @Override
264 public long getTimeMillis() {
265 return event.getTimeMillis();
266 }
267
268
269
270
271
272
273 @Override
274 public long getNanoTime() {
275 return event.getNanoTime();
276 }
277
278
279
280
281
282 @Override
283 public Throwable getThrown() {
284 return event.getThrown();
285 }
286
287
288
289
290
291 @Override
292 public ThrowableProxy getThrownProxy() {
293 return event.getThrownProxy();
294 }
295
296
297
298
299
300 @Override
301 public Map<String, String> getContextMap() {
302 return contextMap;
303 }
304
305
306
307
308
309 @Override
310 public ThreadContext.ContextStack getContextStack() {
311 return event.getContextStack();
312 }
313
314 @Override
315 public boolean isIncludeLocation() {
316 return event.isIncludeLocation();
317 }
318
319 @Override
320 public void setIncludeLocation(final boolean includeLocation) {
321 event.setIncludeLocation(includeLocation);
322 }
323
324 @Override
325 public boolean isEndOfBatch() {
326 return event.isEndOfBatch();
327 }
328
329 @Override
330 public void setEndOfBatch(final boolean endOfBatch) {
331 event.setEndOfBatch(endOfBatch);
332 }
333 }