001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.event.SimpleEvent; 020 import org.apache.logging.log4j.Level; 021 import org.apache.logging.log4j.LoggingException; 022 import org.apache.logging.log4j.Marker; 023 import org.apache.logging.log4j.ThreadContext; 024 import org.apache.logging.log4j.core.LogEvent; 025 import org.apache.logging.log4j.core.helpers.UUIDUtil; 026 import org.apache.logging.log4j.message.MapMessage; 027 import org.apache.logging.log4j.message.Message; 028 import org.apache.logging.log4j.message.StructuredDataId; 029 import org.apache.logging.log4j.message.StructuredDataMessage; 030 031 import java.io.ByteArrayOutputStream; 032 import java.io.IOException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.zip.GZIPOutputStream; 038 039 /** 040 * Class that is both a Flume and Log4j Event. 041 */ 042 public class FlumeEvent extends SimpleEvent implements LogEvent { 043 044 /** 045 * Generated serial version ID. 046 */ 047 private static final long serialVersionUID = -8988674608627854140L; 048 049 private static final String DEFAULT_MDC_PREFIX = "mdc:"; 050 051 private static final String DEFAULT_EVENT_PREFIX = ""; 052 053 private static final String EVENT_TYPE = "eventType"; 054 055 private static final String EVENT_ID = "eventId"; 056 057 private static final String GUID = "guId"; 058 059 private static final String TIMESTAMP = "timeStamp";; 060 061 private final LogEvent event; 062 063 private final Map<String, String> ctx = new HashMap<String, String>(); 064 065 private final boolean compress; 066 067 /** 068 * Construct the FlumeEvent. 069 * @param event The Log4j LogEvent. 070 * @param includes A comma separated list of MDC elements to include. 071 * @param excludes A comma separated list of MDC elements to exclude. 072 * @param required A comma separated list of MDC elements that are required to be defined. 073 * @param mdcPrefix The value to prefix to MDC keys. 074 * @param eventPrefix The value to prefix to event keys. 075 * @param compress If true the event body should be compressed. 076 */ 077 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required, 078 String mdcPrefix, String eventPrefix, final boolean compress) { 079 this.event = event; 080 this.compress = compress; 081 final Map<String, String> headers = getHeaders(); 082 headers.put(TIMESTAMP, Long.toString(event.getMillis())); 083 if (mdcPrefix == null) { 084 mdcPrefix = DEFAULT_MDC_PREFIX; 085 } 086 if (eventPrefix == null) { 087 eventPrefix = DEFAULT_EVENT_PREFIX; 088 } 089 final Map<String, String> mdc = event.getContextMap(); 090 if (includes != null) { 091 final String[] array = includes.split(","); 092 if (array.length > 0) { 093 for (String str : array) { 094 str = str.trim(); 095 if (mdc.containsKey(str)) { 096 ctx.put(str, mdc.get(str)); 097 } 098 } 099 } 100 } else if (excludes != null) { 101 final String[] array = excludes.split(","); 102 if (array.length > 0) { 103 final List<String> list = new ArrayList<String>(array.length); 104 for (final String value : array) { 105 list.add(value.trim()); 106 } 107 for (final Map.Entry<String, String> entry : mdc.entrySet()) { 108 if (!list.contains(entry.getKey())) { 109 ctx.put(entry.getKey(), entry.getValue()); 110 } 111 } 112 } 113 } else { 114 ctx.putAll(mdc); 115 } 116 117 if (required != null) { 118 final String[] array = required.split(","); 119 if (array.length > 0) { 120 for (String str : array) { 121 str = str.trim(); 122 if (!mdc.containsKey(str)) { 123 throw new LoggingException("Required key " + str + " is missing from the MDC"); 124 } 125 } 126 } 127 } 128 final Message message = event.getMessage(); 129 if (message instanceof MapMessage) { 130 if (message instanceof StructuredDataMessage) { 131 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); 132 } 133 addMapData(eventPrefix, headers, (MapMessage) message); 134 } 135 136 addContextData(mdcPrefix, headers, ctx); 137 138 addGuid(headers); 139 } 140 141 protected void addStructuredData(final String prefix, final Map<String, String> fields, 142 final StructuredDataMessage msg) { 143 fields.put(prefix + EVENT_TYPE, msg.getType()); 144 final StructuredDataId id = msg.getId(); 145 fields.put(prefix + EVENT_ID, id.getName()); 146 } 147 148 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) { 149 final Map<String, String> data = msg.getData(); 150 for (final Map.Entry<String, String> entry : data.entrySet()) { 151 fields.put(prefix + entry.getKey(), entry.getValue()); 152 } 153 } 154 155 protected void addContextData(final String prefix, final Map<String, String> fields, 156 final Map<String, String> context) { 157 for (final Map.Entry<String, String> entry : context.entrySet()) { 158 if (entry.getKey() != null && entry.getValue() != null) { 159 fields.put(prefix + entry.getKey(), entry.getValue()); 160 } 161 } 162 } 163 164 protected void addGuid(final Map<String, String> fields) { 165 fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString()); 166 } 167 168 /** 169 * Set the body in the event. 170 * @param body The body to add to the event. 171 */ 172 @Override 173 public void setBody(final byte[] body) { 174 if (body == null || body.length == 0) { 175 super.setBody(new byte[0]); 176 return; 177 } 178 if (compress) { 179 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 180 try { 181 final GZIPOutputStream os = new GZIPOutputStream(baos); 182 os.write(body); 183 os.close(); 184 } catch (final IOException ioe) { 185 throw new LoggingException("Unable to compress message", ioe); 186 } 187 super.setBody(baos.toByteArray()); 188 } else { 189 super.setBody(body); 190 } 191 } 192 193 /** 194 * Get the Frequently Qualified Class Name. 195 * @return the FQCN String. 196 */ 197 public String getFQCN() { 198 return event.getFQCN(); 199 } 200 201 /** 202 * Returns the logging Level. 203 * @return the Level. 204 */ 205 public Level getLevel() { 206 return event.getLevel(); 207 } 208 209 /** 210 * Returns the logger name. 211 * @return the logger name. 212 */ 213 public String getLoggerName() { 214 return event.getLoggerName(); 215 } 216 217 /** 218 * Returns the StackTraceElement for the caller of the logging API. 219 * @return the StackTraceElement of the caller. 220 */ 221 public StackTraceElement getSource() { 222 return event.getSource(); 223 } 224 225 /** 226 * Returns the Message. 227 * @return the Message. 228 */ 229 public Message getMessage() { 230 return event.getMessage(); 231 } 232 233 /** 234 * Returns the Marker. 235 * @return the Marker. 236 */ 237 public Marker getMarker() { 238 return event.getMarker(); 239 } 240 241 /** 242 * Returns the name of the Thread. 243 * @return the name of the Thread. 244 */ 245 public String getThreadName() { 246 return event.getThreadName(); 247 } 248 249 /** 250 * Returns the event timestamp. 251 * @return the event timestamp. 252 */ 253 public long getMillis() { 254 return event.getMillis(); 255 } 256 257 /** 258 * Returns the Throwable associated with the event, if any. 259 * @return the Throwable. 260 */ 261 public Throwable getThrown() { 262 return event.getThrown(); 263 } 264 265 /** 266 * Returns a copy of the context Map. 267 * @return a copy of the context Map. 268 */ 269 public Map<String, String> getContextMap() { 270 return ctx; 271 } 272 273 /** 274 * Returns a copy of the context stack. 275 * @return a copy of the context stack. 276 */ 277 public ThreadContext.ContextStack getContextStack() { 278 return event.getContextStack(); 279 } 280 }