001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.io.ByteArrayOutputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.zip.GZIPOutputStream; 026 027import org.apache.flume.event.SimpleEvent; 028import org.apache.logging.log4j.Level; 029import org.apache.logging.log4j.LoggingException; 030import org.apache.logging.log4j.Marker; 031import org.apache.logging.log4j.ThreadContext; 032import org.apache.logging.log4j.core.LogEvent; 033import org.apache.logging.log4j.core.impl.ThrowableProxy; 034import org.apache.logging.log4j.core.util.Patterns; 035import org.apache.logging.log4j.core.util.UuidUtil; 036import org.apache.logging.log4j.message.MapMessage; 037import org.apache.logging.log4j.message.Message; 038import org.apache.logging.log4j.message.StructuredDataId; 039import org.apache.logging.log4j.message.StructuredDataMessage; 040import org.apache.logging.log4j.util.Strings; 041 042/** 043 * Class that is both a Flume and Log4j Event. 044 */ 045public class FlumeEvent extends SimpleEvent implements LogEvent { 046 047 static final String GUID = "guId"; 048 /** 049 * Generated serial version ID. 050 */ 051 private static final long serialVersionUID = -8988674608627854140L; 052 053 private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY; 054 055 private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY; 056 057 private static final String EVENT_TYPE = "eventType"; 058 059 private static final String EVENT_ID = "eventId"; 060 061 private static final String TIMESTAMP = "timeStamp"; 062 063 private final LogEvent event; 064 065 private final Map<String, String> contextMap = new HashMap<>(); 066 067 private final boolean compress; 068 069 /** 070 * Construct the FlumeEvent. 071 * @param event The Log4j LogEvent. 072 * @param includes A comma separated list of MDC elements to include. 073 * @param excludes A comma separated list of MDC elements to exclude. 074 * @param required A comma separated list of MDC elements that are required to be defined. 075 * @param mdcPrefix The value to prefix to MDC keys. 076 * @param eventPrefix The value to prefix to event keys. 077 * @param compress If true the event body should be compressed. 078 */ 079 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required, 080 String mdcPrefix, String eventPrefix, final boolean compress) { 081 this.event = event; 082 this.compress = compress; 083 final Map<String, String> headers = getHeaders(); 084 headers.put(TIMESTAMP, Long.toString(event.getTimeMillis())); 085 if (mdcPrefix == null) { 086 mdcPrefix = DEFAULT_MDC_PREFIX; 087 } 088 if (eventPrefix == null) { 089 eventPrefix = DEFAULT_EVENT_PREFIX; 090 } 091 final Map<String, String> mdc = event.getContextMap(); 092 if (includes != null) { 093 final String[] array = includes.split(Patterns.COMMA_SEPARATOR); 094 if (array.length > 0) { 095 for (String str : array) { 096 str = str.trim(); 097 if (mdc.containsKey(str)) { 098 contextMap.put(str, mdc.get(str)); 099 } 100 } 101 } 102 } else if (excludes != null) { 103 final String[] array = excludes.split(Patterns.COMMA_SEPARATOR); 104 if (array.length > 0) { 105 final List<String> list = new ArrayList<>(array.length); 106 for (final String value : array) { 107 list.add(value.trim()); 108 } 109 for (final Map.Entry<String, String> entry : mdc.entrySet()) { 110 if (!list.contains(entry.getKey())) { 111 contextMap.put(entry.getKey(), entry.getValue()); 112 } 113 } 114 } 115 } else { 116 contextMap.putAll(mdc); 117 } 118 119 if (required != null) { 120 final String[] array = required.split(Patterns.COMMA_SEPARATOR); 121 if (array.length > 0) { 122 for (String str : array) { 123 str = str.trim(); 124 if (!mdc.containsKey(str)) { 125 throw new LoggingException("Required key " + str + " is missing from the MDC"); 126 } 127 } 128 } 129 } 130 final String guid = UuidUtil.getTimeBasedUuid().toString(); 131 final Message message = event.getMessage(); 132 if (message instanceof MapMessage) { 133 // Add the guid to the Map so that it can be included in the Layout. 134 ((MapMessage) message).put(GUID, guid); 135 if (message instanceof StructuredDataMessage) { 136 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); 137 } 138 addMapData(eventPrefix, headers, (MapMessage) message); 139 } else { 140 headers.put(GUID, guid); 141 } 142 143 addContextData(mdcPrefix, headers, contextMap); 144 } 145 146 protected void addStructuredData(final String prefix, final Map<String, String> fields, 147 final StructuredDataMessage msg) { 148 fields.put(prefix + EVENT_TYPE, msg.getType()); 149 final StructuredDataId id = msg.getId(); 150 fields.put(prefix + EVENT_ID, id.getName()); 151 } 152 153 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) { 154 final Map<String, String> data = msg.getData(); 155 for (final Map.Entry<String, String> entry : data.entrySet()) { 156 fields.put(prefix + entry.getKey(), entry.getValue()); 157 } 158 } 159 160 protected void addContextData(final String prefix, final Map<String, String> fields, 161 final Map<String, String> context) { 162 final Map<String, String> map = new HashMap<>(); 163 for (final Map.Entry<String, String> entry : context.entrySet()) { 164 if (entry.getKey() != null && entry.getValue() != null) { 165 fields.put(prefix + entry.getKey(), entry.getValue()); 166 map.put(prefix + entry.getKey(), entry.getValue()); 167 } 168 } 169 context.clear(); 170 context.putAll(map); 171 } 172 173 /** 174 * Set the body in the event. 175 * @param body The body to add to the event. 176 */ 177 @Override 178 public void setBody(final byte[] body) { 179 if (body == null || body.length == 0) { 180 super.setBody(new byte[0]); 181 return; 182 } 183 if (compress) { 184 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 185 try (GZIPOutputStream os = new GZIPOutputStream(baos)) { 186 os.write(body); 187 } catch (final IOException ioe) { 188 throw new LoggingException("Unable to compress message", ioe); 189 } 190 super.setBody(baos.toByteArray()); 191 } else { 192 super.setBody(body); 193 } 194 } 195 196 /** 197 * Get the Frequently Qualified Class Name. 198 * @return the FQCN String. 199 */ 200 @Override 201 public String getLoggerFqcn() { 202 return event.getLoggerFqcn(); 203 } 204 205 /** 206 * Returns the logging Level. 207 * @return the Level. 208 */ 209 @Override 210 public Level getLevel() { 211 return event.getLevel(); 212 } 213 214 /** 215 * Returns the logger name. 216 * @return the logger name. 217 */ 218 @Override 219 public String getLoggerName() { 220 return event.getLoggerName(); 221 } 222 223 /** 224 * Returns the StackTraceElement for the caller of the logging API. 225 * @return the StackTraceElement of the caller. 226 */ 227 @Override 228 public StackTraceElement getSource() { 229 return event.getSource(); 230 } 231 232 /** 233 * Returns the Message. 234 * @return the Message. 235 */ 236 @Override 237 public Message getMessage() { 238 return event.getMessage(); 239 } 240 241 /** 242 * Returns the Marker. 243 * @return the Marker. 244 */ 245 @Override 246 public Marker getMarker() { 247 return event.getMarker(); 248 } 249 250 /** 251 * Returns the name of the Thread. 252 * @return the name of the Thread. 253 */ 254 @Override 255 public String getThreadName() { 256 return event.getThreadName(); 257 } 258 259 /** 260 * Returns the event timestamp. 261 * @return the event timestamp. 262 */ 263 @Override 264 public long getTimeMillis() { 265 return event.getTimeMillis(); 266 } 267 268 /** 269 * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created, 270 * or a dummy value if it is known that this value will not be used downstream. 271 * @return the event nanosecond timestamp. 272 */ 273 @Override 274 public long getNanoTime() { 275 return event.getNanoTime(); 276 } 277 278 /** 279 * Returns the Throwable associated with the event, if any. 280 * @return the Throwable. 281 */ 282 @Override 283 public Throwable getThrown() { 284 return event.getThrown(); 285 } 286 287 /** 288 * Returns the Throwable associated with the event, if any. 289 * @return the Throwable. 290 */ 291 @Override 292 public ThrowableProxy getThrownProxy() { 293 return event.getThrownProxy(); 294 } 295 296 /** 297 * Returns a copy of the context Map. 298 * @return a copy of the context Map. 299 */ 300 @Override 301 public Map<String, String> getContextMap() { 302 return contextMap; 303 } 304 305 /** 306 * Returns a copy of the context stack. 307 * @return a copy of the context stack. 308 */ 309 @Override 310 public ThreadContext.ContextStack getContextStack() { 311 return event.getContextStack(); 312 } 313 314 @Override 315 public boolean isIncludeLocation() { 316 return event.isIncludeLocation(); 317 } 318 319 @Override 320 public void setIncludeLocation(final boolean includeLocation) { 321 event.setIncludeLocation(includeLocation); 322 } 323 324 @Override 325 public boolean isEndOfBatch() { 326 return event.isEndOfBatch(); 327 } 328 329 @Override 330 public void setEndOfBatch(final boolean endOfBatch) { 331 event.setEndOfBatch(endOfBatch); 332 } 333}