001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.flume.appender; 018 019import java.io.ByteArrayOutputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.zip.GZIPOutputStream; 026 027import org.apache.flume.event.SimpleEvent; 028import org.apache.logging.log4j.Level; 029import org.apache.logging.log4j.LoggingException; 030import org.apache.logging.log4j.Marker; 031import org.apache.logging.log4j.ThreadContext; 032import org.apache.logging.log4j.core.LogEvent; 033import org.apache.logging.log4j.core.helpers.UUIDUtil; 034import org.apache.logging.log4j.message.MapMessage; 035import org.apache.logging.log4j.message.Message; 036import org.apache.logging.log4j.message.StructuredDataId; 037import org.apache.logging.log4j.message.StructuredDataMessage; 038 039/** 040 * Class that is both a Flume and Log4j Event. 041 */ 042public class FlumeEvent extends SimpleEvent implements LogEvent { 043 044 static final String GUID = "guId"; 045 /** 046 * Generated serial version ID. 047 */ 048 private static final long serialVersionUID = -8988674608627854140L; 049 050 private static final String DEFAULT_MDC_PREFIX = ""; 051 052 private static final String DEFAULT_EVENT_PREFIX = ""; 053 054 private static final String EVENT_TYPE = "eventType"; 055 056 private static final String EVENT_ID = "eventId"; 057 058 private static final String TIMESTAMP = "timeStamp"; 059 060 private final LogEvent event; 061 062 private final Map<String, String> ctx = new HashMap<String, String>(); 063 064 private final boolean compress; 065 066 /** 067 * Construct the FlumeEvent. 068 * @param event The Log4j LogEvent. 069 * @param includes A comma separated list of MDC elements to include. 070 * @param excludes A comma separated list of MDC elements to exclude. 071 * @param required A comma separated list of MDC elements that are required to be defined. 072 * @param mdcPrefix The value to prefix to MDC keys. 073 * @param eventPrefix The value to prefix to event keys. 074 * @param compress If true the event body should be compressed. 075 */ 076 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required, 077 String mdcPrefix, String eventPrefix, final boolean compress) { 078 this.event = event; 079 this.compress = compress; 080 final Map<String, String> headers = getHeaders(); 081 headers.put(TIMESTAMP, Long.toString(event.getMillis())); 082 if (mdcPrefix == null) { 083 mdcPrefix = DEFAULT_MDC_PREFIX; 084 } 085 if (eventPrefix == null) { 086 eventPrefix = DEFAULT_EVENT_PREFIX; 087 } 088 final Map<String, String> mdc = event.getContextMap(); 089 if (includes != null) { 090 final String[] array = includes.split(","); 091 if (array.length > 0) { 092 for (String str : array) { 093 str = str.trim(); 094 if (mdc.containsKey(str)) { 095 ctx.put(str, mdc.get(str)); 096 } 097 } 098 } 099 } else if (excludes != null) { 100 final String[] array = excludes.split(","); 101 if (array.length > 0) { 102 final List<String> list = new ArrayList<String>(array.length); 103 for (final String value : array) { 104 list.add(value.trim()); 105 } 106 for (final Map.Entry<String, String> entry : mdc.entrySet()) { 107 if (!list.contains(entry.getKey())) { 108 ctx.put(entry.getKey(), entry.getValue()); 109 } 110 } 111 } 112 } else { 113 ctx.putAll(mdc); 114 } 115 116 if (required != null) { 117 final String[] array = required.split(","); 118 if (array.length > 0) { 119 for (String str : array) { 120 str = str.trim(); 121 if (!mdc.containsKey(str)) { 122 throw new LoggingException("Required key " + str + " is missing from the MDC"); 123 } 124 } 125 } 126 } 127 final String guid = UUIDUtil.getTimeBasedUUID().toString(); 128 final Message message = event.getMessage(); 129 if (message instanceof MapMessage) { 130 // Add the guid to the Map so that it can be included in the Layout. 131 ((MapMessage) message).put(GUID, guid); 132 if (message instanceof StructuredDataMessage) { 133 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message); 134 } 135 addMapData(eventPrefix, headers, (MapMessage) message); 136 } else { 137 headers.put(GUID, guid); 138 } 139 140 addContextData(mdcPrefix, headers, ctx); 141 } 142 143 protected void addStructuredData(final String prefix, final Map<String, String> fields, 144 final StructuredDataMessage msg) { 145 fields.put(prefix + EVENT_TYPE, msg.getType()); 146 final StructuredDataId id = msg.getId(); 147 fields.put(prefix + EVENT_ID, id.getName()); 148 } 149 150 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) { 151 final Map<String, String> data = msg.getData(); 152 for (final Map.Entry<String, String> entry : data.entrySet()) { 153 fields.put(prefix + entry.getKey(), entry.getValue()); 154 } 155 } 156 157 protected void addContextData(final String prefix, final Map<String, String> fields, 158 final Map<String, String> context) { 159 Map<String, String> map = new HashMap<String, String>(); 160 for (final Map.Entry<String, String> entry : context.entrySet()) { 161 if (entry.getKey() != null && entry.getValue() != null) { 162 fields.put(prefix + entry.getKey(), entry.getValue()); 163 map.put(prefix + entry.getKey(), entry.getValue()); 164 } 165 } 166 context.clear(); 167 context.putAll(map); 168 } 169 170 /** 171 * Set the body in the event. 172 * @param body The body to add to the event. 173 */ 174 @Override 175 public void setBody(final byte[] body) { 176 if (body == null || body.length == 0) { 177 super.setBody(new byte[0]); 178 return; 179 } 180 if (compress) { 181 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 182 try { 183 final GZIPOutputStream os = new GZIPOutputStream(baos); 184 os.write(body); 185 os.close(); 186 } catch (final IOException ioe) { 187 throw new LoggingException("Unable to compress message", ioe); 188 } 189 super.setBody(baos.toByteArray()); 190 } else { 191 super.setBody(body); 192 } 193 } 194 195 /** 196 * Get the Frequently Qualified Class Name. 197 * @return the FQCN String. 198 */ 199 @Override 200 public String getFQCN() { 201 return event.getFQCN(); 202 } 203 204 /** 205 * Returns the logging Level. 206 * @return the Level. 207 */ 208 @Override 209 public Level getLevel() { 210 return event.getLevel(); 211 } 212 213 /** 214 * Returns the logger name. 215 * @return the logger name. 216 */ 217 @Override 218 public String getLoggerName() { 219 return event.getLoggerName(); 220 } 221 222 /** 223 * Returns the StackTraceElement for the caller of the logging API. 224 * @return the StackTraceElement of the caller. 225 */ 226 @Override 227 public StackTraceElement getSource() { 228 return event.getSource(); 229 } 230 231 /** 232 * Returns the Message. 233 * @return the Message. 234 */ 235 @Override 236 public Message getMessage() { 237 return event.getMessage(); 238 } 239 240 /** 241 * Returns the Marker. 242 * @return the Marker. 243 */ 244 @Override 245 public Marker getMarker() { 246 return event.getMarker(); 247 } 248 249 /** 250 * Returns the name of the Thread. 251 * @return the name of the Thread. 252 */ 253 @Override 254 public String getThreadName() { 255 return event.getThreadName(); 256 } 257 258 /** 259 * Returns the event timestamp. 260 * @return the event timestamp. 261 */ 262 @Override 263 public long getMillis() { 264 return event.getMillis(); 265 } 266 267 /** 268 * Returns the Throwable associated with the event, if any. 269 * @return the Throwable. 270 */ 271 @Override 272 public Throwable getThrown() { 273 return event.getThrown(); 274 } 275 276 /** 277 * Returns a copy of the context Map. 278 * @return a copy of the context Map. 279 */ 280 @Override 281 public Map<String, String> getContextMap() { 282 return ctx; 283 } 284 285 /** 286 * Returns a copy of the context stack. 287 * @return a copy of the context stack. 288 */ 289 @Override 290 public ThreadContext.ContextStack getContextStack() { 291 return event.getContextStack(); 292 } 293 294 @Override 295 public boolean isIncludeLocation() { 296 return event.isIncludeLocation(); 297 } 298 299 @Override 300 public void setIncludeLocation(final boolean includeLocation) { 301 event.setIncludeLocation(includeLocation); 302 } 303 304 @Override 305 public boolean isEndOfBatch() { 306 return event.isEndOfBatch(); 307 } 308 309 @Override 310 public void setEndOfBatch(final boolean endOfBatch) { 311 event.setEndOfBatch(endOfBatch); 312 } 313}