1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.jeromq;
19
20 import java.io.Serializable;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.List;
24
25 import org.apache.logging.log4j.Logger;
26 import org.apache.logging.log4j.core.Filter;
27 import org.apache.logging.log4j.core.Layout;
28 import org.apache.logging.log4j.core.LogEvent;
29 import org.apache.logging.log4j.core.appender.AbstractAppender;
30 import org.apache.logging.log4j.core.config.Property;
31 import org.apache.logging.log4j.core.config.plugins.Plugin;
32 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33 import org.apache.logging.log4j.core.config.plugins.PluginElement;
34 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36 import org.apache.logging.log4j.core.layout.PatternLayout;
37 import org.apache.logging.log4j.core.util.Log4jThread;
38 import org.apache.logging.log4j.status.StatusLogger;
39 import org.apache.logging.log4j.util.PropertiesUtil;
40 import org.apache.logging.log4j.util.Strings;
41 import org.zeromq.ZMQ;
42 import org.zeromq.ZMQ.Socket;
43
44
45
46
47
48
49
50
51
52
53
54 @Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
55 public final class JeroMqAppender extends AbstractAppender {
56
57
58
59
60 static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
61
62
63
64
65 static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
66
67
68 private static volatile ZMQ.Context context;
69
70 private static final int DEFAULT_BACKLOG = 100;
71
72 private static final int DEFAULT_IVL = 100;
73
74 private static final int DEFAULT_RCV_HWM = 1000;
75
76 private static final int DEFAULT_SND_HWM = 1000;
77
78 private static Logger logger;
79
80
81 private static ZMQ.Socket publisher;
82
83 private static final long serialVersionUID = 1L;
84
85 private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
86
87 static {
88 logger = StatusLogger.getLogger();
89 final PropertiesUtil managerProps = PropertiesUtil.getProperties();
90 final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
91 final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
92 final String simpleName = SIMPLE_NAME;
93 logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
94 logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
95 context = ZMQ.context(ioThreads);
96 logger.trace("{} created ZMQ context {}", simpleName, context);
97 if (enableShutdownHook) {
98 final Thread hook = new Log4jThread(simpleName + "-shutdown") {
99 @Override
100 public void run() {
101 shutdown();
102 }
103 };
104 logger.trace("{} adding shutdown hook {}", simpleName, hook);
105 Runtime.getRuntime().addShutdownHook(hook);
106 }
107 }
108
109 private final long affinity;
110 private final long backlog;
111 private final boolean delayAttachOnConnect;
112 private final List<String> endpoints;
113 private final byte[] identity;
114 private final int ioThreads = 1;
115 private final boolean ipv4Only;
116 private final long linger;
117 private final long maxMsgSize;
118 private final long rcvHwm;
119 private final long receiveBufferSize;
120 private final int receiveTimeOut;
121 private final long reconnectIVL;
122 private final long reconnectIVLMax;
123 private final long sendBufferSize;
124 private int sendRcFalse;
125 private int sendRcTrue;
126 private final int sendTimeOut;
127 private final long sndHwm;
128 private final int tcpKeepAlive;
129 private final long tcpKeepAliveCount;
130 private final long tcpKeepAliveIdle;
131 private final long tcpKeepAliveInterval;
132 private final boolean xpubVerbose;
133
134 private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
135 final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
136 final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
137 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
138 final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
139 final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
140 final long tcpKeepAliveInterval, final boolean xpubVerbose) {
141 super(name, filter, layout, ignoreExceptions);
142 this.endpoints = endpoints;
143 this.affinity = affinity;
144 this.backlog = backlog;
145 this.delayAttachOnConnect = delayAttachOnConnect;
146 this.identity = identity;
147 this.ipv4Only = ipv4Only;
148 this.linger = linger;
149 this.maxMsgSize = maxMsgSize;
150 this.rcvHwm = rcvHwm;
151 this.receiveBufferSize = receiveBufferSize;
152 this.receiveTimeOut = receiveTimeOut;
153 this.reconnectIVL = reconnectIVL;
154 this.reconnectIVLMax = reconnectIVLMax;
155 this.sendBufferSize = sendBufferSize;
156 this.sendTimeOut = sendTimeOut;
157 this.sndHwm = sndHWM;
158 this.tcpKeepAlive = tcpKeepAlive;
159 this.tcpKeepAliveCount = tcpKeepAliveCount;
160 this.tcpKeepAliveIdle = tcpKeepAliveIdle;
161 this.tcpKeepAliveInterval = tcpKeepAliveInterval;
162 this.xpubVerbose = xpubVerbose;
163 }
164
165
166
167 @PluginFactory
168 public static JeroMqAppender createAppender(
169
170 @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
171 @PluginElement("Layout") Layout<?> layout,
172 @PluginElement("Filter") final Filter filter,
173 @PluginElement("Properties") final Property[] properties,
174
175 @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
176
177 @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
178 @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
179 @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
180 @PluginAttribute(value = "identity") final byte[] identity,
181 @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
182 @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
183 @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
184 @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
185 @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
186 @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
187 @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
188 @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
189 @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
190 @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
191 @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
192 @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
193 @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
194 @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
195 @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
196 @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
197
198 ) {
199 if (layout == null) {
200 layout = PatternLayout.createDefaultLayout();
201 }
202 List<String> endpoints;
203 if (properties == null) {
204 endpoints = new ArrayList<>(0);
205 } else {
206 endpoints = new ArrayList<>(properties.length);
207 for (final Property property : properties) {
208 if ("endpoint".equalsIgnoreCase(property.getName())) {
209 final String value = property.getValue();
210 if (Strings.isNotEmpty(value)) {
211 endpoints.add(value);
212 }
213 }
214 }
215 }
216 logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
217 name, filter, layout, ignoreExceptions, endpoints);
218 return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
219 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize,
220 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive,
221 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
222 }
223
224 static ZMQ.Context getContext() {
225 return context;
226 }
227
228 private static ZMQ.Socket getPublisher() {
229 return publisher;
230 }
231
232 private static ZMQ.Socket newPublisher() {
233 logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
234 final Socket socketPub = context.socket(ZMQ.PUB);
235 logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
236 return socketPub;
237 }
238
239 static void shutdown() {
240 if (context != null) {
241 logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
242 context.term();
243 context = null;
244 }
245 }
246
247 @Override
248 public synchronized void append(final LogEvent event) {
249 final String formattedMessage = event.getMessage().getFormattedMessage();
250 if (getPublisher().send(formattedMessage, 0)) {
251 sendRcTrue++;
252 } else {
253 sendRcFalse++;
254 logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage);
255 }
256 }
257
258
259 int getSendRcFalse() {
260 return sendRcFalse;
261 }
262
263
264 int getSendRcTrue() {
265 return sendRcTrue;
266 }
267
268
269 void resetSendRcs() {
270 sendRcTrue = sendRcFalse = 0;
271 }
272
273 @Override
274 public synchronized void start() {
275 super.start();
276 publisher = newPublisher();
277 final String name = getName();
278 final String prefix = "JeroMQ Appender";
279 logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
280 logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
281
282 final ZMQ.Socket socketPub = getPublisher();
283 logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass()
284 .getName(), socketPub);
285 logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
286 socketPub.setAffinity(affinity);
287 logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
288 socketPub.setBacklog(backlog);
289 logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
290 socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
291 if (identity != null) {
292 logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
293 socketPub.setIdentity(identity);
294 }
295 logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
296 socketPub.setIPv4Only(ipv4Only);
297 logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
298 socketPub.setLinger(linger);
299 logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
300 socketPub.setMaxMsgSize(maxMsgSize);
301 logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
302 socketPub.setRcvHWM(rcvHwm);
303 logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
304 socketPub.setReceiveBufferSize(receiveBufferSize);
305 logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
306 socketPub.setReceiveTimeOut(receiveTimeOut);
307 logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
308 socketPub.setReconnectIVL(reconnectIVL);
309 logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
310 socketPub.setReconnectIVLMax(reconnectIVLMax);
311 logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
312 socketPub.setSendBufferSize(sendBufferSize);
313 logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
314 socketPub.setSendTimeOut(sendTimeOut);
315 logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
316 socketPub.setSndHWM(sndHwm);
317 logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
318 socketPub.setTCPKeepAlive(tcpKeepAlive);
319 logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
320 socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
321 logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
322 socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
323 logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
324 socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
325 logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
326 socketPub.setXpubVerbose(xpubVerbose);
327
328 if (logger.isDebugEnabled()) {
329 logger.debug(
330 "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
331 + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
332 + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
333 name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
334 socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
335 socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(),
336 socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(),
337 socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(),
338 socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(),
339 socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(),
340 socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(),
341 socketPub.getTCPKeepAliveSetting());
342 }
343 for (final String endpoint : endpoints) {
344 logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
345 socketPub.bind(endpoint);
346 }
347 }
348
349 @Override
350 public synchronized void stop() {
351 super.stop();
352 final Socket socketPub = getPublisher();
353 if (socketPub != null) {
354 logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
355 socketPub.close();
356 publisher = null;
357 }
358 }
359
360 @Override
361 public String toString() {
362 return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
363 }
364
365 }