/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017
018package org.apache.logging.log4j.core.appender.mom.jeromq;
019
020import java.io.Serializable;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024
025import org.apache.logging.log4j.Logger;
026import org.apache.logging.log4j.core.Filter;
027import org.apache.logging.log4j.core.Layout;
028import org.apache.logging.log4j.core.LogEvent;
029import org.apache.logging.log4j.core.appender.AbstractAppender;
030import org.apache.logging.log4j.core.config.Property;
031import org.apache.logging.log4j.core.config.plugins.Plugin;
032import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
033import org.apache.logging.log4j.core.config.plugins.PluginElement;
034import org.apache.logging.log4j.core.config.plugins.PluginFactory;
035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
036import org.apache.logging.log4j.core.layout.PatternLayout;
037import org.apache.logging.log4j.status.StatusLogger;
038import org.apache.logging.log4j.util.PropertiesUtil;
039import org.apache.logging.log4j.util.Strings;
040import org.zeromq.ZMQ;
041import org.zeromq.ZMQ.Socket;
042
043/**
044 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
045 * <p>
046 * Requires the JeroMQ jar (LGPL as of 0.3.5)
047 * </p>
048 */
049// TODO
050// Some methods are synchronized because a ZMQ.Socket is not thread-safe
051// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
052// some issue on threads owning certain resources as opposed to others.
053@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
054public final class JeroMqAppender extends AbstractAppender {
055
056    // Per ZMQ docs, there should usually only be one ZMQ context per process.
057    private static volatile ZMQ.Context context;
058
059    private static final int DEFAULT_BACKLOG = 100;
060
061    private static final int DEFAULT_IVL = 100;
062
063    private static final int DEFAULT_RCV_HWM = 1000;
064
065    private static final int DEFAULT_SND_HWM = 1000;
066
067    private static Logger logger;
068
069    // ZMQ sockets are not thread safe.
070    private static ZMQ.Socket publisher;
071
072    private static final long serialVersionUID = 1L;
073
074    private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
075
076    static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
077
078    static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
079
080    static {
081        logger = StatusLogger.getLogger();
082        final PropertiesUtil managerProps = PropertiesUtil.getProperties();
083        final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
084        final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
085        final String simpleName = SIMPLE_NAME;
086        logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
087        logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
088        context = ZMQ.context(ioThreads);
089        logger.trace("{} created ZMQ context {}", simpleName, context);
090        if (enableShutdownHook) {
091            final Thread hook = new Thread(simpleName + "-shutdown") {
092                @Override
093                public void run() {
094                    shutdown();
095                }
096            };
097            logger.trace("{} adding shutdown hook {}", simpleName, hook);
098            Runtime.getRuntime().addShutdownHook(hook);
099        }
100    }
101
102    // The ZMQ.Socket class has other set methods that we do not cover because
103    // they throw unsupported operation exceptions.
104    @PluginFactory
105    public static JeroMqAppender createAppender(
106            // @formatter:off
107            @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
108            @PluginElement("Layout") Layout<?> layout,
109            @PluginElement("Filters") final Filter filter,
110            @PluginElement("Properties") final Property[] properties,
111            // Super attributes
112            @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
113            // ZMQ attributes; defaults picked from zmq.Options.
114            @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
115            @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
116            @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
117            @PluginAttribute(value = "identity") final byte[] identity,
118            @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
119            @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
120            @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
121            @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
122            @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
123            @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
124            @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
125            @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
126            @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
127            @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
128            @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
129            @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
130            @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
131            @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
132            @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
133            @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
134            // @formatter:on
135    ) {
136        if (layout == null) {
137            layout = PatternLayout.createDefaultLayout();
138        }
139        List<String> endpoints;
140        if (properties == null) {
141            endpoints = new ArrayList<>(0);
142        } else {
143            endpoints = new ArrayList<>(properties.length);
144            for (final Property property : properties) {
145                if ("endpoint".equalsIgnoreCase(property.getName())) {
146                    final String value = property.getValue();
147                    if (Strings.isNotEmpty(value)) {
148                        endpoints.add(value);
149                    }
150                }
151            }
152        }
153        logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
154                name, filter, layout, ignoreExceptions, endpoints);
155        return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
156                delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut,
157                reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, tcpKeepAliveCount,
158                tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
159    }
160
161    static ZMQ.Context getContext() {
162        return context;
163    }
164
165    private static ZMQ.Socket getPublisher() {
166        return publisher;
167    }
168
169    private static ZMQ.Socket newPublisher() {
170        logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
171        final Socket socketPub = context.socket(ZMQ.PUB);
172        logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
173        return socketPub;
174    }
175
176    static void shutdown() {
177        if (context != null) {
178            logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
179            context.term();
180            context = null;
181        }
182    }
183
184    private final long affinity;
185    private final long backlog;
186    private final boolean delayAttachOnConnect;
187    private final List<String> endpoints;
188    private final byte[] identity;
189    private final int ioThreads = 1;
190    private final boolean ipv4Only;
191    private final long linger;
192    private final long maxMsgSize;
193    private final long rcvHwm;
194    private final long receiveBufferSize;
195    private final int receiveTimeOut;
196    private final long reconnectIVL;
197    private final long reconnectIVLMax;
198    private final long sendBufferSize;
199    private int sendRcFalse;
200    private int sendRcTrue;
201    private final int sendTimeOut;
202    private final long sndHwm;
203    private final int tcpKeepAlive;
204    private final long tcpKeepAliveCount;
205    private final long tcpKeepAliveIdle;
206    private final long tcpKeepAliveInterval;
207    private final boolean xpubVerbose;
208
209    private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
210            final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
211            final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
212            final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
213            final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
214            final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
215            final long tcpKeepAliveInterval, final boolean xpubVerbose) {
216        super(name, filter, layout, ignoreExceptions);
217        this.endpoints = endpoints;
218        this.affinity = affinity;
219        this.backlog = backlog;
220        this.delayAttachOnConnect = delayAttachOnConnect;
221        this.identity = identity;
222        this.ipv4Only = ipv4Only;
223        this.linger = linger;
224        this.maxMsgSize = maxMsgSize;
225        this.rcvHwm = rcvHwm;
226        this.receiveBufferSize = receiveBufferSize;
227        this.receiveTimeOut = receiveTimeOut;
228        this.reconnectIVL = reconnectIVL;
229        this.reconnectIVLMax = reconnectIVLMax;
230        this.sendBufferSize = sendBufferSize;
231        this.sendTimeOut = sendTimeOut;
232        this.sndHwm = sndHWM;
233        this.tcpKeepAlive = tcpKeepAlive;
234        this.tcpKeepAliveCount = tcpKeepAliveCount;
235        this.tcpKeepAliveIdle = tcpKeepAliveIdle;
236        this.tcpKeepAliveInterval = tcpKeepAliveInterval;
237        this.xpubVerbose = xpubVerbose;
238    }
239
240    @Override
241    public synchronized void append(final LogEvent event) {
242        final String formattedMessage = event.getMessage().getFormattedMessage();
243        if (getPublisher().send(formattedMessage, 0)) {
244            sendRcTrue++;
245        } else {
246            sendRcFalse++;
247            logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse,
248                    formattedMessage);
249        }
250    }
251
252    // not public, handy for testing
253    int getSendRcFalse() {
254        return sendRcFalse;
255    }
256
257    // not public, handy for testing
258    int getSendRcTrue() {
259        return sendRcTrue;
260    }
261
262    // not public, handy for testing
263    void resetSendRcs() {
264        sendRcTrue = sendRcFalse = 0;
265    }
266
267    @Override
268    public synchronized void start() {
269        super.start();
270        publisher = newPublisher();
271        final String name = getName();
272        final String prefix = "JeroMQ Appender";
273        logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
274        logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
275        //
276        final ZMQ.Socket socketPub = getPublisher();
277        logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name,
278                socketPub.getClass().getName(), socketPub);
279        logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
280        socketPub.setAffinity(affinity);
281        logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
282        socketPub.setBacklog(backlog);
283        logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
284        socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
285        if (identity != null) {
286            logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
287            socketPub.setIdentity(identity);
288        }
289        logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
290        socketPub.setIPv4Only(ipv4Only);
291        logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
292        socketPub.setLinger(linger);
293        logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
294        socketPub.setMaxMsgSize(maxMsgSize);
295        logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
296        socketPub.setRcvHWM(rcvHwm);
297        logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
298        socketPub.setReceiveBufferSize(receiveBufferSize);
299        logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
300        socketPub.setReceiveTimeOut(receiveTimeOut);
301        logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
302        socketPub.setReconnectIVL(reconnectIVL);
303        logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
304        socketPub.setReconnectIVLMax(reconnectIVLMax);
305        logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
306        socketPub.setSendBufferSize(sendBufferSize);
307        logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
308        socketPub.setSendTimeOut(sendTimeOut);
309        logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
310        socketPub.setSndHWM(sndHwm);
311        logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
312        socketPub.setTCPKeepAlive(tcpKeepAlive);
313        logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
314        socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
315        logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
316        socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
317        logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
318        socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
319        logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
320        socketPub.setXpubVerbose(xpubVerbose);
321        //
322        if (logger.isDebugEnabled()) {
323            logger.debug(
324                    "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
325                            + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
326                            + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
327                    name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
328                    socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
329                    socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), socketPub.getRate(),
330                    socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), socketPub.getReceiveTimeOut(),
331                    socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), socketPub.getRecoveryInterval(),
332                    socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), socketPub.getSndHWM(),
333                    socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), socketPub.getTCPKeepAliveIdle(),
334                    socketPub.getTCPKeepAliveInterval(), socketPub.getTCPKeepAliveSetting());
335        }
336        for (final String endpoint : endpoints) {
337            logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
338            socketPub.bind(endpoint);
339        }
340    }
341
342    @Override
343    public synchronized void stop() {
344        super.stop();
345        final Socket socketPub = getPublisher();
346        if (socketPub != null) {
347            logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
348            socketPub.close();
349            publisher = null;
350        }
351    }
352
353    @Override
354    public String toString() {
355        return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
356    }
357
358}