001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.flume.appender;
018
019import java.io.ByteArrayOutputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.zip.GZIPOutputStream;
026
027import org.apache.flume.event.SimpleEvent;
028import org.apache.logging.log4j.Level;
029import org.apache.logging.log4j.LoggingException;
030import org.apache.logging.log4j.Marker;
031import org.apache.logging.log4j.ThreadContext;
032import org.apache.logging.log4j.core.LogEvent;
033import org.apache.logging.log4j.core.impl.ThrowableProxy;
034import org.apache.logging.log4j.core.util.Patterns;
035import org.apache.logging.log4j.core.util.UuidUtil;
036import org.apache.logging.log4j.message.MapMessage;
037import org.apache.logging.log4j.message.Message;
038import org.apache.logging.log4j.message.StructuredDataId;
039import org.apache.logging.log4j.message.StructuredDataMessage;
040import org.apache.logging.log4j.util.Strings;
041
042/**
043 * Class that is both a Flume and Log4j Event.
044 */
045public class FlumeEvent extends SimpleEvent implements LogEvent {
046
047    static final String GUID = "guId";
048    /**
049     * Generated serial version ID.
050     */
051    private static final long serialVersionUID = -8988674608627854140L;
052
053    private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
054
055    private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
056
057    private static final String EVENT_TYPE = "eventType";
058
059    private static final String EVENT_ID = "eventId";
060
061    private static final String TIMESTAMP = "timeStamp";
062
063    private final LogEvent event;
064
065    private final Map<String, String> contextMap = new HashMap<>();
066
067    private final boolean compress;
068
069    /**
070     * Construct the FlumeEvent.
071     * @param event The Log4j LogEvent.
072     * @param includes A comma separated list of MDC elements to include.
073     * @param excludes A comma separated list of MDC elements to exclude.
074     * @param required A comma separated list of MDC elements that are required to be defined.
075     * @param mdcPrefix The value to prefix to MDC keys.
076     * @param eventPrefix The value to prefix to event keys.
077     * @param compress If true the event body should be compressed.
078     */
079    public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
080                      String mdcPrefix, String eventPrefix, final boolean compress) {
081        this.event = event;
082        this.compress = compress;
083        final Map<String, String> headers = getHeaders();
084        headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
085        if (mdcPrefix == null) {
086            mdcPrefix = DEFAULT_MDC_PREFIX;
087        }
088        if (eventPrefix == null) {
089            eventPrefix = DEFAULT_EVENT_PREFIX;
090        }
091        final Map<String, String> mdc = event.getContextMap();
092        if (includes != null) {
093            final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
094            if (array.length > 0) {
095                for (String str : array) {
096                    str = str.trim();
097                    if (mdc.containsKey(str)) {
098                        contextMap.put(str, mdc.get(str));
099                    }
100                }
101            }
102        } else if (excludes != null) {
103            final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
104            if (array.length > 0) {
105                final List<String> list = new ArrayList<>(array.length);
106                for (final String value : array) {
107                    list.add(value.trim());
108                }
109                for (final Map.Entry<String, String> entry : mdc.entrySet()) {
110                    if (!list.contains(entry.getKey())) {
111                        contextMap.put(entry.getKey(), entry.getValue());
112                    }
113                }
114            }
115        } else {
116            contextMap.putAll(mdc);
117        }
118
119        if (required != null) {
120            final String[] array = required.split(Patterns.COMMA_SEPARATOR);
121            if (array.length > 0) {
122                for (String str : array) {
123                    str = str.trim();
124                    if (!mdc.containsKey(str)) {
125                        throw new LoggingException("Required key " + str + " is missing from the MDC");
126                    }
127                }
128            }
129        }
130        final String guid =  UuidUtil.getTimeBasedUuid().toString();
131        final Message message = event.getMessage();
132        if (message instanceof MapMessage) {
133            // Add the guid to the Map so that it can be included in the Layout.
134            ((MapMessage) message).put(GUID, guid);
135            if (message instanceof StructuredDataMessage) {
136                addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
137            }
138            addMapData(eventPrefix, headers, (MapMessage) message);
139        } else {
140            headers.put(GUID, guid);
141        }
142
143        addContextData(mdcPrefix, headers, contextMap);
144    }
145
146    protected void addStructuredData(final String prefix, final Map<String, String> fields,
147                                     final StructuredDataMessage msg) {
148        fields.put(prefix + EVENT_TYPE, msg.getType());
149        final StructuredDataId id = msg.getId();
150        fields.put(prefix + EVENT_ID, id.getName());
151    }
152
153    protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
154        final Map<String, String> data = msg.getData();
155        for (final Map.Entry<String, String> entry : data.entrySet()) {
156            fields.put(prefix + entry.getKey(), entry.getValue());
157        }
158    }
159
160    protected void addContextData(final String prefix, final Map<String, String> fields,
161                                  final Map<String, String> context) {
162        final Map<String, String> map = new HashMap<>();
163        for (final Map.Entry<String, String> entry : context.entrySet()) {
164            if (entry.getKey() != null && entry.getValue() != null) {
165                fields.put(prefix + entry.getKey(), entry.getValue());
166                map.put(prefix + entry.getKey(), entry.getValue());
167            }
168        }
169        context.clear();
170        context.putAll(map);
171    }
172
173    /**
174     * Set the body in the event.
175     * @param body The body to add to the event.
176     */
177    @Override
178    public void setBody(final byte[] body) {
179        if (body == null || body.length == 0) {
180            super.setBody(new byte[0]);
181            return;
182        }
183        if (compress) {
184            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185            try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
186                os.write(body);
187            } catch (final IOException ioe) {
188                throw new LoggingException("Unable to compress message", ioe);
189            }
190            super.setBody(baos.toByteArray());
191        } else {
192            super.setBody(body);
193        }
194    }
195
196    /**
197     * Get the Frequently Qualified Class Name.
198     * @return the FQCN String.
199     */
200    @Override
201    public String getLoggerFqcn() {
202        return event.getLoggerFqcn();
203    }
204
205    /**
206     * Returns the logging Level.
207     * @return the Level.
208     */
209    @Override
210    public Level getLevel() {
211        return event.getLevel();
212    }
213
214    /**
215     * Returns the logger name.
216     * @return the logger name.
217     */
218    @Override
219    public String getLoggerName() {
220        return event.getLoggerName();
221    }
222
223    /**
224     * Returns the StackTraceElement for the caller of the logging API.
225     * @return the StackTraceElement of the caller.
226     */
227    @Override
228    public StackTraceElement getSource() {
229        return event.getSource();
230    }
231
232    /**
233     * Returns the Message.
234     * @return the Message.
235     */
236    @Override
237    public Message getMessage() {
238        return event.getMessage();
239    }
240
241    /**
242     * Returns the Marker.
243     * @return the Marker.
244     */
245    @Override
246    public Marker getMarker() {
247        return event.getMarker();
248    }
249
250    /**
251     * Returns the ID of the Thread.
252     * @return the ID of the Thread.
253     */
254    @Override
255    public long getThreadId() {
256        return event.getThreadId();
257    }
258
259    /**
260     * Returns the priority of the Thread.
261     * @return the priority of the Thread.
262     */
263    @Override
264    public int getThreadPriority() {
265        return event.getThreadPriority();
266    }
267
268    /**
269     * Returns the name of the Thread.
270     * @return the name of the Thread.
271     */
272    @Override
273    public String getThreadName() {
274        return event.getThreadName();
275    }
276
277    /**
278     * Returns the event timestamp.
279     * @return the event timestamp.
280     */
281    @Override
282    public long getTimeMillis() {
283        return event.getTimeMillis();
284    }
285
286    /**
287     * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created,
288     * or a dummy value if it is known that this value will not be used downstream.
289     * @return the event nanosecond timestamp.
290     */
291    @Override
292    public long getNanoTime() {
293        return event.getNanoTime();
294    }
295
296    /**
297     * Returns the Throwable associated with the event, if any.
298     * @return the Throwable.
299     */
300    @Override
301    public Throwable getThrown() {
302        return event.getThrown();
303    }
304
305    /**
306     * Returns the Throwable associated with the event, if any.
307     * @return the Throwable.
308     */
309    @Override
310    public ThrowableProxy getThrownProxy() {
311        return event.getThrownProxy();
312    }
313
314    /**
315     * Returns a copy of the context Map.
316     * @return a copy of the context Map.
317     */
318    @Override
319    public Map<String, String> getContextMap() {
320        return contextMap;
321    }
322
323    /**
324     * Returns a copy of the context stack.
325     * @return a copy of the context stack.
326     */
327    @Override
328    public ThreadContext.ContextStack getContextStack() {
329        return event.getContextStack();
330    }
331
332    @Override
333    public boolean isIncludeLocation() {
334        return event.isIncludeLocation();
335    }
336
337    @Override
338    public void setIncludeLocation(final boolean includeLocation) {
339        event.setIncludeLocation(includeLocation);
340    }
341
342    @Override
343    public boolean isEndOfBatch() {
344        return event.isEndOfBatch();
345    }
346
347    @Override
348    public void setEndOfBatch(final boolean endOfBatch) {
349        event.setEndOfBatch(endOfBatch);
350    }
351}