1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.jeromq;
19
20 import java.io.Serializable;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.List;
24
25 import org.apache.logging.log4j.Logger;
26 import org.apache.logging.log4j.core.Filter;
27 import org.apache.logging.log4j.core.Layout;
28 import org.apache.logging.log4j.core.LogEvent;
29 import org.apache.logging.log4j.core.appender.AbstractAppender;
30 import org.apache.logging.log4j.core.config.Property;
31 import org.apache.logging.log4j.core.config.plugins.Plugin;
32 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33 import org.apache.logging.log4j.core.config.plugins.PluginElement;
34 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36 import org.apache.logging.log4j.core.layout.PatternLayout;
37 import org.apache.logging.log4j.status.StatusLogger;
38 import org.apache.logging.log4j.util.PropertiesUtil;
39 import org.apache.logging.log4j.util.Strings;
40 import org.zeromq.ZMQ;
41 import org.zeromq.ZMQ.Socket;
42
43
44
45
46
47
48
49
50
51
52
53 @Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
54 public final class JeroMqAppender extends AbstractAppender {
55
56
57
58
59 static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
60
61
62
63
64 static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
65
66
67 private static volatile ZMQ.Context context;
68
69 private static final int DEFAULT_BACKLOG = 100;
70
71 private static final int DEFAULT_IVL = 100;
72
73 private static final int DEFAULT_RCV_HWM = 1000;
74
75 private static final int DEFAULT_SND_HWM = 1000;
76
77 private static Logger logger;
78
79
80 private static ZMQ.Socket publisher;
81
82 private static final long serialVersionUID = 1L;
83
84 private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
85
86 static {
87 logger = StatusLogger.getLogger();
88 final PropertiesUtil managerProps = PropertiesUtil.getProperties();
89 final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
90 final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
91 final String simpleName = SIMPLE_NAME;
92 logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
93 logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
94 context = ZMQ.context(ioThreads);
95 logger.trace("{} created ZMQ context {}", simpleName, context);
96 if (enableShutdownHook) {
97 final Thread hook = new Thread(simpleName + "-shutdown") {
98 @Override
99 public void run() {
100 shutdown();
101 }
102 };
103 logger.trace("{} adding shutdown hook {}", simpleName, hook);
104 Runtime.getRuntime().addShutdownHook(hook);
105 }
106 }
107
108 private final long affinity;
109 private final long backlog;
110 private final boolean delayAttachOnConnect;
111 private final List<String> endpoints;
112 private final byte[] identity;
113 private final int ioThreads = 1;
114 private final boolean ipv4Only;
115 private final long linger;
116 private final long maxMsgSize;
117 private final long rcvHwm;
118 private final long receiveBufferSize;
119 private final int receiveTimeOut;
120 private final long reconnectIVL;
121 private final long reconnectIVLMax;
122 private final long sendBufferSize;
123 private int sendRcFalse;
124 private int sendRcTrue;
125 private final int sendTimeOut;
126 private final long sndHwm;
127 private final int tcpKeepAlive;
128 private final long tcpKeepAliveCount;
129 private final long tcpKeepAliveIdle;
130 private final long tcpKeepAliveInterval;
131 private final boolean xpubVerbose;
132
133 private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
134 final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
135 final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
136 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
137 final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
138 final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
139 final long tcpKeepAliveInterval, final boolean xpubVerbose) {
140 super(name, filter, layout, ignoreExceptions);
141 this.endpoints = endpoints;
142 this.affinity = affinity;
143 this.backlog = backlog;
144 this.delayAttachOnConnect = delayAttachOnConnect;
145 this.identity = identity;
146 this.ipv4Only = ipv4Only;
147 this.linger = linger;
148 this.maxMsgSize = maxMsgSize;
149 this.rcvHwm = rcvHwm;
150 this.receiveBufferSize = receiveBufferSize;
151 this.receiveTimeOut = receiveTimeOut;
152 this.reconnectIVL = reconnectIVL;
153 this.reconnectIVLMax = reconnectIVLMax;
154 this.sendBufferSize = sendBufferSize;
155 this.sendTimeOut = sendTimeOut;
156 this.sndHwm = sndHWM;
157 this.tcpKeepAlive = tcpKeepAlive;
158 this.tcpKeepAliveCount = tcpKeepAliveCount;
159 this.tcpKeepAliveIdle = tcpKeepAliveIdle;
160 this.tcpKeepAliveInterval = tcpKeepAliveInterval;
161 this.xpubVerbose = xpubVerbose;
162 }
163
164
165
166 @PluginFactory
167 public static JeroMqAppender createAppender(
168
169 @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
170 @PluginElement("Layout") Layout<?> layout,
171 @PluginElement("Filter") final Filter filter,
172 @PluginElement("Properties") final Property[] properties,
173
174 @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
175
176 @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
177 @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
178 @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
179 @PluginAttribute(value = "identity") final byte[] identity,
180 @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
181 @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
182 @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
183 @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
184 @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
185 @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
186 @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
187 @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
188 @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
189 @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
190 @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
191 @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
192 @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
193 @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
194 @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
195 @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
196
197 ) {
198 if (layout == null) {
199 layout = PatternLayout.createDefaultLayout();
200 }
201 List<String> endpoints;
202 if (properties == null) {
203 endpoints = new ArrayList<>(0);
204 } else {
205 endpoints = new ArrayList<>(properties.length);
206 for (final Property property : properties) {
207 if ("endpoint".equalsIgnoreCase(property.getName())) {
208 final String value = property.getValue();
209 if (Strings.isNotEmpty(value)) {
210 endpoints.add(value);
211 }
212 }
213 }
214 }
215 logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
216 name, filter, layout, ignoreExceptions, endpoints);
217 return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
218 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
219 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
220 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
221 }
222
223 static ZMQ.Context getContext() {
224 return context;
225 }
226
227 private static ZMQ.Socket getPublisher() {
228 return publisher;
229 }
230
231 private static ZMQ.Socket newPublisher() {
232 logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
233 final Socket socketPub = context.socket(ZMQ.PUB);
234 logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
235 return socketPub;
236 }
237
238 static void shutdown() {
239 if (context != null) {
240 logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
241 context.term();
242 context = null;
243 }
244 }
245
246 @Override
247 public synchronized void append(final LogEvent event) {
248 final String formattedMessage = event.getMessage().getFormattedMessage();
249 if (getPublisher().send(formattedMessage, 0)) {
250 sendRcTrue++;
251 } else {
252 sendRcFalse++;
253 logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
254 }
255 }
256
257
258 int getSendRcFalse() {
259 return sendRcFalse;
260 }
261
262
263 int getSendRcTrue() {
264 return sendRcTrue;
265 }
266
267
268 void resetSendRcs() {
269 sendRcTrue = sendRcFalse = 0;
270 }
271
272 @Override
273 public synchronized void start() {
274 super.start();
275 publisher = newPublisher();
276 final String name = getName();
277 final String prefix = "JeroMQ Appender";
278 logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
279 logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
280
281 final ZMQ.Socket socketPub = getPublisher();
282 logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass()
283 .getName(), socketPub);
284 logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
285 socketPub.setAffinity(affinity);
286 logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
287 socketPub.setBacklog(backlog);
288 logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
289 socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
290 if (identity != null) {
291 logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
292 socketPub.setIdentity(identity);
293 }
294 logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
295 socketPub.setIPv4Only(ipv4Only);
296 logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
297 socketPub.setLinger(linger);
298 logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
299 socketPub.setMaxMsgSize(maxMsgSize);
300 logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
301 socketPub.setRcvHWM(rcvHwm);
302 logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
303 socketPub.setReceiveBufferSize(receiveBufferSize);
304 logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
305 socketPub.setReceiveTimeOut(receiveTimeOut);
306 logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
307 socketPub.setReconnectIVL(reconnectIVL);
308 logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
309 socketPub.setReconnectIVLMax(reconnectIVLMax);
310 logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
311 socketPub.setSendBufferSize(sendBufferSize);
312 logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
313 socketPub.setSendTimeOut(sendTimeOut);
314 logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
315 socketPub.setSndHWM(sndHwm);
316 logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
317 socketPub.setTCPKeepAlive(tcpKeepAlive);
318 logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
319 socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
320 logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
321 socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
322 logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
323 socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
324 logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
325 socketPub.setXpubVerbose(xpubVerbose);
326
327 if (logger.isDebugEnabled()) {
328 logger.debug(
329 "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
330 + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
331 + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
332 name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
333 socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
334 socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(),
335 socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(),
336 socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(),
337 socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(),
338 socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(),
339 socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(),
340 socketPub.getTCPKeepAliveSetting());
341 }
342 for (final String endpoint : endpoints) {
343 logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
344 socketPub.bind(endpoint);
345 }
346 }
347
348 @Override
349 public synchronized void stop() {
350 super.stop();
351 final Socket socketPub = getPublisher();
352 if (socketPub != null) {
353 logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
354 socketPub.close();
355 publisher = null;
356 }
357 }
358
359 @Override
360 public String toString() {
361 return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
362 }
363
364 }