001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.ByteArrayOutputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.zip.GZIPOutputStream;
026
027import org.apache.flume.event.SimpleEvent;
028import org.apache.logging.log4j.Level;
029import org.apache.logging.log4j.LoggingException;
030import org.apache.logging.log4j.Marker;
031import org.apache.logging.log4j.ThreadContext;
032import org.apache.logging.log4j.core.LogEvent;
033import org.apache.logging.log4j.core.helpers.UUIDUtil;
034import org.apache.logging.log4j.message.MapMessage;
035import org.apache.logging.log4j.message.Message;
036import org.apache.logging.log4j.message.StructuredDataId;
037import org.apache.logging.log4j.message.StructuredDataMessage;
038
039/**
040 * Class that is both a Flume and Log4j Event.
041 */
042public class FlumeEvent extends SimpleEvent implements LogEvent {
043
044    static final String GUID = "guId";
045    /**
046     * Generated serial version ID.
047     */
048    private static final long serialVersionUID = -8988674608627854140L;
049
050    private static final String DEFAULT_MDC_PREFIX = "";
051
052    private static final String DEFAULT_EVENT_PREFIX = "";
053
054    private static final String EVENT_TYPE = "eventType";
055
056    private static final String EVENT_ID = "eventId";
057
058    private static final String TIMESTAMP = "timeStamp";
059
060    private final LogEvent event;
061
062    private final Map<String, String> ctx = new HashMap<String, String>();
063
064    private final boolean compress;
065
066    /**
067     * Construct the FlumeEvent.
068     * @param event The Log4j LogEvent.
069     * @param includes A comma separated list of MDC elements to include.
070     * @param excludes A comma separated list of MDC elements to exclude.
071     * @param required A comma separated list of MDC elements that are required to be defined.
072     * @param mdcPrefix The value to prefix to MDC keys.
073     * @param eventPrefix The value to prefix to event keys.
074     * @param compress If true the event body should be compressed.
075     */
076    public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
077                      String mdcPrefix, String eventPrefix, final boolean compress) {
078        this.event = event;
079        this.compress = compress;
080        final Map<String, String> headers = getHeaders();
081        headers.put(TIMESTAMP, Long.toString(event.getMillis()));
082        if (mdcPrefix == null) {
083            mdcPrefix = DEFAULT_MDC_PREFIX;
084        }
085        if (eventPrefix == null) {
086            eventPrefix = DEFAULT_EVENT_PREFIX;
087        }
088        final Map<String, String> mdc = event.getContextMap();
089        if (includes != null) {
090            final String[] array = includes.split(",");
091            if (array.length > 0) {
092                for (String str : array) {
093                    str = str.trim();
094                    if (mdc.containsKey(str)) {
095                        ctx.put(str, mdc.get(str));
096                    }
097                }
098            }
099        } else if (excludes != null) {
100            final String[] array = excludes.split(",");
101            if (array.length > 0) {
102                final List<String> list = new ArrayList<String>(array.length);
103                for (final String value : array) {
104                    list.add(value.trim());
105                }
106                for (final Map.Entry<String, String> entry : mdc.entrySet()) {
107                    if (!list.contains(entry.getKey())) {
108                        ctx.put(entry.getKey(), entry.getValue());
109                    }
110                }
111            }
112        } else {
113            ctx.putAll(mdc);
114        }
115
116        if (required != null) {
117            final String[] array = required.split(",");
118            if (array.length > 0) {
119                for (String str : array) {
120                    str = str.trim();
121                    if (!mdc.containsKey(str)) {
122                        throw new LoggingException("Required key " + str + " is missing from the MDC");
123                    }
124                }
125            }
126        }
127        final String guid =  UUIDUtil.getTimeBasedUUID().toString();
128        final Message message = event.getMessage();
129        if (message instanceof MapMessage) {
130            // Add the guid to the Map so that it can be included in the Layout.
131            ((MapMessage) message).put(GUID, guid);
132            if (message instanceof StructuredDataMessage) {
133                addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
134            }
135            addMapData(eventPrefix, headers, (MapMessage) message);
136        } else {
137            headers.put(GUID, guid);
138        }
139
140        addContextData(mdcPrefix, headers, ctx);
141    }
142
143    protected void addStructuredData(final String prefix, final Map<String, String> fields,
144                                     final StructuredDataMessage msg) {
145        fields.put(prefix + EVENT_TYPE, msg.getType());
146        final StructuredDataId id = msg.getId();
147        fields.put(prefix + EVENT_ID, id.getName());
148    }
149
150    protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
151        final Map<String, String> data = msg.getData();
152        for (final Map.Entry<String, String> entry : data.entrySet()) {
153            fields.put(prefix + entry.getKey(), entry.getValue());
154        }
155    }
156
157    protected void addContextData(final String prefix, final Map<String, String> fields,
158                                  final Map<String, String> context) {
159        Map<String, String> map = new HashMap<String, String>();
160        for (final Map.Entry<String, String> entry : context.entrySet()) {
161            if (entry.getKey() != null && entry.getValue() != null) {
162                fields.put(prefix + entry.getKey(), entry.getValue());
163                map.put(prefix + entry.getKey(), entry.getValue());
164            }
165        }
166        context.clear();
167        context.putAll(map);
168    }
169
170    /**
171     * Set the body in the event.
172     * @param body The body to add to the event.
173     */
174    @Override
175    public void setBody(final byte[] body) {
176        if (body == null || body.length == 0) {
177            super.setBody(new byte[0]);
178            return;
179        }
180        if (compress) {
181            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
182            try {
183                final GZIPOutputStream os = new GZIPOutputStream(baos);
184                os.write(body);
185                os.close();
186            } catch (final IOException ioe) {
187                throw new LoggingException("Unable to compress message", ioe);
188            }
189            super.setBody(baos.toByteArray());
190        } else {
191            super.setBody(body);
192        }
193    }
194
195    /**
196     * Get the Frequently Qualified Class Name.
197     * @return the FQCN String.
198     */
199    @Override
200    public String getFQCN() {
201        return event.getFQCN();
202    }
203
204    /**
205     * Returns the logging Level.
206     * @return the Level.
207     */
208    @Override
209    public Level getLevel() {
210        return event.getLevel();
211    }
212
213    /**
214     * Returns the logger name.
215     * @return the logger name.
216     */
217    @Override
218    public String getLoggerName() {
219        return event.getLoggerName();
220    }
221
222    /**
223     * Returns the StackTraceElement for the caller of the logging API.
224     * @return the StackTraceElement of the caller.
225     */
226    @Override
227    public StackTraceElement getSource() {
228        return event.getSource();
229    }
230
231    /**
232     * Returns the Message.
233     * @return the Message.
234     */
235    @Override
236    public Message getMessage() {
237        return event.getMessage();
238    }
239
240    /**
241     * Returns the Marker.
242     * @return the Marker.
243     */
244    @Override
245    public Marker getMarker() {
246        return event.getMarker();
247    }
248
249    /**
250     * Returns the name of the Thread.
251     * @return the name of the Thread.
252     */
253    @Override
254    public String getThreadName() {
255        return event.getThreadName();
256    }
257
258    /**
259     * Returns the event timestamp.
260     * @return the event timestamp.
261     */
262    @Override
263    public long getMillis() {
264        return event.getMillis();
265    }
266
267    /**
268     * Returns the Throwable associated with the event, if any.
269     * @return the Throwable.
270     */
271    @Override
272    public Throwable getThrown() {
273        return event.getThrown();
274    }
275
276    /**
277     * Returns a copy of the context Map.
278     * @return a copy of the context Map.
279     */
280    @Override
281    public Map<String, String> getContextMap() {
282        return ctx;
283    }
284
285    /**
286     * Returns a copy of the context stack.
287     * @return a copy of the context stack.
288     */
289    @Override
290    public ThreadContext.ContextStack getContextStack() {
291        return event.getContextStack();
292    }
293
294    @Override
295    public boolean isIncludeLocation() {
296        return event.isIncludeLocation();
297    }
298
299    @Override
300    public void setIncludeLocation(final boolean includeLocation) {
301        event.setIncludeLocation(includeLocation);
302    }
303
304    @Override
305    public boolean isEndOfBatch() {
306        return event.isEndOfBatch();
307    }
308
309    @Override
310    public void setEndOfBatch(final boolean endOfBatch) {
311        event.setEndOfBatch(endOfBatch);
312    }
313}