001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.event.SimpleEvent;
020    import org.apache.logging.log4j.Level;
021    import org.apache.logging.log4j.LoggingException;
022    import org.apache.logging.log4j.Marker;
023    import org.apache.logging.log4j.ThreadContext;
024    import org.apache.logging.log4j.core.LogEvent;
025    import org.apache.logging.log4j.core.helpers.UUIDUtil;
026    import org.apache.logging.log4j.message.MapMessage;
027    import org.apache.logging.log4j.message.Message;
028    import org.apache.logging.log4j.message.StructuredDataId;
029    import org.apache.logging.log4j.message.StructuredDataMessage;
030    
031    import java.io.ByteArrayOutputStream;
032    import java.io.IOException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.zip.GZIPOutputStream;
038    
039    /**
040     * Class that is both a Flume and Log4j Event.
041     */
042    public class FlumeEvent extends SimpleEvent implements LogEvent {
043    
044        /**
045         * Generated serial version ID.
046         */
047        private static final long serialVersionUID = -8988674608627854140L;
048    
049        private static final String DEFAULT_MDC_PREFIX = "mdc:";
050    
051        private static final String DEFAULT_EVENT_PREFIX = "";
052    
053        private static final String EVENT_TYPE = "eventType";
054    
055        private static final String EVENT_ID = "eventId";
056    
057        static final String GUID = "guId";
058    
059        private static final String TIMESTAMP = "timeStamp";;
060    
061        private final LogEvent event;
062    
063        private final Map<String, String> ctx = new HashMap<String, String>();
064    
065        private final boolean compress;
066    
067        /**
068         * Construct the FlumeEvent.
069         * @param event The Log4j LogEvent.
070         * @param includes A comma separated list of MDC elements to include.
071         * @param excludes A comma separated list of MDC elements to exclude.
072         * @param required A comma separated list of MDC elements that are required to be defined.
073         * @param mdcPrefix The value to prefix to MDC keys.
074         * @param eventPrefix The value to prefix to event keys.
075         * @param compress If true the event body should be compressed.
076         */
077        public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
078                          String mdcPrefix, String eventPrefix, final boolean compress) {
079            this.event = event;
080            this.compress = compress;
081            final Map<String, String> headers = getHeaders();
082            headers.put(TIMESTAMP, Long.toString(event.getMillis()));
083            if (mdcPrefix == null) {
084                mdcPrefix = DEFAULT_MDC_PREFIX;
085            }
086            if (eventPrefix == null) {
087                eventPrefix = DEFAULT_EVENT_PREFIX;
088            }
089            final Map<String, String> mdc = event.getContextMap();
090            if (includes != null) {
091                final String[] array = includes.split(",");
092                if (array.length > 0) {
093                    for (String str : array) {
094                        str = str.trim();
095                        if (mdc.containsKey(str)) {
096                            ctx.put(str, mdc.get(str));
097                        }
098                    }
099                }
100            } else if (excludes != null) {
101                final String[] array = excludes.split(",");
102                if (array.length > 0) {
103                    final List<String> list = new ArrayList<String>(array.length);
104                    for (final String value : array) {
105                        list.add(value.trim());
106                    }
107                    for (final Map.Entry<String, String> entry : mdc.entrySet()) {
108                        if (!list.contains(entry.getKey())) {
109                            ctx.put(entry.getKey(), entry.getValue());
110                        }
111                    }
112                }
113            } else {
114                ctx.putAll(mdc);
115            }
116    
117            if (required != null) {
118                final String[] array = required.split(",");
119                if (array.length > 0) {
120                    for (String str : array) {
121                        str = str.trim();
122                        if (!mdc.containsKey(str)) {
123                            throw new LoggingException("Required key " + str + " is missing from the MDC");
124                        }
125                    }
126                }
127            }
128            final String guid =  UUIDUtil.getTimeBasedUUID().toString();
129            final Message message = event.getMessage();
130            if (message instanceof MapMessage) {
131                // Add the guid to the Map so that it can be included in the Layout.
132                ((MapMessage) message).put(GUID, guid);
133                if (message instanceof StructuredDataMessage) {
134                    addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
135                }
136                addMapData(eventPrefix, headers, (MapMessage) message);
137            } else {
138                headers.put(GUID, guid);
139            }
140    
141            addContextData(mdcPrefix, headers, ctx);
142        }
143    
144        protected void addStructuredData(final String prefix, final Map<String, String> fields,
145                                         final StructuredDataMessage msg) {
146            fields.put(prefix + EVENT_TYPE, msg.getType());
147            final StructuredDataId id = msg.getId();
148            fields.put(prefix + EVENT_ID, id.getName());
149        }
150    
151        protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
152            final Map<String, String> data = msg.getData();
153            for (final Map.Entry<String, String> entry : data.entrySet()) {
154                fields.put(prefix + entry.getKey(), entry.getValue());
155            }
156        }
157    
158        protected void addContextData(final String prefix, final Map<String, String> fields,
159                                      final Map<String, String> context) {
160            for (final Map.Entry<String, String> entry : context.entrySet()) {
161                if (entry.getKey() != null && entry.getValue() != null) {
162                    fields.put(prefix + entry.getKey(), entry.getValue());
163                }
164            }
165        }
166    
167        /**
168         * Set the body in the event.
169         * @param body The body to add to the event.
170         */
171        @Override
172        public void setBody(final byte[] body) {
173            if (body == null || body.length == 0) {
174                super.setBody(new byte[0]);
175                return;
176            }
177            if (compress) {
178                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
179                try {
180                    final GZIPOutputStream os = new GZIPOutputStream(baos);
181                    os.write(body);
182                    os.close();
183                } catch (final IOException ioe) {
184                    throw new LoggingException("Unable to compress message", ioe);
185                }
186                super.setBody(baos.toByteArray());
187            } else {
188                super.setBody(body);
189            }
190        }
191    
192        /**
193         * Get the Frequently Qualified Class Name.
194         * @return the FQCN String.
195         */
196        @Override
197        public String getFQCN() {
198            return event.getFQCN();
199        }
200    
201        /**
202         * Returns the logging Level.
203         * @return the Level.
204         */
205        @Override
206        public Level getLevel() {
207            return event.getLevel();
208        }
209    
210        /**
211         * Returns the logger name.
212         * @return the logger name.
213         */
214        @Override
215        public String getLoggerName() {
216            return event.getLoggerName();
217        }
218    
219        /**
220         * Returns the StackTraceElement for the caller of the logging API.
221         * @return the StackTraceElement of the caller.
222         */
223        @Override
224        public StackTraceElement getSource() {
225            return event.getSource();
226        }
227    
228        /**
229         * Returns the Message.
230         * @return the Message.
231         */
232        @Override
233        public Message getMessage() {
234            return event.getMessage();
235        }
236    
237        /**
238         * Returns the Marker.
239         * @return the Marker.
240         */
241        @Override
242        public Marker getMarker() {
243            return event.getMarker();
244        }
245    
246        /**
247         * Returns the name of the Thread.
248         * @return the name of the Thread.
249         */
250        @Override
251        public String getThreadName() {
252            return event.getThreadName();
253        }
254    
255        /**
256         * Returns the event timestamp.
257         * @return the event timestamp.
258         */
259        @Override
260        public long getMillis() {
261            return event.getMillis();
262        }
263    
264        /**
265         * Returns the Throwable associated with the event, if any.
266         * @return the Throwable.
267         */
268        @Override
269        public Throwable getThrown() {
270            return event.getThrown();
271        }
272    
273        /**
274         * Returns a copy of the context Map.
275         * @return a copy of the context Map.
276         */
277        @Override
278        public Map<String, String> getContextMap() {
279            return ctx;
280        }
281    
282        /**
283         * Returns a copy of the context stack.
284         * @return a copy of the context stack.
285         */
286        @Override
287        public ThreadContext.ContextStack getContextStack() {
288            return event.getContextStack();
289        }
290    
291        @Override
292        public boolean isIncludeLocation() {
293            return event.isIncludeLocation();
294        }
295    
296        @Override
297        public void setIncludeLocation(boolean includeLocation) {
298            event.setIncludeLocation(includeLocation);
299        }
300    
301        @Override
302        public boolean isEndOfBatch() {
303            return event.isEndOfBatch();
304        }
305    
306        @Override
307        public void setEndOfBatch(boolean endOfBatch) {
308            event.setEndOfBatch(endOfBatch);
309        }
310    }