View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.event.SimpleEvent;
20  import org.apache.logging.log4j.Level;
21  import org.apache.logging.log4j.LoggingException;
22  import org.apache.logging.log4j.Marker;
23  import org.apache.logging.log4j.ThreadContext;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.helpers.UUIDUtil;
26  import org.apache.logging.log4j.message.MapMessage;
27  import org.apache.logging.log4j.message.Message;
28  import org.apache.logging.log4j.message.StructuredDataId;
29  import org.apache.logging.log4j.message.StructuredDataMessage;
30  
31  import java.io.ByteArrayOutputStream;
32  import java.io.IOException;
33  import java.util.ArrayList;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.zip.GZIPOutputStream;
38  
39  /**
40   * Class that is both a Flume and Log4j Event.
41   */
42  public class FlumeEvent extends SimpleEvent implements LogEvent {
43  
44      /**
45       * Generated serial version ID.
46       */
47      private static final long serialVersionUID = -8988674608627854140L;
48  
49      private static final String DEFAULT_MDC_PREFIX = "mdc:";
50  
51      private static final String DEFAULT_EVENT_PREFIX = "";
52  
53      private static final String EVENT_TYPE = "eventType";
54  
55      private static final String EVENT_ID = "eventId";
56  
57      static final String GUID = "guId";
58  
59      private static final String TIMESTAMP = "timeStamp";;
60  
61      private final LogEvent event;
62  
63      private final Map<String, String> ctx = new HashMap<String, String>();
64  
65      private final boolean compress;
66  
67      /**
68       * Construct the FlumeEvent.
69       * @param event The Log4j LogEvent.
70       * @param includes A comma separated list of MDC elements to include.
71       * @param excludes A comma separated list of MDC elements to exclude.
72       * @param required A comma separated list of MDC elements that are required to be defined.
73       * @param mdcPrefix The value to prefix to MDC keys.
74       * @param eventPrefix The value to prefix to event keys.
75       * @param compress If true the event body should be compressed.
76       */
77      public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
78                        String mdcPrefix, String eventPrefix, final boolean compress) {
79          this.event = event;
80          this.compress = compress;
81          final Map<String, String> headers = getHeaders();
82          headers.put(TIMESTAMP, Long.toString(event.getMillis()));
83          if (mdcPrefix == null) {
84              mdcPrefix = DEFAULT_MDC_PREFIX;
85          }
86          if (eventPrefix == null) {
87              eventPrefix = DEFAULT_EVENT_PREFIX;
88          }
89          final Map<String, String> mdc = event.getContextMap();
90          if (includes != null) {
91              final String[] array = includes.split(",");
92              if (array.length > 0) {
93                  for (String str : array) {
94                      str = str.trim();
95                      if (mdc.containsKey(str)) {
96                          ctx.put(str, mdc.get(str));
97                      }
98                  }
99              }
100         } else if (excludes != null) {
101             final String[] array = excludes.split(",");
102             if (array.length > 0) {
103                 final List<String> list = new ArrayList<String>(array.length);
104                 for (final String value : array) {
105                     list.add(value.trim());
106                 }
107                 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
108                     if (!list.contains(entry.getKey())) {
109                         ctx.put(entry.getKey(), entry.getValue());
110                     }
111                 }
112             }
113         } else {
114             ctx.putAll(mdc);
115         }
116 
117         if (required != null) {
118             final String[] array = required.split(",");
119             if (array.length > 0) {
120                 for (String str : array) {
121                     str = str.trim();
122                     if (!mdc.containsKey(str)) {
123                         throw new LoggingException("Required key " + str + " is missing from the MDC");
124                     }
125                 }
126             }
127         }
128         final String guid =  UUIDUtil.getTimeBasedUUID().toString();
129         final Message message = event.getMessage();
130         if (message instanceof MapMessage) {
131             // Add the guid to the Map so that it can be included in the Layout.
132             ((MapMessage) message).put(GUID, guid);
133             if (message instanceof StructuredDataMessage) {
134                 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
135             }
136             addMapData(eventPrefix, headers, (MapMessage) message);
137         } else {
138             headers.put(GUID, guid);
139         }
140 
141         addContextData(mdcPrefix, headers, ctx);
142     }
143 
144     protected void addStructuredData(final String prefix, final Map<String, String> fields,
145                                      final StructuredDataMessage msg) {
146         fields.put(prefix + EVENT_TYPE, msg.getType());
147         final StructuredDataId id = msg.getId();
148         fields.put(prefix + EVENT_ID, id.getName());
149     }
150 
151     protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
152         final Map<String, String> data = msg.getData();
153         for (final Map.Entry<String, String> entry : data.entrySet()) {
154             fields.put(prefix + entry.getKey(), entry.getValue());
155         }
156     }
157 
158     protected void addContextData(final String prefix, final Map<String, String> fields,
159                                   final Map<String, String> context) {
160         for (final Map.Entry<String, String> entry : context.entrySet()) {
161             if (entry.getKey() != null && entry.getValue() != null) {
162                 fields.put(prefix + entry.getKey(), entry.getValue());
163             }
164         }
165     }
166 
167     /**
168      * Set the body in the event.
169      * @param body The body to add to the event.
170      */
171     @Override
172     public void setBody(final byte[] body) {
173         if (body == null || body.length == 0) {
174             super.setBody(new byte[0]);
175             return;
176         }
177         if (compress) {
178             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
179             try {
180                 final GZIPOutputStream os = new GZIPOutputStream(baos);
181                 os.write(body);
182                 os.close();
183             } catch (final IOException ioe) {
184                 throw new LoggingException("Unable to compress message", ioe);
185             }
186             super.setBody(baos.toByteArray());
187         } else {
188             super.setBody(body);
189         }
190     }
191 
192     /**
193      * Get the Frequently Qualified Class Name.
194      * @return the FQCN String.
195      */
196     @Override
197     public String getFQCN() {
198         return event.getFQCN();
199     }
200 
201     /**
202      * Returns the logging Level.
203      * @return the Level.
204      */
205     @Override
206     public Level getLevel() {
207         return event.getLevel();
208     }
209 
210     /**
211      * Returns the logger name.
212      * @return the logger name.
213      */
214     @Override
215     public String getLoggerName() {
216         return event.getLoggerName();
217     }
218 
219     /**
220      * Returns the StackTraceElement for the caller of the logging API.
221      * @return the StackTraceElement of the caller.
222      */
223     @Override
224     public StackTraceElement getSource() {
225         return event.getSource();
226     }
227 
228     /**
229      * Returns the Message.
230      * @return the Message.
231      */
232     @Override
233     public Message getMessage() {
234         return event.getMessage();
235     }
236 
237     /**
238      * Returns the Marker.
239      * @return the Marker.
240      */
241     @Override
242     public Marker getMarker() {
243         return event.getMarker();
244     }
245 
246     /**
247      * Returns the name of the Thread.
248      * @return the name of the Thread.
249      */
250     @Override
251     public String getThreadName() {
252         return event.getThreadName();
253     }
254 
255     /**
256      * Returns the event timestamp.
257      * @return the event timestamp.
258      */
259     @Override
260     public long getMillis() {
261         return event.getMillis();
262     }
263 
264     /**
265      * Returns the Throwable associated with the event, if any.
266      * @return the Throwable.
267      */
268     @Override
269     public Throwable getThrown() {
270         return event.getThrown();
271     }
272 
273     /**
274      * Returns a copy of the context Map.
275      * @return a copy of the context Map.
276      */
277     @Override
278     public Map<String, String> getContextMap() {
279         return ctx;
280     }
281 
282     /**
283      * Returns a copy of the context stack.
284      * @return a copy of the context stack.
285      */
286     @Override
287     public ThreadContext.ContextStack getContextStack() {
288         return event.getContextStack();
289     }
290 
291     @Override
292     public boolean isIncludeLocation() {
293         return event.isIncludeLocation();
294     }
295 
296     @Override
297     public void setIncludeLocation(boolean includeLocation) {
298         event.setIncludeLocation(includeLocation);
299     }
300 
301     @Override
302     public boolean isEndOfBatch() {
303         return event.isEndOfBatch();
304     }
305 
306     @Override
307     public void setEndOfBatch(boolean endOfBatch) {
308         event.setEndOfBatch(endOfBatch);
309     }
310 }