001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.kafka; 019 020import java.io.Serializable; 021import java.nio.charset.StandardCharsets; 022 023import org.apache.logging.log4j.core.Appender; 024import org.apache.logging.log4j.core.Filter; 025import org.apache.logging.log4j.core.Layout; 026import org.apache.logging.log4j.core.LogEvent; 027import org.apache.logging.log4j.core.appender.AbstractAppender; 028import org.apache.logging.log4j.core.appender.AppenderLoggingException; 029import org.apache.logging.log4j.core.config.Node; 030import org.apache.logging.log4j.core.config.Property; 031import org.apache.logging.log4j.core.config.plugins.Plugin; 032import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 033import org.apache.logging.log4j.core.config.plugins.PluginElement; 034import org.apache.logging.log4j.core.config.plugins.PluginFactory; 035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; 036import org.apache.logging.log4j.core.layout.SerializedLayout; 037import org.apache.logging.log4j.core.util.StringEncoder; 038 039/** 040 * Sends log events to an Apache Kafka topic. 041 */ 042@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) 043public final class KafkaAppender extends AbstractAppender { 044 045 @PluginFactory 046 public static KafkaAppender createAppender( 047 @PluginElement("Layout") final Layout<? extends Serializable> layout, 048 @PluginElement("Filter") final Filter filter, 049 @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name, 050 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions, 051 @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic, 052 @PluginElement("Properties") final Property[] properties) { 053 final KafkaManager kafkaManager = new KafkaManager(name, topic, properties); 054 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager); 055 } 056 057 private final KafkaManager manager; 058 059 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) { 060 super(name, filter, layout, ignoreExceptions); 061 this.manager = manager; 062 } 063 064 @Override 065 public void append(final LogEvent event) { 066 if (event.getLoggerName().startsWith("org.apache.kafka")) { 067 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName()); 068 } else { 069 try { 070 Layout<? extends Serializable> layout = getLayout(); 071 byte[] data; 072 if (layout != null) { 073 if (layout instanceof SerializedLayout) { 074 byte[] header = layout.getHeader(); 075 byte[] body = layout.toByteArray(event); 076 data = new byte[header.length + body.length]; 077 System.arraycopy(header, 0, data, 0, header.length); 078 System.arraycopy(body, 0, data, header.length, body.length); 079 } else { 080 data = layout.toByteArray(event); 081 } 082 } else { 083 data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8); 084 } 085 manager.send(data); 086 } catch (final Exception e) { 087 LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e); 088 throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e); 089 } 090 } 091 } 092 093 @Override 094 public void start() { 095 super.start(); 096 manager.startup(); 097 } 098 099 @Override 100 public void stop() { 101 super.stop(); 102 manager.release(); 103 } 104 105 @Override 106 public String toString() { 107 return "KafkaAppender{" + 108 "name=" + getName() + 109 ", state=" + getState() + 110 ", topic=" + manager.getTopic() + 111 '}'; 112 } 113}