View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.jeromq;
19  
20  import java.io.Serializable;
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.List;
24  
25  import org.apache.logging.log4j.Logger;
26  import org.apache.logging.log4j.core.Filter;
27  import org.apache.logging.log4j.core.Layout;
28  import org.apache.logging.log4j.core.LogEvent;
29  import org.apache.logging.log4j.core.appender.AbstractAppender;
30  import org.apache.logging.log4j.core.config.Property;
31  import org.apache.logging.log4j.core.config.plugins.Plugin;
32  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33  import org.apache.logging.log4j.core.config.plugins.PluginElement;
34  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36  import org.apache.logging.log4j.core.layout.PatternLayout;
37  import org.apache.logging.log4j.core.util.Log4jThread;
38  import org.apache.logging.log4j.status.StatusLogger;
39  import org.apache.logging.log4j.util.PropertiesUtil;
40  import org.apache.logging.log4j.util.Strings;
41  import org.zeromq.ZMQ;
42  import org.zeromq.ZMQ.Socket;
43  
44  /**
45   * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
46   * <p>
47   * Requires the JeroMQ jar (LGPL as of 0.3.5)
48   * </p>
49   */
50  // TODO
51  // Some methods are synchronized because a ZMQ.Socket is not thread-safe
52  // Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
53  // some issue on threads owning certain resources as opposed to others.
54  @Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
55  public final class JeroMqAppender extends AbstractAppender {
56  
57      /**
58       * System property to enable shutdown hook.
59       */
60      static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
61  
62      /**
63       * System property to control JeroMQ I/O thread count.
64       */
65      static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
66  
67      // Per ZMQ docs, there should usually only be one ZMQ context per process.
68      private static volatile ZMQ.Context context;
69  
70      private static final int DEFAULT_BACKLOG = 100;
71  
72      private static final int DEFAULT_IVL = 100;
73  
74      private static final int DEFAULT_RCV_HWM = 1000;
75  
76      private static final int DEFAULT_SND_HWM = 1000;
77  
78      private static Logger logger;
79  
80      // ZMQ sockets are not thread safe.
81      private static ZMQ.Socket publisher;
82  
83      private static final long serialVersionUID = 1L;
84  
85      private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
86  
87      static {
88          logger = StatusLogger.getLogger();
89          final PropertiesUtil managerProps = PropertiesUtil.getProperties();
90          final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
91          final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
92          final String simpleName = SIMPLE_NAME;
93          logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
94          logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
95          context = ZMQ.context(ioThreads);
96          logger.trace("{} created ZMQ context {}", simpleName, context);
97          if (enableShutdownHook) {
98              final Thread hook = new Log4jThread(simpleName + "-shutdown") {
99                  @Override
100                 public void run() {
101                     shutdown();
102                 }
103             };
104             logger.trace("{} adding shutdown hook {}", simpleName, hook);
105             Runtime.getRuntime().addShutdownHook(hook);
106         }
107     }
108 
109     private final long affinity;
110     private final long backlog;
111     private final boolean delayAttachOnConnect;
112     private final List<String> endpoints;
113     private final byte[] identity;
114     private final int ioThreads = 1;
115     private final boolean ipv4Only;
116     private final long linger;
117     private final long maxMsgSize;
118     private final long rcvHwm;
119     private final long receiveBufferSize;
120     private final int receiveTimeOut;
121     private final long reconnectIVL;
122     private final long reconnectIVLMax;
123     private final long sendBufferSize;
124     private int sendRcFalse;
125     private int sendRcTrue;
126     private final int sendTimeOut;
127     private final long sndHwm;
128     private final int tcpKeepAlive;
129     private final long tcpKeepAliveCount;
130     private final long tcpKeepAliveIdle;
131     private final long tcpKeepAliveInterval;
132     private final boolean xpubVerbose;
133 
134     private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
135             final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
136             final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
137             final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
138             final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
139             final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
140             final long tcpKeepAliveInterval, final boolean xpubVerbose) {
141         super(name, filter, layout, ignoreExceptions);
142         this.endpoints = endpoints;
143         this.affinity = affinity;
144         this.backlog = backlog;
145         this.delayAttachOnConnect = delayAttachOnConnect;
146         this.identity = identity;
147         this.ipv4Only = ipv4Only;
148         this.linger = linger;
149         this.maxMsgSize = maxMsgSize;
150         this.rcvHwm = rcvHwm;
151         this.receiveBufferSize = receiveBufferSize;
152         this.receiveTimeOut = receiveTimeOut;
153         this.reconnectIVL = reconnectIVL;
154         this.reconnectIVLMax = reconnectIVLMax;
155         this.sendBufferSize = sendBufferSize;
156         this.sendTimeOut = sendTimeOut;
157         this.sndHwm = sndHWM;
158         this.tcpKeepAlive = tcpKeepAlive;
159         this.tcpKeepAliveCount = tcpKeepAliveCount;
160         this.tcpKeepAliveIdle = tcpKeepAliveIdle;
161         this.tcpKeepAliveInterval = tcpKeepAliveInterval;
162         this.xpubVerbose = xpubVerbose;
163     }
164 
165     // The ZMQ.Socket class has other set methods that we do not cover because
166     // they throw unsupported operation exceptions.
167     @PluginFactory
168     public static JeroMqAppender createAppender(
169             // @formatter:off
170             @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
171             @PluginElement("Layout") Layout<?> layout,
172             @PluginElement("Filter") final Filter filter,
173             @PluginElement("Properties") final Property[] properties,
174             // Super attributes
175             @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
176             // ZMQ attributes; defaults picked from zmq.Options.
177             @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
178             @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
179             @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
180             @PluginAttribute(value = "identity") final byte[] identity,
181             @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
182             @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
183             @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
184             @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
185             @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
186             @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
187             @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
188             @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
189             @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
190             @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
191             @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
192             @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
193             @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
194             @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
195             @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
196             @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
197             // @formatter:on
198     ) {
199         if (layout == null) {
200             layout = PatternLayout.createDefaultLayout();
201         }
202         List<String> endpoints;
203         if (properties == null) {
204             endpoints = new ArrayList<>(0);
205         } else {
206             endpoints = new ArrayList<>(properties.length);
207             for (final Property property : properties) {
208                 if ("endpoint".equalsIgnoreCase(property.getName())) {
209                     final String value = property.getValue();
210                     if (Strings.isNotEmpty(value)) {
211                         endpoints.add(value);
212                     }
213                 }
214             }
215         }
216         logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
217                 name, filter, layout, ignoreExceptions, endpoints);
218         return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
219                 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
220                 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
221                 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
222     }
223 
224     static ZMQ.Context getContext() {
225         return context;
226     }
227 
228     private static ZMQ.Socket getPublisher() {
229         return publisher;
230     }
231 
232     private static ZMQ.Socket newPublisher() {
233         logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
234         final Socket socketPub = context.socket(ZMQ.PUB);
235         logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
236         return socketPub;
237     }
238 
239     static void shutdown() {
240         if (context != null) {
241             logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
242             context.term();
243             context = null;
244         }
245     }
246 
247     @Override
248     public synchronized void append(final LogEvent event) {
249         final String formattedMessage = event.getMessage().getFormattedMessage();
250         if (getPublisher().send(formattedMessage, 0)) {
251             sendRcTrue++;
252         } else {
253             sendRcFalse++;
254             logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
255         }
256     }
257 
258     // not public, handy for testing
259     int getSendRcFalse() {
260         return sendRcFalse;
261     }
262 
263     // not public, handy for testing
264     int getSendRcTrue() {
265         return sendRcTrue;
266     }
267 
268     // not public, handy for testing
269     void resetSendRcs() {
270         sendRcTrue = sendRcFalse = 0;
271     }
272 
273     @Override
274     public synchronized void start() {
275         super.start();
276         publisher = newPublisher();
277         final String name = getName();
278         final String prefix = "JeroMQ Appender";
279         logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
280         logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
281         //
282         final ZMQ.Socket socketPub = getPublisher();
283         logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass()
284                 .getName(), socketPub);
285         logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
286         socketPub.setAffinity(affinity);
287         logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
288         socketPub.setBacklog(backlog);
289         logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
290         socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
291         if (identity != null) {
292             logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
293             socketPub.setIdentity(identity);
294         }
295         logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
296         socketPub.setIPv4Only(ipv4Only);
297         logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
298         socketPub.setLinger(linger);
299         logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
300         socketPub.setMaxMsgSize(maxMsgSize);
301         logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
302         socketPub.setRcvHWM(rcvHwm);
303         logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
304         socketPub.setReceiveBufferSize(receiveBufferSize);
305         logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
306         socketPub.setReceiveTimeOut(receiveTimeOut);
307         logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
308         socketPub.setReconnectIVL(reconnectIVL);
309         logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
310         socketPub.setReconnectIVLMax(reconnectIVLMax);
311         logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
312         socketPub.setSendBufferSize(sendBufferSize);
313         logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
314         socketPub.setSendTimeOut(sendTimeOut);
315         logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
316         socketPub.setSndHWM(sndHwm);
317         logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
318         socketPub.setTCPKeepAlive(tcpKeepAlive);
319         logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
320         socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
321         logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
322         socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
323         logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
324         socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
325         logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
326         socketPub.setXpubVerbose(xpubVerbose);
327         //
328         if (logger.isDebugEnabled()) {
329             logger.debug(
330                     "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
331                             + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
332                             + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
333                     name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
334                     socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
335                     socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(),
336                     socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(),
337                     socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(),
338                     socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(),
339                     socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(),
340                     socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(),
341                     socketPub.getTCPKeepAliveSetting());
342         }
343         for (final String endpoint : endpoints) {
344             logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
345             socketPub.bind(endpoint);
346         }
347     }
348 
349     @Override
350     public synchronized void stop() {
351         super.stop();
352         final Socket socketPub = getPublisher();
353         if (socketPub != null) {
354             logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
355             socketPub.close();
356             publisher = null;
357         }
358     }
359 
360     @Override
361     public String toString() {
362         return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
363     }
364 
365 }