001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.jeromq;
019
020import java.io.Serializable;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024
025import org.apache.logging.log4j.Logger;
026import org.apache.logging.log4j.core.Filter;
027import org.apache.logging.log4j.core.Layout;
028import org.apache.logging.log4j.core.LogEvent;
029import org.apache.logging.log4j.core.appender.AbstractAppender;
030import org.apache.logging.log4j.core.config.Property;
031import org.apache.logging.log4j.core.config.plugins.Plugin;
032import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
033import org.apache.logging.log4j.core.config.plugins.PluginElement;
034import org.apache.logging.log4j.core.config.plugins.PluginFactory;
035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
036import org.apache.logging.log4j.core.layout.PatternLayout;
037import org.apache.logging.log4j.status.StatusLogger;
038import org.apache.logging.log4j.util.PropertiesUtil;
039import org.apache.logging.log4j.util.Strings;
040import org.zeromq.ZMQ;
041import org.zeromq.ZMQ.Socket;
042
043/**
044 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
045 * <p>
046 * Requires the JeroMQ jar (LGPL as of 0.3.5)
047 * </p>
048 */
049// TODO
050// Some methods are synchronized because a ZMQ.Socket is not thread-safe
051// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
052// some issue on threads owning certain resources as opposed to others.
053@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
054public final class JeroMqAppender extends AbstractAppender {
055
056    /**
057     * System property to enable shutdown hook.
058     */
059    static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
060
061    /**
062     * System property to control JeroMQ I/O thread count.
063     */
064    static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
065
066    // Per ZMQ docs, there should usually only be one ZMQ context per process.
067    private static volatile ZMQ.Context context;
068
069    private static final int DEFAULT_BACKLOG = 100;
070
071    private static final int DEFAULT_IVL = 100;
072
073    private static final int DEFAULT_RCV_HWM = 1000;
074
075    private static final int DEFAULT_SND_HWM = 1000;
076
077    private static Logger logger;
078
079    // ZMQ sockets are not thread safe.
080    private static ZMQ.Socket publisher;
081
082    private static final long serialVersionUID = 1L;
083
084    private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
085
086    static {
087        logger = StatusLogger.getLogger();
088        final PropertiesUtil managerProps = PropertiesUtil.getProperties();
089        final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
090        final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
091        final String simpleName = SIMPLE_NAME;
092        logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
093        logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
094        context = ZMQ.context(ioThreads);
095        logger.trace("{} created ZMQ context {}", simpleName, context);
096        if (enableShutdownHook) {
097            final Thread hook = new Thread(simpleName + "-shutdown") {
098                @Override
099                public void run() {
100                    shutdown();
101                }
102            };
103            logger.trace("{} adding shutdown hook {}", simpleName, hook);
104            Runtime.getRuntime().addShutdownHook(hook);
105        }
106    }
107
108    private final long affinity;
109    private final long backlog;
110    private final boolean delayAttachOnConnect;
111    private final List<String> endpoints;
112    private final byte[] identity;
113    private final int ioThreads = 1;
114    private final boolean ipv4Only;
115    private final long linger;
116    private final long maxMsgSize;
117    private final long rcvHwm;
118    private final long receiveBufferSize;
119    private final int receiveTimeOut;
120    private final long reconnectIVL;
121    private final long reconnectIVLMax;
122    private final long sendBufferSize;
123    private int sendRcFalse;
124    private int sendRcTrue;
125    private final int sendTimeOut;
126    private final long sndHwm;
127    private final int tcpKeepAlive;
128    private final long tcpKeepAliveCount;
129    private final long tcpKeepAliveIdle;
130    private final long tcpKeepAliveInterval;
131    private final boolean xpubVerbose;
132
133    private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
134            final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
135            final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
136            final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
137            final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
138            final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
139            final long tcpKeepAliveInterval, final boolean xpubVerbose) {
140        super(name, filter, layout, ignoreExceptions);
141        this.endpoints = endpoints;
142        this.affinity = affinity;
143        this.backlog = backlog;
144        this.delayAttachOnConnect = delayAttachOnConnect;
145        this.identity = identity;
146        this.ipv4Only = ipv4Only;
147        this.linger = linger;
148        this.maxMsgSize = maxMsgSize;
149        this.rcvHwm = rcvHwm;
150        this.receiveBufferSize = receiveBufferSize;
151        this.receiveTimeOut = receiveTimeOut;
152        this.reconnectIVL = reconnectIVL;
153        this.reconnectIVLMax = reconnectIVLMax;
154        this.sendBufferSize = sendBufferSize;
155        this.sendTimeOut = sendTimeOut;
156        this.sndHwm = sndHWM;
157        this.tcpKeepAlive = tcpKeepAlive;
158        this.tcpKeepAliveCount = tcpKeepAliveCount;
159        this.tcpKeepAliveIdle = tcpKeepAliveIdle;
160        this.tcpKeepAliveInterval = tcpKeepAliveInterval;
161        this.xpubVerbose = xpubVerbose;
162    }
163
164    // The ZMQ.Socket class has other set methods that we do not cover because
165    // they throw unsupported operation exceptions.
166    @PluginFactory
167    public static JeroMqAppender createAppender(
168            // @formatter:off
169            @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
170            @PluginElement("Layout") Layout<?> layout,
171            @PluginElement("Filter") final Filter filter,
172            @PluginElement("Properties") final Property[] properties,
173            // Super attributes
174            @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
175            // ZMQ attributes; defaults picked from zmq.Options.
176            @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
177            @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
178            @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
179            @PluginAttribute(value = "identity") final byte[] identity,
180            @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
181            @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
182            @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
183            @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
184            @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
185            @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
186            @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
187            @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
188            @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
189            @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
190            @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
191            @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
192            @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
193            @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
194            @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
195            @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
196            // @formatter:on
197    ) {
198        if (layout == null) {
199            layout = PatternLayout.createDefaultLayout();
200        }
201        List<String> endpoints;
202        if (properties == null) {
203            endpoints = new ArrayList<>(0);
204        } else {
205            endpoints = new ArrayList<>(properties.length);
206            for (final Property property : properties) {
207                if ("endpoint".equalsIgnoreCase(property.getName())) {
208                    final String value = property.getValue();
209                    if (Strings.isNotEmpty(value)) {
210                        endpoints.add(value);
211                    }
212                }
213            }
214        }
215        logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
216                name, filter, layout, ignoreExceptions, endpoints);
217        return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
218                delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
219                receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
220                tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
221    }
222
223    static ZMQ.Context getContext() {
224        return context;
225    }
226
227    private static ZMQ.Socket getPublisher() {
228        return publisher;
229    }
230
231    private static ZMQ.Socket newPublisher() {
232        logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
233        final Socket socketPub = context.socket(ZMQ.PUB);
234        logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
235        return socketPub;
236    }
237
238    static void shutdown() {
239        if (context != null) {
240            logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
241            context.term();
242            context = null;
243        }
244    }
245
246    @Override
247    public synchronized void append(final LogEvent event) {
248        final String formattedMessage = event.getMessage().getFormattedMessage();
249        if (getPublisher().send(formattedMessage, 0)) {
250            sendRcTrue++;
251        } else {
252            sendRcFalse++;
253            logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
254        }
255    }
256
257    // not public, handy for testing
258    int getSendRcFalse() {
259        return sendRcFalse;
260    }
261
262    // not public, handy for testing
263    int getSendRcTrue() {
264        return sendRcTrue;
265    }
266
267    // not public, handy for testing
268    void resetSendRcs() {
269        sendRcTrue = sendRcFalse = 0;
270    }
271
272    @Override
273    public synchronized void start() {
274        super.start();
275        publisher = newPublisher();
276        final String name = getName();
277        final String prefix = "JeroMQ Appender";
278        logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
279        logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
280        //
281        final ZMQ.Socket socketPub = getPublisher();
282        logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass()
283                .getName(), socketPub);
284        logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
285        socketPub.setAffinity(affinity);
286        logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
287        socketPub.setBacklog(backlog);
288        logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
289        socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
290        if (identity != null) {
291            logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
292            socketPub.setIdentity(identity);
293        }
294        logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
295        socketPub.setIPv4Only(ipv4Only);
296        logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
297        socketPub.setLinger(linger);
298        logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
299        socketPub.setMaxMsgSize(maxMsgSize);
300        logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
301        socketPub.setRcvHWM(rcvHwm);
302        logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
303        socketPub.setReceiveBufferSize(receiveBufferSize);
304        logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
305        socketPub.setReceiveTimeOut(receiveTimeOut);
306        logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
307        socketPub.setReconnectIVL(reconnectIVL);
308        logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
309        socketPub.setReconnectIVLMax(reconnectIVLMax);
310        logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
311        socketPub.setSendBufferSize(sendBufferSize);
312        logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
313        socketPub.setSendTimeOut(sendTimeOut);
314        logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
315        socketPub.setSndHWM(sndHwm);
316        logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
317        socketPub.setTCPKeepAlive(tcpKeepAlive);
318        logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
319        socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
320        logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
321        socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
322        logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
323        socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
324        logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
325        socketPub.setXpubVerbose(xpubVerbose);
326        //
327        if (logger.isDebugEnabled()) {
328            logger.debug(
329                    "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
330                            + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
331                            + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
332                    name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
333                    socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
334                    socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(),
335                    socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(),
336                    socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(),
337                    socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(),
338                    socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(),
339                    socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(),
340                    socketPub.getTCPKeepAliveSetting());
341        }
342        for (final String endpoint : endpoints) {
343            logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
344            socketPub.bind(endpoint);
345        }
346    }
347
348    @Override
349    public synchronized void stop() {
350        super.stop();
351        final Socket socketPub = getPublisher();
352        if (socketPub != null) {
353            logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
354            socketPub.close();
355            publisher = null;
356        }
357    }
358
359    @Override
360    public String toString() {
361        return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
362    }
363
364}