View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.jeromq;
19  
20  import java.io.Serializable;
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.List;
24  
25  import org.apache.logging.log4j.Logger;
26  import org.apache.logging.log4j.core.Filter;
27  import org.apache.logging.log4j.core.Layout;
28  import org.apache.logging.log4j.core.LogEvent;
29  import org.apache.logging.log4j.core.appender.AbstractAppender;
30  import org.apache.logging.log4j.core.config.Property;
31  import org.apache.logging.log4j.core.config.plugins.Plugin;
32  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33  import org.apache.logging.log4j.core.config.plugins.PluginElement;
34  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36  import org.apache.logging.log4j.core.layout.PatternLayout;
37  import org.apache.logging.log4j.status.StatusLogger;
38  import org.apache.logging.log4j.util.PropertiesUtil;
39  import org.apache.logging.log4j.util.Strings;
40  import org.zeromq.ZMQ;
41  import org.zeromq.ZMQ.Socket;
42  
43  /**
44   * Sends log events to one or more ZeroMQ (JeroMQ) endpoints.
45   * <p>
46   * Requires the JeroMQ jar (LGPL as of 0.3.5)
47   * </p>
48   */
49  // TODO
50  // Some methods are synchronized because a ZMQ.Socket is not thread-safe
51  // Using a ThreadLocal for the publisher hangs tests on shutdown. There must be
52  // some issue on threads owning certain resources as opposed to others.
53  @Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
54  public final class JeroMqAppender extends AbstractAppender {
55  
56      // Per ZMQ docs, there should usually only be one ZMQ context per process.
57      private static volatile ZMQ.Context context;
58  
59      private static final int DEFAULT_BACKLOG = 100;
60  
61      private static final int DEFAULT_IVL = 100;
62  
63      private static final int DEFAULT_RCV_HWM = 1000;
64  
65      private static final int DEFAULT_SND_HWM = 1000;
66  
67      private static Logger logger;
68  
69      // ZMQ sockets are not thread safe.
70      private static ZMQ.Socket publisher;
71  
72      private static final long serialVersionUID = 1L;
73  
74      private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
75  
76      static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
77  
78      static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
79  
80      static {
81          logger = StatusLogger.getLogger();
82          final PropertiesUtil managerProps = PropertiesUtil.getProperties();
83          final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
84          final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
85          final String simpleName = SIMPLE_NAME;
86          logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
87          logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
88          context = ZMQ.context(ioThreads);
89          logger.trace("{} created ZMQ context {}", simpleName, context);
90          if (enableShutdownHook) {
91              final Thread hook = new Thread(simpleName + "-shutdown") {
92                  @Override
93                  public void run() {
94                      shutdown();
95                  }
96              };
97              logger.trace("{} adding shutdown hook {}", simpleName, hook);
98              Runtime.getRuntime().addShutdownHook(hook);
99          }
100     }
101 
102     // The ZMQ.Socket class has other set methods that we do not cover because
103     // they throw unsupported operation exceptions.
104     @PluginFactory
105     public static JeroMqAppender createAppender(
106             // @formatter:off
107             @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
108             @PluginElement("Layout") Layout<?> layout,
109             @PluginElement("Filters") final Filter filter,
110             @PluginElement("Properties") final Property[] properties,
111             // Super attributes
112             @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
113             // ZMQ attributes; defaults picked from zmq.Options.
114             @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
115             @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
116             @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
117             @PluginAttribute(value = "identity") final byte[] identity,
118             @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
119             @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
120             @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
121             @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
122             @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
123             @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
124             @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
125             @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
126             @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
127             @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
128             @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
129             @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
130             @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
131             @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
132             @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
133             @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
134             // @formatter:on
135     ) {
136         if (layout == null) {
137             layout = PatternLayout.createDefaultLayout();
138         }
139         List<String> endpoints;
140         if (properties == null) {
141             endpoints = new ArrayList<>(0);
142         } else {
143             endpoints = new ArrayList<>(properties.length);
144             for (final Property property : properties) {
145                 if ("endpoint".equalsIgnoreCase(property.getName())) {
146                     final String value = property.getValue();
147                     if (Strings.isNotEmpty(value)) {
148                         endpoints.add(value);
149                     }
150                 }
151             }
152         }
153         logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
154                 name, filter, layout, ignoreExceptions, endpoints);
155         return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
156                 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut,
157                 reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, tcpKeepAliveCount,
158                 tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
159     }
160 
161     static ZMQ.Context getContext() {
162         return context;
163     }
164 
165     private static ZMQ.Socket getPublisher() {
166         return publisher;
167     }
168 
169     private static ZMQ.Socket newPublisher() {
170         logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
171         final Socket socketPub = context.socket(ZMQ.PUB);
172         logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
173         return socketPub;
174     }
175 
176     static void shutdown() {
177         if (context != null) {
178             logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
179             context.term();
180             context = null;
181         }
182     }
183 
184     private final long affinity;
185     private final long backlog;
186     private final boolean delayAttachOnConnect;
187     private final List<String> endpoints;
188     private final byte[] identity;
189     private final int ioThreads = 1;
190     private final boolean ipv4Only;
191     private final long linger;
192     private final long maxMsgSize;
193     private final long rcvHwm;
194     private final long receiveBufferSize;
195     private final int receiveTimeOut;
196     private final long reconnectIVL;
197     private final long reconnectIVLMax;
198     private final long sendBufferSize;
199     private int sendRcFalse;
200     private int sendRcTrue;
201     private final int sendTimeOut;
202     private final long sndHwm;
203     private final int tcpKeepAlive;
204     private final long tcpKeepAliveCount;
205     private final long tcpKeepAliveIdle;
206     private final long tcpKeepAliveInterval;
207     private final boolean xpubVerbose;
208 
209     private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
210             final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
211             final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
212             final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
213             final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
214             final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
215             final long tcpKeepAliveInterval, final boolean xpubVerbose) {
216         super(name, filter, layout, ignoreExceptions);
217         this.endpoints = endpoints;
218         this.affinity = affinity;
219         this.backlog = backlog;
220         this.delayAttachOnConnect = delayAttachOnConnect;
221         this.identity = identity;
222         this.ipv4Only = ipv4Only;
223         this.linger = linger;
224         this.maxMsgSize = maxMsgSize;
225         this.rcvHwm = rcvHwm;
226         this.receiveBufferSize = receiveBufferSize;
227         this.receiveTimeOut = receiveTimeOut;
228         this.reconnectIVL = reconnectIVL;
229         this.reconnectIVLMax = reconnectIVLMax;
230         this.sendBufferSize = sendBufferSize;
231         this.sendTimeOut = sendTimeOut;
232         this.sndHwm = sndHWM;
233         this.tcpKeepAlive = tcpKeepAlive;
234         this.tcpKeepAliveCount = tcpKeepAliveCount;
235         this.tcpKeepAliveIdle = tcpKeepAliveIdle;
236         this.tcpKeepAliveInterval = tcpKeepAliveInterval;
237         this.xpubVerbose = xpubVerbose;
238     }
239 
240     @Override
241     public synchronized void append(final LogEvent event) {
242         final String formattedMessage = event.getMessage().getFormattedMessage();
243         if (getPublisher().send(formattedMessage, 0)) {
244             sendRcTrue++;
245         } else {
246             sendRcFalse++;
247             logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse,
248                     formattedMessage);
249         }
250     }
251 
252     // not public, handy for testing
253     int getSendRcFalse() {
254         return sendRcFalse;
255     }
256 
257     // not public, handy for testing
258     int getSendRcTrue() {
259         return sendRcTrue;
260     }
261 
262     // not public, handy for testing
263     void resetSendRcs() {
264         sendRcTrue = sendRcFalse = 0;
265     }
266 
267     @Override
268     public synchronized void start() {
269         super.start();
270         publisher = newPublisher();
271         final String name = getName();
272         final String prefix = "JeroMQ Appender";
273         logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
274         logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
275         //
276         final ZMQ.Socket socketPub = getPublisher();
277         logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name,
278                 socketPub.getClass().getName(), socketPub);
279         logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
280         socketPub.setAffinity(affinity);
281         logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
282         socketPub.setBacklog(backlog);
283         logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
284         socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
285         if (identity != null) {
286             logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
287             socketPub.setIdentity(identity);
288         }
289         logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
290         socketPub.setIPv4Only(ipv4Only);
291         logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
292         socketPub.setLinger(linger);
293         logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
294         socketPub.setMaxMsgSize(maxMsgSize);
295         logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
296         socketPub.setRcvHWM(rcvHwm);
297         logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
298         socketPub.setReceiveBufferSize(receiveBufferSize);
299         logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
300         socketPub.setReceiveTimeOut(receiveTimeOut);
301         logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
302         socketPub.setReconnectIVL(reconnectIVL);
303         logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
304         socketPub.setReconnectIVLMax(reconnectIVLMax);
305         logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
306         socketPub.setSendBufferSize(sendBufferSize);
307         logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
308         socketPub.setSendTimeOut(sendTimeOut);
309         logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
310         socketPub.setSndHWM(sndHwm);
311         logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
312         socketPub.setTCPKeepAlive(tcpKeepAlive);
313         logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
314         socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
315         logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
316         socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
317         logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
318         socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
319         logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
320         socketPub.setXpubVerbose(xpubVerbose);
321         //
322         if (logger.isDebugEnabled()) {
323             logger.debug(
324                     "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
325                             + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
326                             + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
327                     name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
328                     socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
329                     socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), socketPub.getRate(),
330                     socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), socketPub.getReceiveTimeOut(),
331                     socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), socketPub.getRecoveryInterval(),
332                     socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), socketPub.getSndHWM(),
333                     socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), socketPub.getTCPKeepAliveIdle(),
334                     socketPub.getTCPKeepAliveInterval(), socketPub.getTCPKeepAliveSetting());
335         }
336         for (final String endpoint : endpoints) {
337             logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
338             socketPub.bind(endpoint);
339         }
340     }
341 
342     @Override
343     public synchronized void stop() {
344         super.stop();
345         final Socket socketPub = getPublisher();
346         if (socketPub != null) {
347             logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
348             socketPub.close();
349             publisher = null;
350         }
351     }
352 
353     @Override
354     public String toString() {
355         return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
356     }
357 
358 }