View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.jeromq;
19  
20  import java.io.Serializable;
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.List;
24  
25  import org.apache.logging.log4j.Logger;
26  import org.apache.logging.log4j.core.Filter;
27  import org.apache.logging.log4j.core.Layout;
28  import org.apache.logging.log4j.core.LogEvent;
29  import org.apache.logging.log4j.core.appender.AbstractAppender;
30  import org.apache.logging.log4j.core.config.Property;
31  import org.apache.logging.log4j.core.config.plugins.Plugin;
32  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33  import org.apache.logging.log4j.core.config.plugins.PluginElement;
34  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36  import org.apache.logging.log4j.core.layout.PatternLayout;
37  import org.apache.logging.log4j.status.StatusLogger;
38  import org.apache.logging.log4j.util.PropertiesUtil;
39  import org.apache.logging.log4j.util.Strings;
40  import org.zeromq.ZMQ;
41  import org.zeromq.ZMQ.Socket;
42  
43  /**
44   * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
45   * <p>
46   * Requires the JeroMQ jar (LGPL as of 0.3.5)
47   * </p>
48   */
49  // TODO
50  // Some methods are synchronized because a ZMQ.Socket is not thread-safe
51  // Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
52  // some issue on threads owning certain resources as opposed to others.
53  @Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
54  public final class JeroMqAppender extends AbstractAppender {
55  
56      /**
57       * System property to enable shutdown hook.
58       */
59      static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
60  
61      /**
62       * System property to control JeroMQ I/O thread count.
63       */
64      static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
65  
66      // Per ZMQ docs, there should usually only be one ZMQ context per process.
67      private static volatile ZMQ.Context context;
68  
69      private static final int DEFAULT_BACKLOG = 100;
70  
71      private static final int DEFAULT_IVL = 100;
72  
73      private static final int DEFAULT_RCV_HWM = 1000;
74  
75      private static final int DEFAULT_SND_HWM = 1000;
76  
77      private static Logger logger;
78  
79      // ZMQ sockets are not thread safe.
80      private static ZMQ.Socket publisher;
81  
82      private static final long serialVersionUID = 1L;
83  
84      private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
85  
86      static {
87          logger = StatusLogger.getLogger();
88          final PropertiesUtil managerProps = PropertiesUtil.getProperties();
89          final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
90          final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
91          final String simpleName = SIMPLE_NAME;
92          logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
93          logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
94          context = ZMQ.context(ioThreads);
95          logger.trace("{} created ZMQ context {}", simpleName, context);
96          if (enableShutdownHook) {
97              final Thread hook = new Thread(simpleName + "-shutdown") {
98                  @Override
99                  public void run() {
100                     shutdown();
101                 }
102             };
103             logger.trace("{} adding shutdown hook {}", simpleName, hook);
104             Runtime.getRuntime().addShutdownHook(hook);
105         }
106     }
107 
108     private final long affinity;
109     private final long backlog;
110     private final boolean delayAttachOnConnect;
111     private final List<String> endpoints;
112     private final byte[] identity;
113     private final int ioThreads = 1;
114     private final boolean ipv4Only;
115     private final long linger;
116     private final long maxMsgSize;
117     private final long rcvHwm;
118     private final long receiveBufferSize;
119     private final int receiveTimeOut;
120     private final long reconnectIVL;
121     private final long reconnectIVLMax;
122     private final long sendBufferSize;
123     private int sendRcFalse;
124     private int sendRcTrue;
125     private final int sendTimeOut;
126     private final long sndHwm;
127     private final int tcpKeepAlive;
128     private final long tcpKeepAliveCount;
129     private final long tcpKeepAliveIdle;
130     private final long tcpKeepAliveInterval;
131     private final boolean xpubVerbose;
132 
133     private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
134             final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
135             final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
136             final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
137             final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
138             final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
139             final long tcpKeepAliveInterval, final boolean xpubVerbose) {
140         super(name, filter, layout, ignoreExceptions);
141         this.endpoints = endpoints;
142         this.affinity = affinity;
143         this.backlog = backlog;
144         this.delayAttachOnConnect = delayAttachOnConnect;
145         this.identity = identity;
146         this.ipv4Only = ipv4Only;
147         this.linger = linger;
148         this.maxMsgSize = maxMsgSize;
149         this.rcvHwm = rcvHwm;
150         this.receiveBufferSize = receiveBufferSize;
151         this.receiveTimeOut = receiveTimeOut;
152         this.reconnectIVL = reconnectIVL;
153         this.reconnectIVLMax = reconnectIVLMax;
154         this.sendBufferSize = sendBufferSize;
155         this.sendTimeOut = sendTimeOut;
156         this.sndHwm = sndHWM;
157         this.tcpKeepAlive = tcpKeepAlive;
158         this.tcpKeepAliveCount = tcpKeepAliveCount;
159         this.tcpKeepAliveIdle = tcpKeepAliveIdle;
160         this.tcpKeepAliveInterval = tcpKeepAliveInterval;
161         this.xpubVerbose = xpubVerbose;
162     }
163 
164     // The ZMQ.Socket class has other set methods that we do not cover because
165     // they throw unsupported operation exceptions.
166     @PluginFactory
167     public static JeroMqAppender createAppender(
168             // @formatter:off
169             @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
170             @PluginElement("Layout") Layout<?> layout,
171             @PluginElement("Filter") final Filter filter,
172             @PluginElement("Properties") final Property[] properties,
173             // Super attributes
174             @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
175             // ZMQ attributes; defaults picked from zmq.Options.
176             @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
177             @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
178             @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
179             @PluginAttribute(value = "identity") final byte[] identity,
180             @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
181             @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
182             @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
183             @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
184             @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
185             @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
186             @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
187             @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
188             @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
189             @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
190             @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
191             @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
192             @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
193             @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
194             @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
195             @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
196             // @formatter:on
197     ) {
198         if (layout == null) {
199             layout = PatternLayout.createDefaultLayout();
200         }
201         List<String> endpoints;
202         if (properties == null) {
203             endpoints = new ArrayList<>(0);
204         } else {
205             endpoints = new ArrayList<>(properties.length);
206             for (final Property property : properties) {
207                 if ("endpoint".equalsIgnoreCase(property.getName())) {
208                     final String value = property.getValue();
209                     if (Strings.isNotEmpty(value)) {
210                         endpoints.add(value);
211                     }
212                 }
213             }
214         }
215         logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
216                 name, filter, layout, ignoreExceptions, endpoints);
217         return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
218                 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
219                 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
220                 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
221     }
222 
223     static ZMQ.Context getContext() {
224         return context;
225     }
226 
227     private static ZMQ.Socket getPublisher() {
228         return publisher;
229     }
230 
231     private static ZMQ.Socket newPublisher() {
232         logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
233         final Socket socketPub = context.socket(ZMQ.PUB);
234         logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
235         return socketPub;
236     }
237 
238     static void shutdown() {
239         if (context != null) {
240             logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
241             context.term();
242             context = null;
243         }
244     }
245 
246     @Override
247     public synchronized void append(final LogEvent event) {
248         final String formattedMessage = event.getMessage().getFormattedMessage();
249         if (getPublisher().send(formattedMessage, 0)) {
250             sendRcTrue++;
251         } else {
252             sendRcFalse++;
253             logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
254         }
255     }
256 
257     // not public, handy for testing
258     int getSendRcFalse() {
259         return sendRcFalse;
260     }
261 
262     // not public, handy for testing
263     int getSendRcTrue() {
264         return sendRcTrue;
265     }
266 
267     // not public, handy for testing
268     void resetSendRcs() {
269         sendRcTrue = sendRcFalse = 0;
270     }
271 
272     @Override
273     public synchronized void start() {
274         super.start();
275         publisher = newPublisher();
276         final String name = getName();
277         final String prefix = "JeroMQ Appender";
278         logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
279         logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
280         //
281         final ZMQ.Socket socketPub = getPublisher();
282         logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass()
283                 .getName(), socketPub);
284         logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
285         socketPub.setAffinity(affinity);
286         logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
287         socketPub.setBacklog(backlog);
288         logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
289         socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
290         if (identity != null) {
291             logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
292             socketPub.setIdentity(identity);
293         }
294         logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
295         socketPub.setIPv4Only(ipv4Only);
296         logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
297         socketPub.setLinger(linger);
298         logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
299         socketPub.setMaxMsgSize(maxMsgSize);
300         logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
301         socketPub.setRcvHWM(rcvHwm);
302         logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
303         socketPub.setReceiveBufferSize(receiveBufferSize);
304         logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
305         socketPub.setReceiveTimeOut(receiveTimeOut);
306         logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
307         socketPub.setReconnectIVL(reconnectIVL);
308         logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
309         socketPub.setReconnectIVLMax(reconnectIVLMax);
310         logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
311         socketPub.setSendBufferSize(sendBufferSize);
312         logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
313         socketPub.setSendTimeOut(sendTimeOut);
314         logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
315         socketPub.setSndHWM(sndHwm);
316         logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
317         socketPub.setTCPKeepAlive(tcpKeepAlive);
318         logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
319         socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
320         logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
321         socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
322         logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
323         socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
324         logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
325         socketPub.setXpubVerbose(xpubVerbose);
326         //
327         if (logger.isDebugEnabled()) {
328             logger.debug(
329                     "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
330                             + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
331                             + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
332                     name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
333                     socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
334                     socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(),
335                     socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(),
336                     socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(),
337                     socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(),
338                     socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(),
339                     socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(),
340                     socketPub.getTCPKeepAliveSetting());
341         }
342         for (final String endpoint : endpoints) {
343             logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
344             socketPub.bind(endpoint);
345         }
346     }
347 
348     @Override
349     public synchronized void stop() {
350         super.stop();
351         final Socket socketPub = getPublisher();
352         if (socketPub != null) {
353             logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
354             socketPub.close();
355             publisher = null;
356         }
357     }
358 
359     @Override
360     public String toString() {
361         return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
362     }
363 
364 }