001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.jeromq;
019
020import java.io.Serializable;
021import java.util.ArrayList;
022import java.util.List;
023
024import org.apache.logging.log4j.core.Appender;
025import org.apache.logging.log4j.core.Filter;
026import org.apache.logging.log4j.core.Layout;
027import org.apache.logging.log4j.core.LogEvent;
028import org.apache.logging.log4j.core.appender.AbstractAppender;
029import org.apache.logging.log4j.core.config.Node;
030import org.apache.logging.log4j.core.config.Property;
031import org.apache.logging.log4j.core.config.plugins.Plugin;
032import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
033import org.apache.logging.log4j.core.config.plugins.PluginElement;
034import org.apache.logging.log4j.core.config.plugins.PluginFactory;
035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
036import org.apache.logging.log4j.core.layout.PatternLayout;
037import org.apache.logging.log4j.util.Strings;
038
039/**
040 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
041 * <p>
042 * Requires the JeroMQ jar (LGPL as of 0.3.5)
043 * </p>
044 */
045// TODO
046// Some methods are synchronized because a ZMQ.Socket is not thread-safe
047// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
048// some issue on threads owning certain resources as opposed to others.
049@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
050public final class JeroMqAppender extends AbstractAppender {
051
052    private static final int DEFAULT_BACKLOG = 100;
053
054    private static final int DEFAULT_IVL = 100;
055
056    private static final int DEFAULT_RCV_HWM = 1000;
057
058    private static final int DEFAULT_SND_HWM = 1000;
059
060    private final JeroMqManager manager;
061    private final List<String> endpoints;
062    private int sendRcFalse;
063    private int sendRcTrue;
064
065    private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
066            final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
067            final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
068            final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
069            final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
070            final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
071            final long tcpKeepAliveInterval, final boolean xpubVerbose) {
072        super(name, filter, layout, ignoreExceptions);
073        this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only,
074            linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax,
075            sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle,
076            tcpKeepAliveInterval, xpubVerbose, endpoints);
077        this.endpoints = endpoints;
078    }
079
080    // The ZMQ.Socket class has other set methods that we do not cover because
081    // they throw unsupported operation exceptions.
082    @PluginFactory
083    public static JeroMqAppender createAppender(
084            // @formatter:off
085            @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
086            @PluginElement("Layout") Layout<?> layout,
087            @PluginElement("Filter") final Filter filter,
088            @PluginElement("Properties") final Property[] properties,
089            // Super attributes
090            @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
091            // ZMQ attributes; defaults picked from zmq.Options.
092            @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
093            @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
094            @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
095            @PluginAttribute(value = "identity") final byte[] identity,
096            @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
097            @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
098            @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
099            @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
100            @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
101            @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
102            @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
103            @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
104            @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
105            @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
106            @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
107            @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
108            @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
109            @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
110            @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
111            @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
112            // @formatter:on
113    ) {
114        if (layout == null) {
115            layout = PatternLayout.createDefaultLayout();
116        }
117        List<String> endpoints;
118        if (properties == null) {
119            endpoints = new ArrayList<>(0);
120        } else {
121            endpoints = new ArrayList<>(properties.length);
122            for (final Property property : properties) {
123                if ("endpoint".equalsIgnoreCase(property.getName())) {
124                    final String value = property.getValue();
125                    if (Strings.isNotEmpty(value)) {
126                        endpoints.add(value);
127                    }
128                }
129            }
130        }
131        LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
132                name, filter, layout, ignoreExceptions, endpoints);
133        return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
134                delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
135                receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
136                tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
137    }
138
139    @Override
140    public synchronized void append(final LogEvent event) {
141        final Layout<? extends Serializable> layout = getLayout();
142        final byte[] formattedMessage = layout.toByteArray(event);
143        if (manager.send(getLayout().toByteArray(event))) {
144            sendRcTrue++;
145        } else {
146            sendRcFalse++;
147            LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
148        }
149    }
150
151    @Override
152    public void stop() {
153        manager.release();
154        super.stop();
155    }
156
157    // not public, handy for testing
158    int getSendRcFalse() {
159        return sendRcFalse;
160    }
161
162    // not public, handy for testing
163    int getSendRcTrue() {
164        return sendRcTrue;
165    }
166
167    // not public, handy for testing
168    void resetSendRcs() {
169        sendRcTrue = sendRcFalse = 0;
170    }
171
172    @Override
173    public String toString() {
174        return "JeroMqAppender{" +
175            "name=" + getName() +
176            ", state=" + getState() +
177            ", manager=" + manager +
178            ", endpoints=" + endpoints +
179            '}';
180    }
181}