001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.event.SimpleEvent; 020 import org.apache.logging.log4j.Level; 021 import org.apache.logging.log4j.LoggingException; 022 import org.apache.logging.log4j.Marker; 023 import org.apache.logging.log4j.ThreadContext; 024 import org.apache.logging.log4j.core.LogEvent; 025 import org.apache.logging.log4j.core.helpers.UUIDUtil; 026 import org.apache.logging.log4j.message.MapMessage; 027 import org.apache.logging.log4j.message.Message; 028 import org.apache.logging.log4j.message.StructuredDataId; 029 import org.apache.logging.log4j.message.StructuredDataMessage; 030 031 import java.io.ByteArrayOutputStream; 032 import java.io.IOException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.zip.GZIPOutputStream; 038 039 /** 040 * Class that is both a Flume and Log4j Event. 041 */ 042 public class FlumeEvent extends SimpleEvent implements LogEvent { 043 044 /** 045 * Generated serial version ID. 046 */ 047 private static final long serialVersionUID = -8988674608627854140L; 048 049 private static final String DEFAULT_MDC_PREFIX = "mdc:"; 050 051 private static final String DEFAULT_EVENT_PREFIX = ""; 052 053 private static final String EVENT_TYPE = "eventType"; 054 055 private static final String EVENT_ID = "eventId"; 056 057 static final String GUID = "guId"; 058 059 private static final String TIMESTAMP = "timeStamp";; 060 061 private final LogEvent event; 062 063 private final Map<String, String> ctx = new HashMap<String, String>(); 064 065 private final boolean compress; 066 067 /** 068 * Construct the FlumeEvent. 069 * @param event The Log4j LogEvent. 070 * @param includes A comma separated list of MDC elements to include. 071 * @param excludes A comma separated list of MDC elements to exclude. 072 * @param required A comma separated list of MDC elements that are required to be defined. 073 * @param mdcPrefix The value to prefix to MDC keys. 074 * @param eventPrefix The value to prefix to event keys. 075 * @param compress If true the event body should be compressed. 076 */ 077 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required, 078 String mdcPrefix, String eventPrefix, final boolean compress) { 079 this.event = event; 080 this.compress = compress; 081 final Map<String, String> headers = getHeaders(); 082 headers.put(TIMESTAMP, Long.toString(event.getMillis())); 083 if (mdcPrefix == null) { 084 mdcPrefix = DEFAULT_MDC_PREFIX; 085 } 086 if (eventPrefix == null) { 087 eventPrefix = DEFAULT_EVENT_PREFIX; 088 } 089 final Map<String, String> mdc = event.getContextMap(); 090 if (includes != null) { 091 final String[] array = includes.split(","); 092 if (array.length > 0) { 093 for (String str : array) { 094 str = str.trim(); 095 if (mdc.containsKey(str)) { 096 ctx.put(str, mdc.get(str)); 097 } 098 } 099 } 100 } else if (excludes != null) { 101 final String[] array = excludes.split(","); 102 if (array.length > 0) { 103 final List<String> list = new ArrayList<String>(array.length); 104 for (final String value : array) { 105 list.add(value.trim()); 106 } 107 for (final Map.Entry<String, String> entry : mdc.entrySet()) { 108 if (!list.contains(entry.getKey())) { 109 ctx.put(entry.getKey(), entry.getValue()); 110 } 111 } 112 } 113 } else { 114 ctx.putAll(mdc); 115 } 116 117 if (required != null) { 118 final String[] array = required.split(","); 119 if (array.length > 0) { 120 for (String str : array) { 121 str = str.trim(); 122 if (!mdc.containsKey(str)) { 123 throw new LoggingException("Required key " + str + " is missing from the MDC"); 124 } 125 } 126 } 127 } 128 final String guid = UUIDUtil.getTimeBasedUUID().toString(); 129 final Message message = event.getMessage(); 130 if (message instanceof MapMessage) { 131 ((MapMessage) message).put(GUID, guid); 132 if (message instanceof StructuredDataMessage) { 133 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); 134 } 135 addMapData(eventPrefix, headers, (MapMessage) message); 136 } 137 138 addContextData(mdcPrefix, headers, ctx); 139 } 140 141 protected void addStructuredData(final String prefix, final Map<String, String> fields, 142 final StructuredDataMessage msg) { 143 fields.put(prefix + EVENT_TYPE, msg.getType()); 144 final StructuredDataId id = msg.getId(); 145 fields.put(prefix + EVENT_ID, id.getName()); 146 } 147 148 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) { 149 final Map<String, String> data = msg.getData(); 150 for (final Map.Entry<String, String> entry : data.entrySet()) { 151 fields.put(prefix + entry.getKey(), entry.getValue()); 152 } 153 } 154 155 protected void addContextData(final String prefix, final Map<String, String> fields, 156 final Map<String, String> context) { 157 for (final Map.Entry<String, String> entry : context.entrySet()) { 158 if (entry.getKey() != null && entry.getValue() != null) { 159 fields.put(prefix + entry.getKey(), entry.getValue()); 160 } 161 } 162 } 163 164 /** 165 * Set the body in the event. 166 * @param body The body to add to the event. 167 */ 168 @Override 169 public void setBody(final byte[] body) { 170 if (body == null || body.length == 0) { 171 super.setBody(new byte[0]); 172 return; 173 } 174 if (compress) { 175 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 176 try { 177 final GZIPOutputStream os = new GZIPOutputStream(baos); 178 os.write(body); 179 os.close(); 180 } catch (final IOException ioe) { 181 throw new LoggingException("Unable to compress message", ioe); 182 } 183 super.setBody(baos.toByteArray()); 184 } else { 185 super.setBody(body); 186 } 187 } 188 189 /** 190 * Get the Frequently Qualified Class Name. 191 * @return the FQCN String. 192 */ 193 public String getFQCN() { 194 return event.getFQCN(); 195 } 196 197 /** 198 * Returns the logging Level. 199 * @return the Level. 200 */ 201 public Level getLevel() { 202 return event.getLevel(); 203 } 204 205 /** 206 * Returns the logger name. 207 * @return the logger name. 208 */ 209 public String getLoggerName() { 210 return event.getLoggerName(); 211 } 212 213 /** 214 * Returns the StackTraceElement for the caller of the logging API. 215 * @return the StackTraceElement of the caller. 216 */ 217 public StackTraceElement getSource() { 218 return event.getSource(); 219 } 220 221 /** 222 * Returns the Message. 223 * @return the Message. 224 */ 225 public Message getMessage() { 226 return event.getMessage(); 227 } 228 229 /** 230 * Returns the Marker. 231 * @return the Marker. 232 */ 233 public Marker getMarker() { 234 return event.getMarker(); 235 } 236 237 /** 238 * Returns the name of the Thread. 239 * @return the name of the Thread. 240 */ 241 public String getThreadName() { 242 return event.getThreadName(); 243 } 244 245 /** 246 * Returns the event timestamp. 247 * @return the event timestamp. 248 */ 249 public long getMillis() { 250 return event.getMillis(); 251 } 252 253 /** 254 * Returns the Throwable associated with the event, if any. 255 * @return the Throwable. 256 */ 257 public Throwable getThrown() { 258 return event.getThrown(); 259 } 260 261 /** 262 * Returns a copy of the context Map. 263 * @return a copy of the context Map. 264 */ 265 public Map<String, String> getContextMap() { 266 return ctx; 267 } 268 269 /** 270 * Returns a copy of the context stack. 271 * @return a copy of the context stack. 272 */ 273 public ThreadContext.ContextStack getContextStack() { 274 return event.getContextStack(); 275 } 276 277 @Override 278 public boolean isIncludeLocation() { 279 return event.isIncludeLocation(); 280 } 281 282 @Override 283 public void setIncludeLocation(boolean includeLocation) { 284 event.setIncludeLocation(includeLocation); 285 } 286 287 @Override 288 public boolean isEndOfBatch() { 289 return event.isEndOfBatch(); 290 } 291 292 @Override 293 public void setEndOfBatch(boolean endOfBatch) { 294 event.setEndOfBatch(endOfBatch); 295 } 296 }