1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.zip.GZIPOutputStream;
26
27 import org.apache.flume.event.SimpleEvent;
28 import org.apache.logging.log4j.Level;
29 import org.apache.logging.log4j.LoggingException;
30 import org.apache.logging.log4j.Marker;
31 import org.apache.logging.log4j.ThreadContext;
32 import org.apache.logging.log4j.core.LogEvent;
33 import org.apache.logging.log4j.core.helpers.UUIDUtil;
34 import org.apache.logging.log4j.message.MapMessage;
35 import org.apache.logging.log4j.message.Message;
36 import org.apache.logging.log4j.message.StructuredDataId;
37 import org.apache.logging.log4j.message.StructuredDataMessage;
38
39
40
41
42 public class FlumeEvent extends SimpleEvent implements LogEvent {
43
44 static final String GUID = "guId";
45
46
47
48 private static final long serialVersionUID = -8988674608627854140L;
49
50 private static final String DEFAULT_MDC_PREFIX = "";
51
52 private static final String DEFAULT_EVENT_PREFIX = "";
53
54 private static final String EVENT_TYPE = "eventType";
55
56 private static final String EVENT_ID = "eventId";
57
58 private static final String TIMESTAMP = "timeStamp";
59
60 private final LogEvent event;
61
62 private final Map<String, String> ctx = new HashMap<String, String>();
63
64 private final boolean compress;
65
66
67
68
69
70
71
72
73
74
75
76 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
77 String mdcPrefix, String eventPrefix, final boolean compress) {
78 this.event = event;
79 this.compress = compress;
80 final Map<String, String> headers = getHeaders();
81 headers.put(TIMESTAMP, Long.toString(event.getMillis()));
82 if (mdcPrefix == null) {
83 mdcPrefix = DEFAULT_MDC_PREFIX;
84 }
85 if (eventPrefix == null) {
86 eventPrefix = DEFAULT_EVENT_PREFIX;
87 }
88 final Map<String, String> mdc = event.getContextMap();
89 if (includes != null) {
90 final String[] array = includes.split(",");
91 if (array.length > 0) {
92 for (String str : array) {
93 str = str.trim();
94 if (mdc.containsKey(str)) {
95 ctx.put(str, mdc.get(str));
96 }
97 }
98 }
99 } else if (excludes != null) {
100 final String[] array = excludes.split(",");
101 if (array.length > 0) {
102 final List<String> list = new ArrayList<String>(array.length);
103 for (final String value : array) {
104 list.add(value.trim());
105 }
106 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
107 if (!list.contains(entry.getKey())) {
108 ctx.put(entry.getKey(), entry.getValue());
109 }
110 }
111 }
112 } else {
113 ctx.putAll(mdc);
114 }
115
116 if (required != null) {
117 final String[] array = required.split(",");
118 if (array.length > 0) {
119 for (String str : array) {
120 str = str.trim();
121 if (!mdc.containsKey(str)) {
122 throw new LoggingException("Required key " + str + " is missing from the MDC");
123 }
124 }
125 }
126 }
127 final String guid = UUIDUtil.getTimeBasedUUID().toString();
128 final Message message = event.getMessage();
129 if (message instanceof MapMessage) {
130
131 ((MapMessage) message).put(GUID, guid);
132 if (message instanceof StructuredDataMessage) {
133 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
134 }
135 addMapData(eventPrefix, headers, (MapMessage) message);
136 } else {
137 headers.put(GUID, guid);
138 }
139
140 addContextData(mdcPrefix, headers, ctx);
141 }
142
143 protected void addStructuredData(final String prefix, final Map<String, String> fields,
144 final StructuredDataMessage msg) {
145 fields.put(prefix + EVENT_TYPE, msg.getType());
146 final StructuredDataId id = msg.getId();
147 fields.put(prefix + EVENT_ID, id.getName());
148 }
149
150 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
151 final Map<String, String> data = msg.getData();
152 for (final Map.Entry<String, String> entry : data.entrySet()) {
153 fields.put(prefix + entry.getKey(), entry.getValue());
154 }
155 }
156
157 protected void addContextData(final String prefix, final Map<String, String> fields,
158 final Map<String, String> context) {
159 Map<String, String> map = new HashMap<String, String>();
160 for (final Map.Entry<String, String> entry : context.entrySet()) {
161 if (entry.getKey() != null && entry.getValue() != null) {
162 fields.put(prefix + entry.getKey(), entry.getValue());
163 map.put(prefix + entry.getKey(), entry.getValue());
164 }
165 }
166 context.clear();
167 context.putAll(map);
168 }
169
170
171
172
173
174 @Override
175 public void setBody(final byte[] body) {
176 if (body == null || body.length == 0) {
177 super.setBody(new byte[0]);
178 return;
179 }
180 if (compress) {
181 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
182 try {
183 final GZIPOutputStream os = new GZIPOutputStream(baos);
184 os.write(body);
185 os.close();
186 } catch (final IOException ioe) {
187 throw new LoggingException("Unable to compress message", ioe);
188 }
189 super.setBody(baos.toByteArray());
190 } else {
191 super.setBody(body);
192 }
193 }
194
195
196
197
198
199 @Override
200 public String getFQCN() {
201 return event.getFQCN();
202 }
203
204
205
206
207
208 @Override
209 public Level getLevel() {
210 return event.getLevel();
211 }
212
213
214
215
216
217 @Override
218 public String getLoggerName() {
219 return event.getLoggerName();
220 }
221
222
223
224
225
226 @Override
227 public StackTraceElement getSource() {
228 return event.getSource();
229 }
230
231
232
233
234
235 @Override
236 public Message getMessage() {
237 return event.getMessage();
238 }
239
240
241
242
243
244 @Override
245 public Marker getMarker() {
246 return event.getMarker();
247 }
248
249
250
251
252
253 @Override
254 public String getThreadName() {
255 return event.getThreadName();
256 }
257
258
259
260
261
262 @Override
263 public long getMillis() {
264 return event.getMillis();
265 }
266
267
268
269
270
271 @Override
272 public Throwable getThrown() {
273 return event.getThrown();
274 }
275
276
277
278
279
280 @Override
281 public Map<String, String> getContextMap() {
282 return ctx;
283 }
284
285
286
287
288
289 @Override
290 public ThreadContext.ContextStack getContextStack() {
291 return event.getContextStack();
292 }
293
294 @Override
295 public boolean isIncludeLocation() {
296 return event.isIncludeLocation();
297 }
298
299 @Override
300 public void setIncludeLocation(final boolean includeLocation) {
301 event.setIncludeLocation(includeLocation);
302 }
303
304 @Override
305 public boolean isEndOfBatch() {
306 return event.isEndOfBatch();
307 }
308
309 @Override
310 public void setEndOfBatch(final boolean endOfBatch) {
311 event.setEndOfBatch(endOfBatch);
312 }
313 }