1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.event.SimpleEvent;
20 import org.apache.logging.log4j.Level;
21 import org.apache.logging.log4j.LoggingException;
22 import org.apache.logging.log4j.Marker;
23 import org.apache.logging.log4j.ThreadContext;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.helpers.UUIDUtil;
26 import org.apache.logging.log4j.message.MapMessage;
27 import org.apache.logging.log4j.message.Message;
28 import org.apache.logging.log4j.message.StructuredDataId;
29 import org.apache.logging.log4j.message.StructuredDataMessage;
30
31 import java.io.ByteArrayOutputStream;
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.zip.GZIPOutputStream;
38
39
40
41
42 public class FlumeEvent extends SimpleEvent implements LogEvent {
43
44
45
46
47 private static final long serialVersionUID = -8988674608627854140L;
48
49 private static final String DEFAULT_MDC_PREFIX = "mdc:";
50
51 private static final String DEFAULT_EVENT_PREFIX = "";
52
53 private static final String EVENT_TYPE = "eventType";
54
55 private static final String EVENT_ID = "eventId";
56
57 static final String GUID = "guId";
58
59 private static final String TIMESTAMP = "timeStamp";;
60
61 private final LogEvent event;
62
63 private final Map<String, String> ctx = new HashMap<String, String>();
64
65 private final boolean compress;
66
67
68
69
70
71
72
73
74
75
76
77 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
78 String mdcPrefix, String eventPrefix, final boolean compress) {
79 this.event = event;
80 this.compress = compress;
81 final Map<String, String> headers = getHeaders();
82 headers.put(TIMESTAMP, Long.toString(event.getMillis()));
83 if (mdcPrefix == null) {
84 mdcPrefix = DEFAULT_MDC_PREFIX;
85 }
86 if (eventPrefix == null) {
87 eventPrefix = DEFAULT_EVENT_PREFIX;
88 }
89 final Map<String, String> mdc = event.getContextMap();
90 if (includes != null) {
91 final String[] array = includes.split(",");
92 if (array.length > 0) {
93 for (String str : array) {
94 str = str.trim();
95 if (mdc.containsKey(str)) {
96 ctx.put(str, mdc.get(str));
97 }
98 }
99 }
100 } else if (excludes != null) {
101 final String[] array = excludes.split(",");
102 if (array.length > 0) {
103 final List<String> list = new ArrayList<String>(array.length);
104 for (final String value : array) {
105 list.add(value.trim());
106 }
107 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
108 if (!list.contains(entry.getKey())) {
109 ctx.put(entry.getKey(), entry.getValue());
110 }
111 }
112 }
113 } else {
114 ctx.putAll(mdc);
115 }
116
117 if (required != null) {
118 final String[] array = required.split(",");
119 if (array.length > 0) {
120 for (String str : array) {
121 str = str.trim();
122 if (!mdc.containsKey(str)) {
123 throw new LoggingException("Required key " + str + " is missing from the MDC");
124 }
125 }
126 }
127 }
128 final String guid = UUIDUtil.getTimeBasedUUID().toString();
129 final Message message = event.getMessage();
130 if (message instanceof MapMessage) {
131
132 ((MapMessage) message).put(GUID, guid);
133 if (message instanceof StructuredDataMessage) {
134 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
135 }
136 addMapData(eventPrefix, headers, (MapMessage) message);
137 } else {
138 headers.put(GUID, guid);
139 }
140
141 addContextData(mdcPrefix, headers, ctx);
142 }
143
144 protected void addStructuredData(final String prefix, final Map<String, String> fields,
145 final StructuredDataMessage msg) {
146 fields.put(prefix + EVENT_TYPE, msg.getType());
147 final StructuredDataId id = msg.getId();
148 fields.put(prefix + EVENT_ID, id.getName());
149 }
150
151 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
152 final Map<String, String> data = msg.getData();
153 for (final Map.Entry<String, String> entry : data.entrySet()) {
154 fields.put(prefix + entry.getKey(), entry.getValue());
155 }
156 }
157
158 protected void addContextData(final String prefix, final Map<String, String> fields,
159 final Map<String, String> context) {
160 for (final Map.Entry<String, String> entry : context.entrySet()) {
161 if (entry.getKey() != null && entry.getValue() != null) {
162 fields.put(prefix + entry.getKey(), entry.getValue());
163 }
164 }
165 }
166
167
168
169
170
171 @Override
172 public void setBody(final byte[] body) {
173 if (body == null || body.length == 0) {
174 super.setBody(new byte[0]);
175 return;
176 }
177 if (compress) {
178 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
179 try {
180 final GZIPOutputStream os = new GZIPOutputStream(baos);
181 os.write(body);
182 os.close();
183 } catch (final IOException ioe) {
184 throw new LoggingException("Unable to compress message", ioe);
185 }
186 super.setBody(baos.toByteArray());
187 } else {
188 super.setBody(body);
189 }
190 }
191
192
193
194
195
196 @Override
197 public String getFQCN() {
198 return event.getFQCN();
199 }
200
201
202
203
204
205 @Override
206 public Level getLevel() {
207 return event.getLevel();
208 }
209
210
211
212
213
214 @Override
215 public String getLoggerName() {
216 return event.getLoggerName();
217 }
218
219
220
221
222
223 @Override
224 public StackTraceElement getSource() {
225 return event.getSource();
226 }
227
228
229
230
231
232 @Override
233 public Message getMessage() {
234 return event.getMessage();
235 }
236
237
238
239
240
241 @Override
242 public Marker getMarker() {
243 return event.getMarker();
244 }
245
246
247
248
249
250 @Override
251 public String getThreadName() {
252 return event.getThreadName();
253 }
254
255
256
257
258
259 @Override
260 public long getMillis() {
261 return event.getMillis();
262 }
263
264
265
266
267
268 @Override
269 public Throwable getThrown() {
270 return event.getThrown();
271 }
272
273
274
275
276
277 @Override
278 public Map<String, String> getContextMap() {
279 return ctx;
280 }
281
282
283
284
285
286 @Override
287 public ThreadContext.ContextStack getContextStack() {
288 return event.getContextStack();
289 }
290
291 @Override
292 public boolean isIncludeLocation() {
293 return event.isIncludeLocation();
294 }
295
296 @Override
297 public void setIncludeLocation(boolean includeLocation) {
298 event.setIncludeLocation(includeLocation);
299 }
300
301 @Override
302 public boolean isEndOfBatch() {
303 return event.isEndOfBatch();
304 }
305
306 @Override
307 public void setEndOfBatch(boolean endOfBatch) {
308 event.setEndOfBatch(endOfBatch);
309 }
310 }