001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.event.SimpleEvent;
020    import org.apache.logging.log4j.Level;
021    import org.apache.logging.log4j.LoggingException;
022    import org.apache.logging.log4j.Marker;
023    import org.apache.logging.log4j.ThreadContext;
024    import org.apache.logging.log4j.core.LogEvent;
025    import org.apache.logging.log4j.core.helpers.UUIDUtil;
026    import org.apache.logging.log4j.message.MapMessage;
027    import org.apache.logging.log4j.message.Message;
028    import org.apache.logging.log4j.message.StructuredDataId;
029    import org.apache.logging.log4j.message.StructuredDataMessage;
030    
031    import java.io.ByteArrayOutputStream;
032    import java.io.IOException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.zip.GZIPOutputStream;
038    
039    /**
040     * Class that is both a Flume and Log4j Event.
041     */
042    public class FlumeEvent extends SimpleEvent implements LogEvent {
043    
044        /**
045         * Generated serial version ID.
046         */
047        private static final long serialVersionUID = -8988674608627854140L;
048    
049        private static final String DEFAULT_MDC_PREFIX = "mdc:";
050    
051        private static final String DEFAULT_EVENT_PREFIX = "";
052    
053        private static final String EVENT_TYPE = "eventType";
054    
055        private static final String EVENT_ID = "eventId";
056    
057        private static final String GUID = "guId";
058    
059        private static final String TIMESTAMP = "timeStamp";;
060    
061        private final LogEvent event;
062    
063        private final Map<String, String> ctx = new HashMap<String, String>();
064    
065        private final boolean compress;
066    
067        /**
068         * Construct the FlumeEvent.
069         * @param event The Log4j LogEvent.
070         * @param includes A comma separated list of MDC elements to include.
071         * @param excludes A comma separated list of MDC elements to exclude.
072         * @param required A comma separated list of MDC elements that are required to be defined.
073         * @param mdcPrefix The value to prefix to MDC keys.
074         * @param eventPrefix The value to prefix to event keys.
075         * @param compress If true the event body should be compressed.
076         */
077        public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
078                          String mdcPrefix, String eventPrefix, final boolean compress) {
079            this.event = event;
080            this.compress = compress;
081            final Map<String, String> headers = getHeaders();
082            headers.put(TIMESTAMP, Long.toString(event.getMillis()));
083            if (mdcPrefix == null) {
084                mdcPrefix = DEFAULT_MDC_PREFIX;
085            }
086            if (eventPrefix == null) {
087                eventPrefix = DEFAULT_EVENT_PREFIX;
088            }
089            final Map<String, String> mdc = event.getContextMap();
090            if (includes != null) {
091                final String[] array = includes.split(",");
092                if (array.length > 0) {
093                    for (String str : array) {
094                        str = str.trim();
095                        if (mdc.containsKey(str)) {
096                            ctx.put(str, mdc.get(str));
097                        }
098                    }
099                }
100            } else if (excludes != null) {
101                final String[] array = excludes.split(",");
102                if (array.length > 0) {
103                    final List<String> list = new ArrayList<String>(array.length);
104                    for (final String value : array) {
105                        list.add(value.trim());
106                    }
107                    for (final Map.Entry<String, String> entry : mdc.entrySet()) {
108                        if (!list.contains(entry.getKey())) {
109                            ctx.put(entry.getKey(), entry.getValue());
110                        }
111                    }
112                }
113            } else {
114                ctx.putAll(mdc);
115            }
116    
117            if (required != null) {
118                final String[] array = required.split(",");
119                if (array.length > 0) {
120                    for (String str : array) {
121                        str = str.trim();
122                        if (!mdc.containsKey(str)) {
123                            throw new LoggingException("Required key " + str + " is missing from the MDC");
124                        }
125                    }
126                }
127            }
128            final Message message = event.getMessage();
129            if (message instanceof MapMessage) {
130                if (message instanceof StructuredDataMessage) {
131                    addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
132                }
133                addMapData(eventPrefix, headers, (MapMessage) message);
134            }
135    
136            addContextData(mdcPrefix, headers, ctx);
137    
138            addGuid(headers);
139        }
140    
141        protected void addStructuredData(final String prefix, final Map<String, String> fields,
142                                         final StructuredDataMessage msg) {
143            fields.put(prefix + EVENT_TYPE, msg.getType());
144            final StructuredDataId id = msg.getId();
145            fields.put(prefix + EVENT_ID, id.getName());
146        }
147    
148        protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
149            final Map<String, String> data = msg.getData();
150            for (final Map.Entry<String, String> entry : data.entrySet()) {
151                fields.put(prefix + entry.getKey(), entry.getValue());
152            }
153        }
154    
155        protected void addContextData(final String prefix, final Map<String, String> fields,
156                                      final Map<String, String> context) {
157            for (final Map.Entry<String, String> entry : context.entrySet()) {
158                if (entry.getKey() != null && entry.getValue() != null) {
159                    fields.put(prefix + entry.getKey(), entry.getValue());
160                }
161            }
162        }
163    
164        protected void addGuid(final Map<String, String> fields) {
165            fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString());
166        }
167    
168        /**
169         * Set the body in the event.
170         * @param body The body to add to the event.
171         */
172        @Override
173        public void setBody(final byte[] body) {
174            if (body == null || body.length == 0) {
175                super.setBody(new byte[0]);
176                return;
177            }
178            if (compress) {
179                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
180                try {
181                    final GZIPOutputStream os = new GZIPOutputStream(baos);
182                    os.write(body);
183                    os.close();
184                } catch (final IOException ioe) {
185                    throw new LoggingException("Unable to compress message", ioe);
186                }
187                super.setBody(baos.toByteArray());
188            } else {
189                super.setBody(body);
190            }
191        }
192    
193        /**
194         * Get the Frequently Qualified Class Name.
195         * @return the FQCN String.
196         */
197        public String getFQCN() {
198            return event.getFQCN();
199        }
200    
201        /**
202         * Returns the logging Level.
203         * @return the Level.
204         */
205        public Level getLevel() {
206            return event.getLevel();
207        }
208    
209        /**
210         * Returns the logger name.
211         * @return the logger name.
212         */
213        public String getLoggerName() {
214            return event.getLoggerName();
215        }
216    
217        /**
218         * Returns the StackTraceElement for the caller of the logging API.
219         * @return the StackTraceElement of the caller.
220         */
221        public StackTraceElement getSource() {
222            return event.getSource();
223        }
224    
225        /**
226         * Returns the Message.
227         * @return the Message.
228         */
229        public Message getMessage() {
230            return event.getMessage();
231        }
232    
233        /**
234         * Returns the Marker.
235         * @return the Marker.
236         */
237        public Marker getMarker() {
238            return event.getMarker();
239        }
240    
241        /**
242         * Returns the name of the Thread.
243         * @return the name of the Thread.
244         */
245        public String getThreadName() {
246            return event.getThreadName();
247        }
248    
249        /**
250         * Returns the event timestamp.
251         * @return the event timestamp.
252         */
253        public long getMillis() {
254            return event.getMillis();
255        }
256    
257        /**
258         * Returns the Throwable associated with the event, if any.
259         * @return the Throwable.
260         */
261        public Throwable getThrown() {
262            return event.getThrown();
263        }
264    
265        /**
266         * Returns a copy of the context Map.
267         * @return a copy of the context Map.
268         */
269        public Map<String, String> getContextMap() {
270            return ctx;
271        }
272    
273        /**
274         * Returns a copy of the context stack.
275         * @return a copy of the context stack.
276         */
277        public ThreadContext.ContextStack getContextStack() {
278            return event.getContextStack();
279        }
280    }