1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.jeromq;
19
20 import java.io.Serializable;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.List;
24
25 import org.apache.logging.log4j.Logger;
26 import org.apache.logging.log4j.core.Filter;
27 import org.apache.logging.log4j.core.Layout;
28 import org.apache.logging.log4j.core.LogEvent;
29 import org.apache.logging.log4j.core.appender.AbstractAppender;
30 import org.apache.logging.log4j.core.config.Property;
31 import org.apache.logging.log4j.core.config.plugins.Plugin;
32 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33 import org.apache.logging.log4j.core.config.plugins.PluginElement;
34 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36 import org.apache.logging.log4j.core.layout.PatternLayout;
37 import org.apache.logging.log4j.status.StatusLogger;
38 import org.apache.logging.log4j.util.PropertiesUtil;
39 import org.apache.logging.log4j.util.Strings;
40 import org.zeromq.ZMQ;
41 import org.zeromq.ZMQ.Socket;
42
43
44
45
46
47
48
49
50
51
52
53 @Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true)
54 public final class JeroMqAppender extends AbstractAppender {
55
56
57 private static volatile ZMQ.Context context;
58
59 private static final int DEFAULT_BACKLOG = 100;
60
61 private static final int DEFAULT_IVL = 100;
62
63 private static final int DEFAULT_RCV_HWM = 1000;
64
65 private static final int DEFAULT_SND_HWM = 1000;
66
67 private static Logger logger;
68
69
70 private static ZMQ.Socket publisher;
71
72 private static final long serialVersionUID = 1L;
73
74 private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName();
75
76 static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook";
77
78 static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads";
79
80 static {
81 logger = StatusLogger.getLogger();
82 final PropertiesUtil managerProps = PropertiesUtil.getProperties();
83 final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1);
84 final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true);
85 final String simpleName = SIMPLE_NAME;
86 logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString());
87 logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads);
88 context = ZMQ.context(ioThreads);
89 logger.trace("{} created ZMQ context {}", simpleName, context);
90 if (enableShutdownHook) {
91 final Thread hook = new Thread(simpleName + "-shutdown") {
92 @Override
93 public void run() {
94 shutdown();
95 }
96 };
97 logger.trace("{} adding shutdown hook {}", simpleName, hook);
98 Runtime.getRuntime().addShutdownHook(hook);
99 }
100 }
101
102
103
104 @PluginFactory
105 public static JeroMqAppender createAppender(
106
107 @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name,
108 @PluginElement("Layout") Layout<?> layout,
109 @PluginElement("Filters") final Filter filter,
110 @PluginElement("Properties") final Property[] properties,
111
112 @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions,
113
114 @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity,
115 @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog,
116 @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect,
117 @PluginAttribute(value = "identity") final byte[] identity,
118 @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only,
119 @PluginAttribute(value = "linger", defaultLong = -1) final long linger,
120 @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize,
121 @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm,
122 @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize,
123 @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut,
124 @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL,
125 @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax,
126 @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize,
127 @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut,
128 @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm,
129 @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive,
130 @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount,
131 @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle,
132 @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval,
133 @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose
134
135 ) {
136 if (layout == null) {
137 layout = PatternLayout.createDefaultLayout();
138 }
139 List<String> endpoints;
140 if (properties == null) {
141 endpoints = new ArrayList<>(0);
142 } else {
143 endpoints = new ArrayList<>(properties.length);
144 for (final Property property : properties) {
145 if ("endpoint".equalsIgnoreCase(property.getName())) {
146 final String value = property.getValue();
147 if (Strings.isNotEmpty(value)) {
148 endpoints.add(value);
149 }
150 }
151 }
152 }
153 logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}",
154 name, filter, layout, ignoreExceptions, endpoints);
155 return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog,
156 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut,
157 reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, tcpKeepAliveCount,
158 tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose);
159 }
160
161 static ZMQ.Context getContext() {
162 return context;
163 }
164
165 private static ZMQ.Socket getPublisher() {
166 return publisher;
167 }
168
169 private static ZMQ.Socket newPublisher() {
170 logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context);
171 final Socket socketPub = context.socket(ZMQ.PUB);
172 logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub);
173 return socketPub;
174 }
175
176 static void shutdown() {
177 if (context != null) {
178 logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context);
179 context.term();
180 context = null;
181 }
182 }
183
184 private final long affinity;
185 private final long backlog;
186 private final boolean delayAttachOnConnect;
187 private final List<String> endpoints;
188 private final byte[] identity;
189 private final int ioThreads = 1;
190 private final boolean ipv4Only;
191 private final long linger;
192 private final long maxMsgSize;
193 private final long rcvHwm;
194 private final long receiveBufferSize;
195 private final int receiveTimeOut;
196 private final long reconnectIVL;
197 private final long reconnectIVLMax;
198 private final long sendBufferSize;
199 private int sendRcFalse;
200 private int sendRcTrue;
201 private final int sendTimeOut;
202 private final long sndHwm;
203 private final int tcpKeepAlive;
204 private final long tcpKeepAliveCount;
205 private final long tcpKeepAliveIdle;
206 private final long tcpKeepAliveInterval;
207 private final boolean xpubVerbose;
208
209 private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
210 final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog,
211 final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger,
212 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut,
213 final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut,
214 final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle,
215 final long tcpKeepAliveInterval, final boolean xpubVerbose) {
216 super(name, filter, layout, ignoreExceptions);
217 this.endpoints = endpoints;
218 this.affinity = affinity;
219 this.backlog = backlog;
220 this.delayAttachOnConnect = delayAttachOnConnect;
221 this.identity = identity;
222 this.ipv4Only = ipv4Only;
223 this.linger = linger;
224 this.maxMsgSize = maxMsgSize;
225 this.rcvHwm = rcvHwm;
226 this.receiveBufferSize = receiveBufferSize;
227 this.receiveTimeOut = receiveTimeOut;
228 this.reconnectIVL = reconnectIVL;
229 this.reconnectIVLMax = reconnectIVLMax;
230 this.sendBufferSize = sendBufferSize;
231 this.sendTimeOut = sendTimeOut;
232 this.sndHwm = sndHWM;
233 this.tcpKeepAlive = tcpKeepAlive;
234 this.tcpKeepAliveCount = tcpKeepAliveCount;
235 this.tcpKeepAliveIdle = tcpKeepAliveIdle;
236 this.tcpKeepAliveInterval = tcpKeepAliveInterval;
237 this.xpubVerbose = xpubVerbose;
238 }
239
240 @Override
241 public synchronized void append(final LogEvent event) {
242 final String formattedMessage = event.getMessage().getFormattedMessage();
243 if (getPublisher().send(formattedMessage, 0)) {
244 sendRcTrue++;
245 } else {
246 sendRcFalse++;
247 logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse,
248 formattedMessage);
249 }
250 }
251
252
253 int getSendRcFalse() {
254 return sendRcFalse;
255 }
256
257
258 int getSendRcTrue() {
259 return sendRcTrue;
260 }
261
262
263 void resetSendRcs() {
264 sendRcTrue = sendRcFalse = 0;
265 }
266
267 @Override
268 public synchronized void start() {
269 super.start();
270 publisher = newPublisher();
271 final String name = getName();
272 final String prefix = "JeroMQ Appender";
273 logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString());
274 logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads);
275
276 final ZMQ.Socket socketPub = getPublisher();
277 logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name,
278 socketPub.getClass().getName(), socketPub);
279 logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity);
280 socketPub.setAffinity(affinity);
281 logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog);
282 socketPub.setBacklog(backlog);
283 logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect);
284 socketPub.setDelayAttachOnConnect(delayAttachOnConnect);
285 if (identity != null) {
286 logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity));
287 socketPub.setIdentity(identity);
288 }
289 logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only);
290 socketPub.setIPv4Only(ipv4Only);
291 logger.trace("{} {} publisher setLinger({})", prefix, name, linger);
292 socketPub.setLinger(linger);
293 logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize);
294 socketPub.setMaxMsgSize(maxMsgSize);
295 logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm);
296 socketPub.setRcvHWM(rcvHwm);
297 logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize);
298 socketPub.setReceiveBufferSize(receiveBufferSize);
299 logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut);
300 socketPub.setReceiveTimeOut(receiveTimeOut);
301 logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL);
302 socketPub.setReconnectIVL(reconnectIVL);
303 logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax);
304 socketPub.setReconnectIVLMax(reconnectIVLMax);
305 logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize);
306 socketPub.setSendBufferSize(sendBufferSize);
307 logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut);
308 socketPub.setSendTimeOut(sendTimeOut);
309 logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm);
310 socketPub.setSndHWM(sndHwm);
311 logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive);
312 socketPub.setTCPKeepAlive(tcpKeepAlive);
313 logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount);
314 socketPub.setTCPKeepAliveCount(tcpKeepAliveCount);
315 logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle);
316 socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle);
317 logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval);
318 socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval);
319 logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose);
320 socketPub.setXpubVerbose(xpubVerbose);
321
322 if (logger.isDebugEnabled()) {
323 logger.debug(
324 "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, "
325 + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, "
326 + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}",
327 name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(),
328 socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(),
329 socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), socketPub.getRate(),
330 socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), socketPub.getReceiveTimeOut(),
331 socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), socketPub.getRecoveryInterval(),
332 socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), socketPub.getSndHWM(),
333 socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), socketPub.getTCPKeepAliveIdle(),
334 socketPub.getTCPKeepAliveInterval(), socketPub.getTCPKeepAliveSetting());
335 }
336 for (final String endpoint : endpoints) {
337 logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint);
338 socketPub.bind(endpoint);
339 }
340 }
341
342 @Override
343 public synchronized void stop() {
344 super.stop();
345 final Socket socketPub = getPublisher();
346 if (socketPub != null) {
347 logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub);
348 socketPub.close();
349 publisher = null;
350 }
351 }
352
353 @Override
354 public String toString() {
355 return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]";
356 }
357
358 }