001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.jeromq;
019
020import java.io.Serializable;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024
025import org.apache.logging.log4j.Logger;
026import org.apache.logging.log4j.core.Filter;
027import org.apache.logging.log4j.core.Layout;
028import org.apache.logging.log4j.core.LogEvent;
029import org.apache.logging.log4j.core.appender.AbstractAppender;
030import org.apache.logging.log4j.core.config.Property;
031import org.apache.logging.log4j.core.config.plugins.Plugin;
032import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
033import org.apache.logging.log4j.core.config.plugins.PluginElement;
034import org.apache.logging.log4j.core.config.plugins.PluginFactory;
035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
036import org.apache.logging.log4j.core.layout.PatternLayout;
037import org.apache.logging.log4j.core.util.Log4jThread;
038import org.apache.logging.log4j.status.StatusLogger;
039import org.apache.logging.log4j.util.PropertiesUtil;
040import org.apache.logging.log4j.util.Strings;
041import org.zeromq.ZMQ;
042import org.zeromq.ZMQ.Socket;
043
044/**
045 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
046 * <p>
047 * Requires the JeroMQ jar (LGPL as of 0.3.5)
048 * </p>
049 */
050// TODO
051// Some methods are synchronized because a ZMQ.Socket is not thread-safe
052// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
053// some issue on threads owning certain resources as opposed to others.
054@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
055public final class JeroMqAppender extends AbstractAppender {
056
057    /**
058     * System property to enable shutdown hook.
059     */
060    static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
061
062    /**
063     * System property to control JeroMQ I/O thread count.
064     */
065    static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
066
067    // Per ZMQ docs, there should usually only be one ZMQ context per process.
068    private static volatile ZMQ.Context context;
069
070    private static final int DEFAULT_BACKLOG = 100;
071
072    private static final int DEFAULT_IVL = 100;
073
074    private static final int DEFAULT_RCV_HWM = 1000;
075
076    private static final int DEFAULT_SND_HWM = 1000;
077
078    private static Logger logger;
079
080    // ZMQ sockets are not thread safe.
081    private static ZMQ.Socket publisher;
082
083    private static final long serialVersionUID = 1L;
084
085    private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
086
087    static {
088        logger = StatusLogger.getLogger();
089        final PropertiesUtil managerProps = PropertiesUtil.getProperties();
090        final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
091        final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
092        final String simpleName = SIMPLE_NAME;
093        logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
094        logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
095        context = ZMQ.context(ioThreads);
096        logger.trace("{} created ZMQ context {}", simpleName, context);
097        if (enableShutdownHook) {
098            final Thread hook = new Log4jThread(simpleName + "-shutdown") {
099                @Override
100                public void run() {
101                    shutdown();
102                }
103            };
104            logger.trace("{} adding shutdown hook {}", simpleName, hook);
105            Runtime.getRuntime().addShutdownHook(hook);
106        }
107    }
108
109    private final long affinity;
110    private final long backlog;
111    private final boolean delayAttachOnConnect;
112    private final List<String> endpoints;
113    private final byte[] identity;
114    private final int ioThreads = 1;
115    private final boolean ipv4Only;
116    private final long linger;
117    private final long maxMsgSize;
118    private final long rcvHwm;
119    private final long receiveBufferSize;
120    private final int receiveTimeOut;
121    private final long reconnectIVL;
122    private final long reconnectIVLMax;
123    private final long sendBufferSize;
124    private int sendRcFalse;
125    private int sendRcTrue;
126    private final int sendTimeOut;
127    private final long sndHwm;
128    private final int tcpKeepAlive;
129    private final long tcpKeepAliveCount;
130    private final long tcpKeepAliveIdle;
131    private final long tcpKeepAliveInterval;
132    private final boolean xpubVerbose;
133
134    private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
135            final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
136            final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
137            final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
138            final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
139            final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
140            final long tcpKeepAliveInterval, final boolean xpubVerbose) {
141        super(name, filter, layout, ignoreExceptions);
142        this.endpoints = endpoints;
143        this.affinity = affinity;
144        this.backlog = backlog;
145        this.delayAttachOnConnect = delayAttachOnConnect;
146        this.identity = identity;
147        this.ipv4Only = ipv4Only;
148        this.linger = linger;
149        this.maxMsgSize = maxMsgSize;
150        this.rcvHwm = rcvHwm;
151        this.receiveBufferSize = receiveBufferSize;
152        this.receiveTimeOut = receiveTimeOut;
153        this.reconnectIVL = reconnectIVL;
154        this.reconnectIVLMax = reconnectIVLMax;
155        this.sendBufferSize = sendBufferSize;
156        this.sendTimeOut = sendTimeOut;
157        this.sndHwm = sndHWM;
158        this.tcpKeepAlive = tcpKeepAlive;
159        this.tcpKeepAliveCount = tcpKeepAliveCount;
160        this.tcpKeepAliveIdle = tcpKeepAliveIdle;
161        this.tcpKeepAliveInterval = tcpKeepAliveInterval;
162        this.xpubVerbose = xpubVerbose;
163    }
164
165    // The ZMQ.Socket class has other set methods that we do not cover because
166    // they throw unsupported operation exceptions.
167    @PluginFactory
168    public static JeroMqAppender createAppender(
169            // @formatter:off
170            @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
171            @PluginElement("Layout") Layout<?> layout,
172            @PluginElement("Filter") final Filter filter,
173            @PluginElement("Properties") final Property[] properties,
174            // Super attributes
175            @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
176            // ZMQ attributes; defaults picked from zmq.Options.
177            @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
178            @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
179            @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
180            @PluginAttribute(value = "identity") final byte[] identity,
181            @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
182            @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
183            @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
184            @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
185            @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
186            @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
187            @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
188            @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
189            @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
190            @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
191            @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
192            @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
193            @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
194            @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
195            @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
196            @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
197            // @formatter:on
198    ) {
199        if (layout == null) {
200            layout = PatternLayout.createDefaultLayout();
201        }
202        List<String> endpoints;
203        if (properties == null) {
204            endpoints = new ArrayList<>(0);
205        } else {
206            endpoints = new ArrayList<>(properties.length);
207            for (final Property property : properties) {
208                if ("endpoint".equalsIgnoreCase(property.getName())) {
209                    final String value = property.getValue();
210                    if (Strings.isNotEmpty(value)) {
211                        endpoints.add(value);
212                    }
213                }
214            }
215        }
216        logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
217                name, filter, layout, ignoreExceptions, endpoints);
218        return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
219                delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
220                receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
221                tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
222    }
223
224    static ZMQ.Context getContext() {
225        return context;
226    }
227
228    private static ZMQ.Socket getPublisher() {
229        return publisher;
230    }
231
232    private static ZMQ.Socket newPublisher() {
233        logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
234        final Socket socketPub = context.socket(ZMQ.PUB);
235        logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
236        return socketPub;
237    }
238
239    static void shutdown() {
240        if (context != null) {
241            logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
242            context.term();
243            context = null;
244        }
245    }
246
247    @Override
248    public synchronized void append(final LogEvent event) {
249        final String formattedMessage = event.getMessage().getFormattedMessage();
250        if (getPublisher().send(formattedMessage, 0)) {
251            sendRcTrue++;
252        } else {
253            sendRcFalse++;
254            logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
255        }
256    }
257
258    // not public, handy for testing
259    int getSendRcFalse() {
260        return sendRcFalse;
261    }
262
263    // not public, handy for testing
264    int getSendRcTrue() {
265        return sendRcTrue;
266    }
267
268    // not public, handy for testing
269    void resetSendRcs() {
270        sendRcTrue = sendRcFalse = 0;
271    }
272
273    @Override
274    public synchronized void start() {
275        super.start();
276        publisher = newPublisher();
277        final String name = getName();
278        final String prefix = "JeroMQ Appender";
279        logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
280        logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
281        //
282        final ZMQ.Socket socketPub = getPublisher();
283        logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass()
284                .getName(), socketPub);
285        logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
286        socketPub.setAffinity(affinity);
287        logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
288        socketPub.setBacklog(backlog);
289        logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
290        socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
291        if (identity != null) {
292            logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
293            socketPub.setIdentity(identity);
294        }
295        logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
296        socketPub.setIPv4Only(ipv4Only);
297        logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
298        socketPub.setLinger(linger);
299        logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
300        socketPub.setMaxMsgSize(maxMsgSize);
301        logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
302        socketPub.setRcvHWM(rcvHwm);
303        logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
304        socketPub.setReceiveBufferSize(receiveBufferSize);
305        logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
306        socketPub.setReceiveTimeOut(receiveTimeOut);
307        logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
308        socketPub.setReconnectIVL(reconnectIVL);
309        logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
310        socketPub.setReconnectIVLMax(reconnectIVLMax);
311        logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
312        socketPub.setSendBufferSize(sendBufferSize);
313        logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
314        socketPub.setSendTimeOut(sendTimeOut);
315        logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
316        socketPub.setSndHWM(sndHwm);
317        logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
318        socketPub.setTCPKeepAlive(tcpKeepAlive);
319        logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
320        socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
321        logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
322        socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
323        logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
324        socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
325        logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
326        socketPub.setXpubVerbose(xpubVerbose);
327        //
328        if (logger.isDebugEnabled()) {
329            logger.debug(
330                    "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
331                            + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
332                            + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
333                    name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
334                    socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
335                    socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(),
336                    socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(),
337                    socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(),
338                    socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(),
339                    socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(),
340                    socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(),
341                    socketPub.getTCPKeepAliveSetting());
342        }
343        for (final String endpoint : endpoints) {
344            logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
345            socketPub.bind(endpoint);
346        }
347    }
348
349    @Override
350    public synchronized void stop() {
351        super.stop();
352        final Socket socketPub = getPublisher();
353        if (socketPub != null) {
354            logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
355            socketPub.close();
356            publisher = null;
357        }
358    }
359
360    @Override
361    public String toString() {
362        return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
363    }
364
365}