001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.event.SimpleEvent;
020    import org.apache.logging.log4j.Level;
021    import org.apache.logging.log4j.LoggingException;
022    import org.apache.logging.log4j.Marker;
023    import org.apache.logging.log4j.ThreadContext;
024    import org.apache.logging.log4j.core.LogEvent;
025    import org.apache.logging.log4j.core.helpers.UUIDUtil;
026    import org.apache.logging.log4j.message.MapMessage;
027    import org.apache.logging.log4j.message.Message;
028    import org.apache.logging.log4j.message.StructuredDataId;
029    import org.apache.logging.log4j.message.StructuredDataMessage;
030    
031    import java.io.ByteArrayOutputStream;
032    import java.io.IOException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.zip.GZIPOutputStream;
038    
039    /**
040     * Class that is both a Flume and Log4j Event.
041     */
042    public class FlumeEvent extends SimpleEvent implements LogEvent {
043    
044        /**
045         * Generated serial version ID.
046         */
047        private static final long serialVersionUID = -8988674608627854140L;
048    
049        private static final String DEFAULT_MDC_PREFIX = "mdc:";
050    
051        private static final String DEFAULT_EVENT_PREFIX = "";
052    
053        private static final String EVENT_TYPE = "eventType";
054    
055        private static final String EVENT_ID = "eventId";
056    
057        static final String GUID = "guId";
058    
059        private static final String TIMESTAMP = "timeStamp";;
060    
061        private final LogEvent event;
062    
063        private final Map<String, String> ctx = new HashMap<String, String>();
064    
065        private final boolean compress;
066    
067        /**
068         * Construct the FlumeEvent.
069         * @param event The Log4j LogEvent.
070         * @param includes A comma separated list of MDC elements to include.
071         * @param excludes A comma separated list of MDC elements to exclude.
072         * @param required A comma separated list of MDC elements that are required to be defined.
073         * @param mdcPrefix The value to prefix to MDC keys.
074         * @param eventPrefix The value to prefix to event keys.
075         * @param compress If true the event body should be compressed.
076         */
077        public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
078                          String mdcPrefix, String eventPrefix, final boolean compress) {
079            this.event = event;
080            this.compress = compress;
081            final Map<String, String> headers = getHeaders();
082            headers.put(TIMESTAMP, Long.toString(event.getMillis()));
083            if (mdcPrefix == null) {
084                mdcPrefix = DEFAULT_MDC_PREFIX;
085            }
086            if (eventPrefix == null) {
087                eventPrefix = DEFAULT_EVENT_PREFIX;
088            }
089            final Map<String, String> mdc = event.getContextMap();
090            if (includes != null) {
091                final String[] array = includes.split(",");
092                if (array.length > 0) {
093                    for (String str : array) {
094                        str = str.trim();
095                        if (mdc.containsKey(str)) {
096                            ctx.put(str, mdc.get(str));
097                        }
098                    }
099                }
100            } else if (excludes != null) {
101                final String[] array = excludes.split(",");
102                if (array.length > 0) {
103                    final List<String> list = new ArrayList<String>(array.length);
104                    for (final String value : array) {
105                        list.add(value.trim());
106                    }
107                    for (final Map.Entry<String, String> entry : mdc.entrySet()) {
108                        if (!list.contains(entry.getKey())) {
109                            ctx.put(entry.getKey(), entry.getValue());
110                        }
111                    }
112                }
113            } else {
114                ctx.putAll(mdc);
115            }
116    
117            if (required != null) {
118                final String[] array = required.split(",");
119                if (array.length > 0) {
120                    for (String str : array) {
121                        str = str.trim();
122                        if (!mdc.containsKey(str)) {
123                            throw new LoggingException("Required key " + str + " is missing from the MDC");
124                        }
125                    }
126                }
127            }
128            final String guid =  UUIDUtil.getTimeBasedUUID().toString();
129            final Message message = event.getMessage();
130            if (message instanceof MapMessage) {
131                ((MapMessage) message).put(GUID, guid);
132                if (message instanceof StructuredDataMessage) {
133                    addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
134                }
135                addMapData(eventPrefix, headers, (MapMessage) message);
136            }
137    
138            addContextData(mdcPrefix, headers, ctx);
139        }
140    
141        protected void addStructuredData(final String prefix, final Map<String, String> fields,
142                                         final StructuredDataMessage msg) {
143            fields.put(prefix + EVENT_TYPE, msg.getType());
144            final StructuredDataId id = msg.getId();
145            fields.put(prefix + EVENT_ID, id.getName());
146        }
147    
148        protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
149            final Map<String, String> data = msg.getData();
150            for (final Map.Entry<String, String> entry : data.entrySet()) {
151                fields.put(prefix + entry.getKey(), entry.getValue());
152            }
153        }
154    
155        protected void addContextData(final String prefix, final Map<String, String> fields,
156                                      final Map<String, String> context) {
157            for (final Map.Entry<String, String> entry : context.entrySet()) {
158                if (entry.getKey() != null && entry.getValue() != null) {
159                    fields.put(prefix + entry.getKey(), entry.getValue());
160                }
161            }
162        }
163    
164        /**
165         * Set the body in the event.
166         * @param body The body to add to the event.
167         */
168        @Override
169        public void setBody(final byte[] body) {
170            if (body == null || body.length == 0) {
171                super.setBody(new byte[0]);
172                return;
173            }
174            if (compress) {
175                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
176                try {
177                    final GZIPOutputStream os = new GZIPOutputStream(baos);
178                    os.write(body);
179                    os.close();
180                } catch (final IOException ioe) {
181                    throw new LoggingException("Unable to compress message", ioe);
182                }
183                super.setBody(baos.toByteArray());
184            } else {
185                super.setBody(body);
186            }
187        }
188    
189        /**
190         * Get the Frequently Qualified Class Name.
191         * @return the FQCN String.
192         */
193        public String getFQCN() {
194            return event.getFQCN();
195        }
196    
197        /**
198         * Returns the logging Level.
199         * @return the Level.
200         */
201        public Level getLevel() {
202            return event.getLevel();
203        }
204    
205        /**
206         * Returns the logger name.
207         * @return the logger name.
208         */
209        public String getLoggerName() {
210            return event.getLoggerName();
211        }
212    
213        /**
214         * Returns the StackTraceElement for the caller of the logging API.
215         * @return the StackTraceElement of the caller.
216         */
217        public StackTraceElement getSource() {
218            return event.getSource();
219        }
220    
221        /**
222         * Returns the Message.
223         * @return the Message.
224         */
225        public Message getMessage() {
226            return event.getMessage();
227        }
228    
229        /**
230         * Returns the Marker.
231         * @return the Marker.
232         */
233        public Marker getMarker() {
234            return event.getMarker();
235        }
236    
237        /**
238         * Returns the name of the Thread.
239         * @return the name of the Thread.
240         */
241        public String getThreadName() {
242            return event.getThreadName();
243        }
244    
245        /**
246         * Returns the event timestamp.
247         * @return the event timestamp.
248         */
249        public long getMillis() {
250            return event.getMillis();
251        }
252    
253        /**
254         * Returns the Throwable associated with the event, if any.
255         * @return the Throwable.
256         */
257        public Throwable getThrown() {
258            return event.getThrown();
259        }
260    
261        /**
262         * Returns a copy of the context Map.
263         * @return a copy of the context Map.
264         */
265        public Map<String, String> getContextMap() {
266            return ctx;
267        }
268    
269        /**
270         * Returns a copy of the context stack.
271         * @return a copy of the context stack.
272         */
273        public ThreadContext.ContextStack getContextStack() {
274            return event.getContextStack();
275        }
276    
277        @Override
278        public boolean isIncludeLocation() {
279            return event.isIncludeLocation();
280        }
281    
282        @Override
283        public void setIncludeLocation(boolean includeLocation) {
284            event.setIncludeLocation(includeLocation);
285        }
286    
287        @Override
288        public boolean isEndOfBatch() {
289            return event.isEndOfBatch();
290        }
291    
292        @Override
293        public void setEndOfBatch(boolean endOfBatch) {
294            event.setEndOfBatch(endOfBatch);
295        }
296    }