1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.event.SimpleEvent;
20 import org.apache.logging.log4j.Level;
21 import org.apache.logging.log4j.LoggingException;
22 import org.apache.logging.log4j.Marker;
23 import org.apache.logging.log4j.ThreadContext;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.helpers.UUIDUtil;
26 import org.apache.logging.log4j.message.MapMessage;
27 import org.apache.logging.log4j.message.Message;
28 import org.apache.logging.log4j.message.StructuredDataId;
29 import org.apache.logging.log4j.message.StructuredDataMessage;
30
31 import java.io.ByteArrayOutputStream;
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.zip.GZIPOutputStream;
38
39
40
41
42 public class FlumeEvent extends SimpleEvent implements LogEvent {
43
44
45
46
47 private static final long serialVersionUID = -8988674608627854140L;
48
49 private static final String DEFAULT_MDC_PREFIX = "mdc:";
50
51 private static final String DEFAULT_EVENT_PREFIX = "";
52
53 private static final String EVENT_TYPE = "eventType";
54
55 private static final String EVENT_ID = "eventId";
56
57 private static final String GUID = "guId";
58
59 private static final String TIMESTAMP = "timeStamp";;
60
61 private final LogEvent event;
62
63 private final Map<String, String> ctx = new HashMap<String, String>();
64
65 private final boolean compress;
66
67
68
69
70
71
72
73
74
75
76
77 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
78 String mdcPrefix, String eventPrefix, final boolean compress) {
79 this.event = event;
80 this.compress = compress;
81 final Map<String, String> headers = getHeaders();
82 headers.put(TIMESTAMP, Long.toString(event.getMillis()));
83 if (mdcPrefix == null) {
84 mdcPrefix = DEFAULT_MDC_PREFIX;
85 }
86 if (eventPrefix == null) {
87 eventPrefix = DEFAULT_EVENT_PREFIX;
88 }
89 final Map<String, String> mdc = event.getContextMap();
90 if (includes != null) {
91 final String[] array = includes.split(",");
92 if (array.length > 0) {
93 for (String str : array) {
94 str = str.trim();
95 if (mdc.containsKey(str)) {
96 ctx.put(str, mdc.get(str));
97 }
98 }
99 }
100 } else if (excludes != null) {
101 final String[] array = excludes.split(",");
102 if (array.length > 0) {
103 final List<String> list = new ArrayList<String>(array.length);
104 for (final String value : array) {
105 list.add(value.trim());
106 }
107 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
108 if (!list.contains(entry.getKey())) {
109 ctx.put(entry.getKey(), entry.getValue());
110 }
111 }
112 }
113 } else {
114 ctx.putAll(mdc);
115 }
116
117 if (required != null) {
118 final String[] array = required.split(",");
119 if (array.length > 0) {
120 for (String str : array) {
121 str = str.trim();
122 if (!mdc.containsKey(str)) {
123 throw new LoggingException("Required key " + str + " is missing from the MDC");
124 }
125 }
126 }
127 }
128 final Message message = event.getMessage();
129 if (message instanceof MapMessage) {
130 if (message instanceof StructuredDataMessage) {
131 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
132 }
133 addMapData(eventPrefix, headers, (MapMessage) message);
134 }
135
136 addContextData(mdcPrefix, headers, ctx);
137
138 addGuid(headers);
139 }
140
141 protected void addStructuredData(final String prefix, final Map<String, String> fields,
142 final StructuredDataMessage msg) {
143 fields.put(prefix + EVENT_TYPE, msg.getType());
144 final StructuredDataId id = msg.getId();
145 fields.put(prefix + EVENT_ID, id.getName());
146 }
147
148 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
149 final Map<String, String> data = msg.getData();
150 for (final Map.Entry<String, String> entry : data.entrySet()) {
151 fields.put(prefix + entry.getKey(), entry.getValue());
152 }
153 }
154
155 protected void addContextData(final String prefix, final Map<String, String> fields,
156 final Map<String, String> context) {
157 for (final Map.Entry<String, String> entry : context.entrySet()) {
158 if (entry.getKey() != null && entry.getValue() != null) {
159 fields.put(prefix + entry.getKey(), entry.getValue());
160 }
161 }
162 }
163
164 protected void addGuid(final Map<String, String> fields) {
165 fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString());
166 }
167
168
169
170
171
172 @Override
173 public void setBody(final byte[] body) {
174 if (body == null || body.length == 0) {
175 super.setBody(new byte[0]);
176 return;
177 }
178 if (compress) {
179 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
180 try {
181 final GZIPOutputStream os = new GZIPOutputStream(baos);
182 os.write(body);
183 os.close();
184 } catch (final IOException ioe) {
185 throw new LoggingException("Unable to compress message", ioe);
186 }
187 super.setBody(baos.toByteArray());
188 } else {
189 super.setBody(body);
190 }
191 }
192
193
194
195
196
197 public String getFQCN() {
198 return event.getFQCN();
199 }
200
201
202
203
204
205 public Level getLevel() {
206 return event.getLevel();
207 }
208
209
210
211
212
213 public String getLoggerName() {
214 return event.getLoggerName();
215 }
216
217
218
219
220
221 public StackTraceElement getSource() {
222 return event.getSource();
223 }
224
225
226
227
228
229 public Message getMessage() {
230 return event.getMessage();
231 }
232
233
234
235
236
237 public Marker getMarker() {
238 return event.getMarker();
239 }
240
241
242
243
244
245 public String getThreadName() {
246 return event.getThreadName();
247 }
248
249
250
251
252
253 public long getMillis() {
254 return event.getMillis();
255 }
256
257
258
259
260
261 public Throwable getThrown() {
262 return event.getThrown();
263 }
264
265
266
267
268
269 public Map<String, String> getContextMap() {
270 return ctx;
271 }
272
273
274
275
276
277 public ThreadContext.ContextStack getContextStack() {
278 return event.getContextStack();
279 }
280 }