001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.flume.appender; 018 019 import org.apache.flume.event.SimpleEvent; 020 import org.apache.logging.log4j.Level; 021 import org.apache.logging.log4j.LoggingException; 022 import org.apache.logging.log4j.Marker; 023 import org.apache.logging.log4j.ThreadContext; 024 import org.apache.logging.log4j.core.LogEvent; 025 import org.apache.logging.log4j.core.helpers.UUIDUtil; 026 import org.apache.logging.log4j.message.MapMessage; 027 import org.apache.logging.log4j.message.Message; 028 import org.apache.logging.log4j.message.StructuredDataId; 029 import org.apache.logging.log4j.message.StructuredDataMessage; 030 031 import java.io.ByteArrayOutputStream; 032 import java.io.IOException; 033 import java.util.Arrays; 034 import java.util.HashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.zip.GZIPOutputStream; 038 039 /** 040 * Class that is both a Flume and Log4j Event. 041 */ 042 public class FlumeEvent extends SimpleEvent implements LogEvent { 043 044 private static final String DEFAULT_MDC_PREFIX = "mdc:"; 045 046 private static final String DEFAULT_EVENT_PREFIX = ""; 047 048 private static final String EVENT_TYPE = "eventType"; 049 050 private static final String EVENT_ID = "eventId"; 051 052 private static final String GUID = "guId"; 053 054 private static final String TIMESTAMP = "timeStamp";; 055 056 private final LogEvent event; 057 058 private final Map<String, String> ctx = new HashMap<String, String>(); 059 060 private final boolean compress; 061 062 /** 063 * Construct the FlumeEvent. 064 * @param event The Log4j LogEvent. 065 * @param includes A comma separated list of MDC elements to include. 066 * @param excludes A comma separated list of MDC elements to exclude. 067 * @param required A comma separated list of MDC elements that are required to be defined. 068 * @param mdcPrefix The value to prefix to MDC keys. 069 * @param eventPrefix The value to prefix to event keys. 070 * @param compress If true the event body should be compressed. 071 */ 072 public FlumeEvent(LogEvent event, String includes, String excludes, String required, 073 String mdcPrefix, String eventPrefix, boolean compress) { 074 this.event = event; 075 this.compress = compress; 076 Map<String, String> headers = getHeaders(); 077 headers.put(TIMESTAMP, Long.toString(event.getMillis())); 078 if (mdcPrefix == null) { 079 mdcPrefix = DEFAULT_MDC_PREFIX; 080 } 081 if (eventPrefix == null) { 082 eventPrefix = DEFAULT_EVENT_PREFIX; 083 } 084 Map<String, String> mdc = event.getContextMap(); 085 if (includes != null) { 086 String[] array = includes.split(","); 087 if (array.length > 0) { 088 for (String str : array) { 089 if (mdc.containsKey(str)) { 090 ctx.put(str, mdc.get(str)); 091 } 092 } 093 } 094 } else if (excludes != null) { 095 String[] array = excludes.split(","); 096 if (array.length > 0) { 097 List<String> list = Arrays.asList(array); 098 for (Map.Entry<String, String> entry : mdc.entrySet()) { 099 if (!list.contains(entry.getKey())) { 100 ctx.put(entry.getKey(), entry.getValue()); 101 } 102 } 103 } 104 } else { 105 ctx.putAll(mdc); 106 } 107 108 if (required != null) { 109 String[] array = required.split(","); 110 if (array.length > 0) { 111 for (String str : array) { 112 if (!mdc.containsKey(str)) { 113 throw new LoggingException("Required key " + str + " is missing from the MDC"); 114 } 115 } 116 } 117 } 118 Message message = event.getMessage(); 119 if (message instanceof MapMessage) { 120 if (message instanceof StructuredDataMessage) { 121 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); 122 } 123 addMapData(eventPrefix, headers, (MapMessage) message); 124 } 125 126 addContextData(mdcPrefix, headers, ctx); 127 128 addGuid(headers); 129 } 130 131 protected void addStructuredData(String prefix, Map<String, String> fields, StructuredDataMessage msg) { 132 fields.put(prefix + EVENT_TYPE, msg.getType()); 133 StructuredDataId id = msg.getId(); 134 fields.put(prefix + EVENT_ID, id.getName()); 135 } 136 137 protected void addMapData(String prefix, Map<String, String> fields, MapMessage msg) { 138 Map<String, String> data = msg.getData(); 139 for (Map.Entry<String, String> entry : data.entrySet()) { 140 fields.put(prefix + entry.getKey(), entry.getValue()); 141 } 142 } 143 144 protected void addContextData(String prefix, Map<String, String> fields, Map<String, String> context) { 145 for (Map.Entry<String, String> entry : context.entrySet()) { 146 fields.put(prefix + entry.getKey(), entry.getValue()); 147 } 148 } 149 150 protected void addGuid(Map<String, String> fields) { 151 fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString()); 152 } 153 154 /** 155 * Set the body in the event. 156 * @param body The body to add to the event. 157 */ 158 @Override 159 public void setBody(byte[] body) { 160 if (body == null || body.length == 0) { 161 super.setBody(new byte[0]); 162 return; 163 } 164 if (compress) { 165 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 166 try { 167 GZIPOutputStream os = new GZIPOutputStream(baos); 168 os.write(body); 169 os.close(); 170 } catch (IOException ioe) { 171 throw new LoggingException("Unable to compress message", ioe); 172 } 173 super.setBody(baos.toByteArray()); 174 } else { 175 super.setBody(body); 176 } 177 } 178 179 /** 180 * Get the Frequently Qualified Class Name. 181 * @return the FQCN String. 182 */ 183 public String getFQCN() { 184 return event.getFQCN(); 185 } 186 187 /** 188 * Returns the logging Level. 189 * @return the Level. 190 */ 191 public Level getLevel() { 192 return event.getLevel(); 193 } 194 195 /** 196 * Returns the logger name. 197 * @return the logger name. 198 */ 199 public String getLoggerName() { 200 return event.getLoggerName(); 201 } 202 203 /** 204 * Returns the StackTraceElement for the caller of the logging API. 205 * @return the StackTraceElement of the caller. 206 */ 207 public StackTraceElement getSource() { 208 return event.getSource(); 209 } 210 211 /** 212 * Returns the Message. 213 * @return the Message. 214 */ 215 public Message getMessage() { 216 return event.getMessage(); 217 } 218 219 /** 220 * Returns the Marker. 221 * @return the Marker. 222 */ 223 public Marker getMarker() { 224 return event.getMarker(); 225 } 226 227 /** 228 * Returns the name of the Thread. 229 * @return the name of the Thread. 230 */ 231 public String getThreadName() { 232 return event.getThreadName(); 233 } 234 235 /** 236 * Returns the event timestamp. 237 * @return the event timestamp. 238 */ 239 public long getMillis() { 240 return event.getMillis(); 241 } 242 243 /** 244 * Returns the Throwable associated with the event, if any. 245 * @return the Throwable. 246 */ 247 public Throwable getThrown() { 248 return event.getThrown(); 249 } 250 251 /** 252 * Returns a copy of the context Map. 253 * @return a copy of the context Map. 254 */ 255 public Map<String, String> getContextMap() { 256 return ctx; 257 } 258 259 /** 260 * Returns a copy of the context stack. 261 * @return a copy of the context stack. 262 */ 263 public ThreadContext.ContextStack getContextStack() { 264 return event.getContextStack(); 265 } 266 }