001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.kafka;
019
020import java.io.Serializable;
021import java.nio.charset.StandardCharsets;
022
023import org.apache.logging.log4j.core.Appender;
024import org.apache.logging.log4j.core.Filter;
025import org.apache.logging.log4j.core.Layout;
026import org.apache.logging.log4j.core.LogEvent;
027import org.apache.logging.log4j.core.appender.AbstractAppender;
028import org.apache.logging.log4j.core.appender.AppenderLoggingException;
029import org.apache.logging.log4j.core.config.Node;
030import org.apache.logging.log4j.core.config.Property;
031import org.apache.logging.log4j.core.config.plugins.Plugin;
032import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
033import org.apache.logging.log4j.core.config.plugins.PluginElement;
034import org.apache.logging.log4j.core.config.plugins.PluginFactory;
035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
036import org.apache.logging.log4j.core.layout.SerializedLayout;
037import org.apache.logging.log4j.core.util.StringEncoder;
038
039/**
040 * Sends log events to an Apache Kafka topic.
041 */
042@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
043public final class KafkaAppender extends AbstractAppender {
044
045    @PluginFactory
046    public static KafkaAppender createAppender(
047            @PluginElement("Layout") final Layout<? extends Serializable> layout,
048            @PluginElement("Filter") final Filter filter,
049            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
050            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
051            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
052            @PluginElement("Properties") final Property[] properties) {
053        final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
054        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
055    }
056
057    private final KafkaManager manager;
058
059    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
060        super(name, filter, layout, ignoreExceptions);
061        this.manager = manager;
062    }
063
064    @Override
065    public void append(final LogEvent event) {
066        if (event.getLoggerName().startsWith("org.apache.kafka")) {
067            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
068        } else {
069            try {
070                Layout<? extends Serializable> layout = getLayout();
071                byte[] data;
072                if (layout != null) {
073                    if (layout instanceof SerializedLayout) {
074                        byte[] header = layout.getHeader();
075                        byte[] body = layout.toByteArray(event);
076                        data = new byte[header.length + body.length];
077                        System.arraycopy(header, 0, data, 0, header.length);
078                        System.arraycopy(body, 0, data, header.length, body.length);
079                    } else {
080                        data = layout.toByteArray(event);
081                    }
082                } else {
083                    data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
084                }
085                manager.send(data);
086            } catch (final Exception e) {
087                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
088                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
089            }
090        }
091    }
092
093    @Override
094    public void start() {
095        super.start();
096        manager.startup();
097    }
098
099    @Override
100    public void stop() {
101        super.stop();
102        manager.release();
103    }
104
105    @Override
106    public String toString() {
107        return "KafkaAppender{" +
108            "name=" + getName() +
109            ", state=" + getState() +
110            ", topic=" + manager.getTopic() +
111            '}';
112    }
113}