View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.event.SimpleEvent;
20  import org.apache.logging.log4j.Level;
21  import org.apache.logging.log4j.LoggingException;
22  import org.apache.logging.log4j.Marker;
23  import org.apache.logging.log4j.ThreadContext;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.helpers.UUIDUtil;
26  import org.apache.logging.log4j.message.MapMessage;
27  import org.apache.logging.log4j.message.Message;
28  import org.apache.logging.log4j.message.StructuredDataId;
29  import org.apache.logging.log4j.message.StructuredDataMessage;
30  
31  import java.io.ByteArrayOutputStream;
32  import java.io.IOException;
33  import java.util.Arrays;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.zip.GZIPOutputStream;
38  
39  /**
40   * Class that is both a Flume and Log4j Event.
41   */
42  public class FlumeEvent extends SimpleEvent implements LogEvent {
43  
44      private static final String DEFAULT_MDC_PREFIX = "mdc:";
45  
46      private static final String DEFAULT_EVENT_PREFIX = "";
47  
48      private static final String EVENT_TYPE = "eventType";
49  
50      private static final String EVENT_ID = "eventId";
51  
52      private static final String GUID = "guId";
53  
54      private static final String TIMESTAMP = "timeStamp";;
55  
56      private final LogEvent event;
57  
58      private final Map<String, String> ctx = new HashMap<String, String>();
59  
60      private final boolean compress;
61  
62      /**
63       * Construct the FlumeEvent.
64       * @param event The Log4j LogEvent.
65       * @param includes A comma separated list of MDC elements to include.
66       * @param excludes A comma separated list of MDC elements to exclude.
67       * @param required A comma separated list of MDC elements that are required to be defined.
68       * @param mdcPrefix The value to prefix to MDC keys.
69       * @param eventPrefix The value to prefix to event keys.
70       * @param compress If true the event body should be compressed.
71       */
72      public FlumeEvent(LogEvent event, String includes, String excludes, String required,
73                        String mdcPrefix, String eventPrefix, boolean compress) {
74          this.event = event;
75          this.compress = compress;
76          Map<String, String> headers = getHeaders();
77          headers.put(TIMESTAMP, Long.toString(event.getMillis()));
78          if (mdcPrefix == null) {
79              mdcPrefix = DEFAULT_MDC_PREFIX;
80          }
81          if (eventPrefix == null) {
82              eventPrefix = DEFAULT_EVENT_PREFIX;
83          }
84          Map<String, String> mdc = event.getContextMap();
85          if (includes != null) {
86              String[] array = includes.split(",");
87              if (array.length > 0) {
88                  for (String str : array) {
89                      if (mdc.containsKey(str)) {
90                          ctx.put(str, mdc.get(str));
91                      }
92                  }
93              }
94          } else if (excludes != null) {
95              String[] array = excludes.split(",");
96              if (array.length > 0) {
97                  List<String> list = Arrays.asList(array);
98                  for (Map.Entry<String, String> entry : mdc.entrySet()) {
99                      if (!list.contains(entry.getKey())) {
100                         ctx.put(entry.getKey(), entry.getValue());
101                     }
102                 }
103             }
104         } else {
105             ctx.putAll(mdc);
106         }
107 
108         if (required != null) {
109             String[] array = required.split(",");
110             if (array.length > 0) {
111                 for (String str : array) {
112                     if (!mdc.containsKey(str)) {
113                         throw new LoggingException("Required key " + str + " is missing from the MDC");
114                     }
115                 }
116             }
117         }
118         Message message = event.getMessage();
119         if (message instanceof MapMessage) {
120             if (message instanceof StructuredDataMessage) {
121                 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
122             }
123             addMapData(eventPrefix, headers, (MapMessage) message);
124         }
125 
126         addContextData(mdcPrefix, headers, ctx);
127 
128         addGuid(headers);
129     }
130 
131     protected void addStructuredData(String prefix, Map<String, String> fields, StructuredDataMessage msg) {
132         fields.put(prefix + EVENT_TYPE, msg.getType());
133         StructuredDataId id = msg.getId();
134         fields.put(prefix + EVENT_ID, id.getName());
135     }
136 
137     protected void addMapData(String prefix, Map<String, String> fields, MapMessage msg) {
138         Map<String, String> data = msg.getData();
139         for (Map.Entry<String, String> entry : data.entrySet()) {
140             fields.put(prefix + entry.getKey(), entry.getValue());
141         }
142     }
143 
144     protected void addContextData(String prefix, Map<String, String> fields, Map<String, String> context) {
145         for (Map.Entry<String, String> entry : context.entrySet()) {
146             fields.put(prefix + entry.getKey(), entry.getValue());
147         }
148     }
149 
150     protected void addGuid(Map<String, String> fields) {
151         fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString());
152     }
153 
154     /**
155      * Set the body in the event.
156      * @param body The body to add to the event.
157      */
158     @Override
159     public void setBody(byte[] body) {
160         if (body == null || body.length == 0) {
161             super.setBody(new byte[0]);
162             return;
163         }
164         if (compress) {
165             ByteArrayOutputStream baos = new ByteArrayOutputStream();
166             try {
167                 GZIPOutputStream os = new GZIPOutputStream(baos);
168                 os.write(body);
169                 os.close();
170             } catch (IOException ioe) {
171                 throw new LoggingException("Unable to compress message", ioe);
172             }
173             super.setBody(baos.toByteArray());
174         } else {
175             super.setBody(body);
176         }
177     }
178 
179     /**
180      * Get the Frequently Qualified Class Name.
181      * @return the FQCN String.
182      */
183     public String getFQCN() {
184         return event.getFQCN();
185     }
186 
187     /**
188      * Returns the logging Level.
189      * @return the Level.
190      */
191     public Level getLevel() {
192         return event.getLevel();
193     }
194 
195     /**
196      * Returns the logger name.
197      * @return the logger name.
198      */
199     public String getLoggerName() {
200         return event.getLoggerName();
201     }
202 
203     /**
204      * Returns the StackTraceElement for the caller of the logging API.
205      * @return the StackTraceElement of the caller.
206      */
207     public StackTraceElement getSource() {
208         return event.getSource();
209     }
210 
211     /**
212      * Returns the Message.
213      * @return the Message.
214      */
215     public Message getMessage() {
216         return event.getMessage();
217     }
218 
219     /**
220      * Returns the Marker.
221      * @return the Marker.
222      */
223     public Marker getMarker() {
224         return event.getMarker();
225     }
226 
227     /**
228      * Returns the name of the Thread.
229      * @return the name of the Thread.
230      */
231     public String getThreadName() {
232         return event.getThreadName();
233     }
234 
235     /**
236      * Returns the event timestamp.
237      * @return the event timestamp.
238      */
239     public long getMillis() {
240         return event.getMillis();
241     }
242 
243     /**
244      * Returns the Throwable associated with the event, if any.
245      * @return the Throwable.
246      */
247     public Throwable getThrown() {
248         return event.getThrown();
249     }
250 
251     /**
252      * Returns a copy of the context Map.
253      * @return a copy of the context Map.
254      */
255     public Map<String, String> getContextMap() {
256         return ctx;
257     }
258 
259     /**
260      * Returns a copy of the context stack.
261      * @return a copy of the context stack.
262      */
263     public ThreadContext.ContextStack getContextStack() {
264         return event.getContextStack();
265     }
266 }