001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom.jeromq;
019
020import java.util.Arrays;
021import java.util.List;
022
023import org.apache.logging.log4j.LogManager;
024import org.apache.logging.log4j.core.appender.AbstractManager;
025import org.apache.logging.log4j.core.appender.ManagerFactory;
026import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;
027import org.apache.logging.log4j.util.PropertiesUtil;
028import org.zeromq.ZMQ;
029
030/**
031 * Manager for publishing messages via JeroMq.
032 *
033 * @since 2.6
034 */
035public class JeroMqManager extends AbstractManager {
036
037    /**
038     * System property to enable shutdown hook.
039     */
040    public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
041
042    /**
043     * System property to control JeroMQ I/O thread count.
044     */
045    public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
046
047    private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory();
048    private static final ZMQ.Context CONTEXT;
049
050    static {
051        LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString());
052
053        final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
054        LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads);
055        CONTEXT = ZMQ.context(ioThreads);
056
057        final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty(
058            SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
059        if (enableShutdownHook) {
060            ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() {
061                @Override
062                public void run() {
063                    CONTEXT.close();
064                }
065            });
066        }
067    }
068
069    private final ZMQ.Socket publisher;
070
071    private JeroMqManager(final String name, final JeroMqConfiguration config) {
072        super(name);
073        publisher = CONTEXT.socket(ZMQ.PUB);
074        publisher.setAffinity(config.affinity);
075        publisher.setBacklog(config.backlog);
076        publisher.setDelayAttachOnConnect(config.delayAttachOnConnect);
077        if (config.identity != null) {
078            publisher.setIdentity(config.identity);
079        }
080        publisher.setIPv4Only(config.ipv4Only);
081        publisher.setLinger(config.linger);
082        publisher.setMaxMsgSize(config.maxMsgSize);
083        publisher.setRcvHWM(config.rcvHwm);
084        publisher.setReceiveBufferSize(config.receiveBufferSize);
085        publisher.setReceiveTimeOut(config.receiveTimeOut);
086        publisher.setReconnectIVL(config.reconnectIVL);
087        publisher.setReconnectIVLMax(config.reconnectIVLMax);
088        publisher.setSendBufferSize(config.sendBufferSize);
089        publisher.setSendTimeOut(config.sendTimeOut);
090        publisher.setSndHWM(config.sndHwm);
091        publisher.setTCPKeepAlive(config.tcpKeepAlive);
092        publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount);
093        publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle);
094        publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval);
095        publisher.setXpubVerbose(config.xpubVerbose);
096        for (final String endpoint : config.endpoints) {
097            publisher.bind(endpoint);
098        }
099        LOGGER.debug("Created JeroMqManager with {}", config);
100    }
101
102    public boolean send(final byte[] data) {
103        return publisher.send(data);
104    }
105
106    @Override
107    protected void releaseSub() {
108        publisher.close();
109    }
110
111    public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog,
112                                                 final boolean delayAttachOnConnect, final byte[] identity,
113                                                 final boolean ipv4Only, final long linger, final long maxMsgSize,
114                                                 final long rcvHwm, final long receiveBufferSize,
115                                                 final int receiveTimeOut, final long reconnectIVL,
116                                                 final long reconnectIVLMax, final long sendBufferSize,
117                                                 final int sendTimeOut, final long sndHwm, final int tcpKeepAlive,
118                                                 final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
119                                                 final long tcpKeepAliveInterval, final boolean xpubVerbose,
120                                                 final List<String> endpoints) {
121        return getManager(name, FACTORY,
122            new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize,
123                rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut,
124                sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose,
125                endpoints));
126    }
127
128    public static ZMQ.Context getContext() {
129        return CONTEXT;
130    }
131
132    private static class JeroMqConfiguration {
133        private final long affinity;
134        private final long backlog;
135        private final boolean delayAttachOnConnect;
136        private final byte[] identity;
137        private final boolean ipv4Only;
138        private final long linger;
139        private final long maxMsgSize;
140        private final long rcvHwm;
141        private final long receiveBufferSize;
142        private final int receiveTimeOut;
143        private final long reconnectIVL;
144        private final long reconnectIVLMax;
145        private final long sendBufferSize;
146        private final int sendTimeOut;
147        private final long sndHwm;
148        private final int tcpKeepAlive;
149        private final long tcpKeepAliveCount;
150        private final long tcpKeepAliveIdle;
151        private final long tcpKeepAliveInterval;
152        private final boolean xpubVerbose;
153        private final List<String> endpoints;
154
155        private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect,
156                                    final byte[] identity, final boolean ipv4Only, final long linger,
157                                    final long maxMsgSize, final long rcvHwm, final long receiveBufferSize,
158                                    final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax,
159                                    final long sendBufferSize, final int sendTimeOut, final long sndHwm,
160                                    final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
161                                    final long tcpKeepAliveInterval, final boolean xpubVerbose,
162                                    final List<String> endpoints) {
163            this.affinity = affinity;
164            this.backlog = backlog;
165            this.delayAttachOnConnect = delayAttachOnConnect;
166            this.identity = identity;
167            this.ipv4Only = ipv4Only;
168            this.linger = linger;
169            this.maxMsgSize = maxMsgSize;
170            this.rcvHwm = rcvHwm;
171            this.receiveBufferSize = receiveBufferSize;
172            this.receiveTimeOut = receiveTimeOut;
173            this.reconnectIVL = reconnectIVL;
174            this.reconnectIVLMax = reconnectIVLMax;
175            this.sendBufferSize = sendBufferSize;
176            this.sendTimeOut = sendTimeOut;
177            this.sndHwm = sndHwm;
178            this.tcpKeepAlive = tcpKeepAlive;
179            this.tcpKeepAliveCount = tcpKeepAliveCount;
180            this.tcpKeepAliveIdle = tcpKeepAliveIdle;
181            this.tcpKeepAliveInterval = tcpKeepAliveInterval;
182            this.xpubVerbose = xpubVerbose;
183            this.endpoints = endpoints;
184        }
185
186        @Override
187        public String toString() {
188            return "JeroMqConfiguration{" +
189                "affinity=" + affinity +
190                ", backlog=" + backlog +
191                ", delayAttachOnConnect=" + delayAttachOnConnect +
192                ", identity=" + Arrays.toString(identity) +
193                ", ipv4Only=" + ipv4Only +
194                ", linger=" + linger +
195                ", maxMsgSize=" + maxMsgSize +
196                ", rcvHwm=" + rcvHwm +
197                ", receiveBufferSize=" + receiveBufferSize +
198                ", receiveTimeOut=" + receiveTimeOut +
199                ", reconnectIVL=" + reconnectIVL +
200                ", reconnectIVLMax=" + reconnectIVLMax +
201                ", sendBufferSize=" + sendBufferSize +
202                ", sendTimeOut=" + sendTimeOut +
203                ", sndHwm=" + sndHwm +
204                ", tcpKeepAlive=" + tcpKeepAlive +
205                ", tcpKeepAliveCount=" + tcpKeepAliveCount +
206                ", tcpKeepAliveIdle=" + tcpKeepAliveIdle +
207                ", tcpKeepAliveInterval=" + tcpKeepAliveInterval +
208                ", xpubVerbose=" + xpubVerbose +
209                ", endpoints=" + endpoints +
210                '}';
211        }
212    }
213
214    private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> {
215        @Override
216        public JeroMqManager createManager(final String name, final JeroMqConfiguration data) {
217            return new JeroMqManager(name, data);
218        }
219    }
220}