001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.event.SimpleEvent; 020 import org.apache.logging.log4j.Level; 021 import org.apache.logging.log4j.LoggingException; 022 import org.apache.logging.log4j.Marker; 023 import org.apache.logging.log4j.ThreadContext; 024 import org.apache.logging.log4j.core.LogEvent; 025 import org.apache.logging.log4j.core.helpers.UUIDUtil; 026 import org.apache.logging.log4j.message.MapMessage; 027 import org.apache.logging.log4j.message.Message; 028 import org.apache.logging.log4j.message.StructuredDataId; 029 import org.apache.logging.log4j.message.StructuredDataMessage; 030 031 import java.io.ByteArrayOutputStream; 032 import java.io.IOException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.zip.GZIPOutputStream; 038 039 /** 040 * Class that is both a Flume and Log4j Event. 041 */ 042 public class FlumeEvent extends SimpleEvent implements LogEvent { 043 044 /** 045 * Generated serial version ID. 046 */ 047 private static final long serialVersionUID = -8988674608627854140L; 048 049 private static final String DEFAULT_MDC_PREFIX = "mdc:"; 050 051 private static final String DEFAULT_EVENT_PREFIX = ""; 052 053 private static final String EVENT_TYPE = "eventType"; 054 055 private static final String EVENT_ID = "eventId"; 056 057 static final String GUID = "guId"; 058 059 private static final String TIMESTAMP = "timeStamp";; 060 061 private final LogEvent event; 062 063 private final Map<String, String> ctx = new HashMap<String, String>(); 064 065 private final boolean compress; 066 067 /** 068 * Construct the FlumeEvent. 069 * @param event The Log4j LogEvent. 070 * @param includes A comma separated list of MDC elements to include. 071 * @param excludes A comma separated list of MDC elements to exclude. 072 * @param required A comma separated list of MDC elements that are required to be defined. 073 * @param mdcPrefix The value to prefix to MDC keys. 074 * @param eventPrefix The value to prefix to event keys. 075 * @param compress If true the event body should be compressed. 076 */ 077 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required, 078 String mdcPrefix, String eventPrefix, final boolean compress) { 079 this.event = event; 080 this.compress = compress; 081 final Map<String, String> headers = getHeaders(); 082 headers.put(TIMESTAMP, Long.toString(event.getMillis())); 083 if (mdcPrefix == null) { 084 mdcPrefix = DEFAULT_MDC_PREFIX; 085 } 086 if (eventPrefix == null) { 087 eventPrefix = DEFAULT_EVENT_PREFIX; 088 } 089 final Map<String, String> mdc = event.getContextMap(); 090 if (includes != null) { 091 final String[] array = includes.split(","); 092 if (array.length > 0) { 093 for (String str : array) { 094 str = str.trim(); 095 if (mdc.containsKey(str)) { 096 ctx.put(str, mdc.get(str)); 097 } 098 } 099 } 100 } else if (excludes != null) { 101 final String[] array = excludes.split(","); 102 if (array.length > 0) { 103 final List<String> list = new ArrayList<String>(array.length); 104 for (final String value : array) { 105 list.add(value.trim()); 106 } 107 for (final Map.Entry<String, String> entry : mdc.entrySet()) { 108 if (!list.contains(entry.getKey())) { 109 ctx.put(entry.getKey(), entry.getValue()); 110 } 111 } 112 } 113 } else { 114 ctx.putAll(mdc); 115 } 116 117 if (required != null) { 118 final String[] array = required.split(","); 119 if (array.length > 0) { 120 for (String str : array) { 121 str = str.trim(); 122 if (!mdc.containsKey(str)) { 123 throw new LoggingException("Required key " + str + " is missing from the MDC"); 124 } 125 } 126 } 127 } 128 final String guid = UUIDUtil.getTimeBasedUUID().toString(); 129 final Message message = event.getMessage(); 130 if (message instanceof MapMessage) { 131 // Add the guid to the Map so that it can be included in the Layout. 132 ((MapMessage) message).put(GUID, guid); 133 if (message instanceof StructuredDataMessage) { 134 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); 135 } 136 addMapData(eventPrefix, headers, (MapMessage) message); 137 } else { 138 headers.put(GUID, guid); 139 } 140 141 addContextData(mdcPrefix, headers, ctx); 142 } 143 144 protected void addStructuredData(final String prefix, final Map<String, String> fields, 145 final StructuredDataMessage msg) { 146 fields.put(prefix + EVENT_TYPE, msg.getType()); 147 final StructuredDataId id = msg.getId(); 148 fields.put(prefix + EVENT_ID, id.getName()); 149 } 150 151 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) { 152 final Map<String, String> data = msg.getData(); 153 for (final Map.Entry<String, String> entry : data.entrySet()) { 154 fields.put(prefix + entry.getKey(), entry.getValue()); 155 } 156 } 157 158 protected void addContextData(final String prefix, final Map<String, String> fields, 159 final Map<String, String> context) { 160 for (final Map.Entry<String, String> entry : context.entrySet()) { 161 if (entry.getKey() != null && entry.getValue() != null) { 162 fields.put(prefix + entry.getKey(), entry.getValue()); 163 } 164 } 165 } 166 167 /** 168 * Set the body in the event. 169 * @param body The body to add to the event. 170 */ 171 @Override 172 public void setBody(final byte[] body) { 173 if (body == null || body.length == 0) { 174 super.setBody(new byte[0]); 175 return; 176 } 177 if (compress) { 178 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 179 try { 180 final GZIPOutputStream os = new GZIPOutputStream(baos); 181 os.write(body); 182 os.close(); 183 } catch (final IOException ioe) { 184 throw new LoggingException("Unable to compress message", ioe); 185 } 186 super.setBody(baos.toByteArray()); 187 } else { 188 super.setBody(body); 189 } 190 } 191 192 /** 193 * Get the Frequently Qualified Class Name. 194 * @return the FQCN String. 195 */ 196 @Override 197 public String getFQCN() { 198 return event.getFQCN(); 199 } 200 201 /** 202 * Returns the logging Level. 203 * @return the Level. 204 */ 205 @Override 206 public Level getLevel() { 207 return event.getLevel(); 208 } 209 210 /** 211 * Returns the logger name. 212 * @return the logger name. 213 */ 214 @Override 215 public String getLoggerName() { 216 return event.getLoggerName(); 217 } 218 219 /** 220 * Returns the StackTraceElement for the caller of the logging API. 221 * @return the StackTraceElement of the caller. 222 */ 223 @Override 224 public StackTraceElement getSource() { 225 return event.getSource(); 226 } 227 228 /** 229 * Returns the Message. 230 * @return the Message. 231 */ 232 @Override 233 public Message getMessage() { 234 return event.getMessage(); 235 } 236 237 /** 238 * Returns the Marker. 239 * @return the Marker. 240 */ 241 @Override 242 public Marker getMarker() { 243 return event.getMarker(); 244 } 245 246 /** 247 * Returns the name of the Thread. 248 * @return the name of the Thread. 249 */ 250 @Override 251 public String getThreadName() { 252 return event.getThreadName(); 253 } 254 255 /** 256 * Returns the event timestamp. 257 * @return the event timestamp. 258 */ 259 @Override 260 public long getMillis() { 261 return event.getMillis(); 262 } 263 264 /** 265 * Returns the Throwable associated with the event, if any. 266 * @return the Throwable. 267 */ 268 @Override 269 public Throwable getThrown() { 270 return event.getThrown(); 271 } 272 273 /** 274 * Returns a copy of the context Map. 275 * @return a copy of the context Map. 276 */ 277 @Override 278 public Map<String, String> getContextMap() { 279 return ctx; 280 } 281 282 /** 283 * Returns a copy of the context stack. 284 * @return a copy of the context stack. 285 */ 286 @Override 287 public ThreadContext.ContextStack getContextStack() { 288 return event.getContextStack(); 289 } 290 291 @Override 292 public boolean isIncludeLocation() { 293 return event.isIncludeLocation(); 294 } 295 296 @Override 297 public void setIncludeLocation(boolean includeLocation) { 298 event.setIncludeLocation(includeLocation); 299 } 300 301 @Override 302 public boolean isEndOfBatch() { 303 return event.isEndOfBatch(); 304 } 305 306 @Override 307 public void setEndOfBatch(boolean endOfBatch) { 308 event.setEndOfBatch(endOfBatch); 309 } 310 }