001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.kafka;
019
020import org.apache.logging.log4j.core.Filter;
021import org.apache.logging.log4j.core.Layout;
022import org.apache.logging.log4j.core.LogEvent;
023import org.apache.logging.log4j.core.appender.AbstractAppender;
024import org.apache.logging.log4j.core.appender.AppenderLoggingException;
025import org.apache.logging.log4j.core.config.Property;
026import org.apache.logging.log4j.core.config.plugins.Plugin;
027import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
028import org.apache.logging.log4j.core.config.plugins.PluginElement;
029import org.apache.logging.log4j.core.config.plugins.PluginFactory;
030import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
031import org.apache.logging.log4j.core.layout.SerializedLayout;
032import org.apache.logging.log4j.core.util.StringEncoder;
033
034import java.io.Serializable;
035import java.nio.charset.StandardCharsets;
036
037/**
038 * Sends log events to an Apache Kafka topic.
039 */
040@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
041public final class KafkaAppender extends AbstractAppender {
042
043    /**
044     * 
045     */
046    private static final long serialVersionUID = 1L;
047    @PluginFactory
048    public static KafkaAppender createAppender(
049            @PluginElement("Layout") final Layout<? extends Serializable> layout,
050            @PluginElement("Filter") final Filter filter,
051            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
052            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
053            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
054            @PluginElement("Properties") final Property[] properties) {
055        final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
056        return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
057    }
058
059    private final KafkaManager manager;
060
061    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
062        super(name, filter, layout, ignoreExceptions);
063        this.manager = manager;
064    }
065
066    @Override
067    public void append(final LogEvent event) {
068        if (event.getLoggerName().startsWith("org.apache.kafka")) {
069            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
070        } else {
071            try {
072                Layout<? extends Serializable> layout = getLayout();
073                byte[] data;
074                if (layout != null) {
075                    if (layout instanceof SerializedLayout) {
076                        byte[] header = layout.getHeader();
077                        byte[] body = layout.toByteArray(event);
078                        data = new byte[header.length + body.length];
079                        System.arraycopy(header, 0, data, 0, header.length);
080                        System.arraycopy(body, 0, data, header.length, body.length);
081                    } else {
082                        data = layout.toByteArray(event);
083                    }
084                } else {
085                    data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
086                }
087                manager.send(data);
088            } catch (final Exception e) {
089                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
090                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
091            }
092        }
093    }
094
095    @Override
096    public void start() {
097        super.start();
098        manager.startup();
099    }
100
101    @Override
102    public void stop() {
103        super.stop();
104        manager.release();
105    }
106
107}