001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.flume.appender;
018    
019    import org.apache.flume.event.SimpleEvent;
020    import org.apache.logging.log4j.Level;
021    import org.apache.logging.log4j.LoggingException;
022    import org.apache.logging.log4j.Marker;
023    import org.apache.logging.log4j.ThreadContext;
024    import org.apache.logging.log4j.core.LogEvent;
025    import org.apache.logging.log4j.core.helpers.UUIDUtil;
026    import org.apache.logging.log4j.message.MapMessage;
027    import org.apache.logging.log4j.message.Message;
028    import org.apache.logging.log4j.message.StructuredDataId;
029    import org.apache.logging.log4j.message.StructuredDataMessage;
030    
031    import java.io.ByteArrayOutputStream;
032    import java.io.IOException;
033    import java.util.Arrays;
034    import java.util.HashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.zip.GZIPOutputStream;
038    
039    /**
040     * Class that is both a Flume and Log4j Event.
041     */
042    public class FlumeEvent extends SimpleEvent implements LogEvent {
043    
044        private static final String DEFAULT_MDC_PREFIX = "mdc:";
045    
046        private static final String DEFAULT_EVENT_PREFIX = "";
047    
048        private static final String EVENT_TYPE = "eventType";
049    
050        private static final String EVENT_ID = "eventId";
051    
052        private static final String GUID = "guId";
053    
054        private static final String TIMESTAMP = "timeStamp";;
055    
056        private final LogEvent event;
057    
058        private final Map<String, String> ctx = new HashMap<String, String>();
059    
060        private final boolean compress;
061    
062        /**
063         * Construct the FlumeEvent.
064         * @param event The Log4j LogEvent.
065         * @param includes A comma separated list of MDC elements to include.
066         * @param excludes A comma separated list of MDC elements to exclude.
067         * @param required A comma separated list of MDC elements that are required to be defined.
068         * @param mdcPrefix The value to prefix to MDC keys.
069         * @param eventPrefix The value to prefix to event keys.
070         * @param compress If true the event body should be compressed.
071         */
072        public FlumeEvent(LogEvent event, String includes, String excludes, String required,
073                          String mdcPrefix, String eventPrefix, boolean compress) {
074            this.event = event;
075            this.compress = compress;
076            Map<String, String> headers = getHeaders();
077            headers.put(TIMESTAMP, Long.toString(event.getMillis()));
078            if (mdcPrefix == null) {
079                mdcPrefix = DEFAULT_MDC_PREFIX;
080            }
081            if (eventPrefix == null) {
082                eventPrefix = DEFAULT_EVENT_PREFIX;
083            }
084            Map<String, String> mdc = event.getContextMap();
085            if (includes != null) {
086                String[] array = includes.split(",");
087                if (array.length > 0) {
088                    for (String str : array) {
089                        if (mdc.containsKey(str)) {
090                            ctx.put(str, mdc.get(str));
091                        }
092                    }
093                }
094            } else if (excludes != null) {
095                String[] array = excludes.split(",");
096                if (array.length > 0) {
097                    List<String> list = Arrays.asList(array);
098                    for (Map.Entry<String, String> entry : mdc.entrySet()) {
099                        if (!list.contains(entry.getKey())) {
100                            ctx.put(entry.getKey(), entry.getValue());
101                        }
102                    }
103                }
104            } else {
105                ctx.putAll(mdc);
106            }
107    
108            if (required != null) {
109                String[] array = required.split(",");
110                if (array.length > 0) {
111                    for (String str : array) {
112                        if (!mdc.containsKey(str)) {
113                            throw new LoggingException("Required key " + str + " is missing from the MDC");
114                        }
115                    }
116                }
117            }
118            Message message = event.getMessage();
119            if (message instanceof MapMessage) {
120                if (message instanceof StructuredDataMessage) {
121                    addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
122                }
123                addMapData(eventPrefix, headers, (MapMessage) message);
124            }
125    
126            addContextData(mdcPrefix, headers, ctx);
127    
128            addGuid(headers);
129        }
130    
131        protected void addStructuredData(String prefix, Map<String, String> fields, StructuredDataMessage msg) {
132            fields.put(prefix + EVENT_TYPE, msg.getType());
133            StructuredDataId id = msg.getId();
134            fields.put(prefix + EVENT_ID, id.getName());
135        }
136    
137        protected void addMapData(String prefix, Map<String, String> fields, MapMessage msg) {
138            Map<String, String> data = msg.getData();
139            for (Map.Entry<String, String> entry : data.entrySet()) {
140                fields.put(prefix + entry.getKey(), entry.getValue());
141            }
142        }
143    
144        protected void addContextData(String prefix, Map<String, String> fields, Map<String, String> context) {
145            for (Map.Entry<String, String> entry : context.entrySet()) {
146                fields.put(prefix + entry.getKey(), entry.getValue());
147            }
148        }
149    
150        protected void addGuid(Map<String, String> fields) {
151            fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString());
152        }
153    
154        /**
155         * Set the body in the event.
156         * @param body The body to add to the event.
157         */
158        @Override
159        public void setBody(byte[] body) {
160            if (body == null || body.length == 0) {
161                super.setBody(new byte[0]);
162                return;
163            }
164            if (compress) {
165                ByteArrayOutputStream baos = new ByteArrayOutputStream();
166                try {
167                    GZIPOutputStream os = new GZIPOutputStream(baos);
168                    os.write(body);
169                    os.close();
170                } catch (IOException ioe) {
171                    throw new LoggingException("Unable to compress message", ioe);
172                }
173                super.setBody(baos.toByteArray());
174            } else {
175                super.setBody(body);
176            }
177        }
178    
179        /**
180         * Get the Frequently Qualified Class Name.
181         * @return the FQCN String.
182         */
183        public String getFQCN() {
184            return event.getFQCN();
185        }
186    
187        /**
188         * Returns the logging Level.
189         * @return the Level.
190         */
191        public Level getLevel() {
192            return event.getLevel();
193        }
194    
195        /**
196         * Returns the logger name.
197         * @return the logger name.
198         */
199        public String getLoggerName() {
200            return event.getLoggerName();
201        }
202    
203        /**
204         * Returns the StackTraceElement for the caller of the logging API.
205         * @return the StackTraceElement of the caller.
206         */
207        public StackTraceElement getSource() {
208            return event.getSource();
209        }
210    
211        /**
212         * Returns the Message.
213         * @return the Message.
214         */
215        public Message getMessage() {
216            return event.getMessage();
217        }
218    
219        /**
220         * Returns the Marker.
221         * @return the Marker.
222         */
223        public Marker getMarker() {
224            return event.getMarker();
225        }
226    
227        /**
228         * Returns the name of the Thread.
229         * @return the name of the Thread.
230         */
231        public String getThreadName() {
232            return event.getThreadName();
233        }
234    
235        /**
236         * Returns the event timestamp.
237         * @return the event timestamp.
238         */
239        public long getMillis() {
240            return event.getMillis();
241        }
242    
243        /**
244         * Returns the Throwable associated with the event, if any.
245         * @return the Throwable.
246         */
247        public Throwable getThrown() {
248            return event.getThrown();
249        }
250    
251        /**
252         * Returns a copy of the context Map.
253         * @return a copy of the context Map.
254         */
255        public Map<String, String> getContextMap() {
256            return ctx;
257        }
258    
259        /**
260         * Returns a copy of the context stack.
261         * @return a copy of the context stack.
262         */
263        public ThreadContext.ContextStack getContextStack() {
264            return event.getContextStack();
265        }
266    }