001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.jeromq; 019 020import java.io.Serializable; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024 025import org.apache.logging.log4j.Logger; 026import org.apache.logging.log4j.core.Filter; 027import org.apache.logging.log4j.core.Layout; 028import org.apache.logging.log4j.core.LogEvent; 029import org.apache.logging.log4j.core.appender.AbstractAppender; 030import org.apache.logging.log4j.core.config.Property; 031import org.apache.logging.log4j.core.config.plugins.Plugin; 032import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 033import org.apache.logging.log4j.core.config.plugins.PluginElement; 034import org.apache.logging.log4j.core.config.plugins.PluginFactory; 035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; 036import org.apache.logging.log4j.core.layout.PatternLayout; 037import org.apache.logging.log4j.core.util.Log4jThread; 038import org.apache.logging.log4j.status.StatusLogger; 039import org.apache.logging.log4j.util.PropertiesUtil; 040import org.apache.logging.log4j.util.Strings; 041import org.zeromq.ZMQ; 042import org.zeromq.ZMQ.Socket; 043 044/** 045 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. 046 * <p> 047 * Requires the JeroMQ jar (LGPL as of 0.3.5) 048 * </p> 049 */ 050// TODO 051// Some methods are synchronized because a ZMQ.Socket is not thread-safe 052// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be 053// some issue on threads owning certain resources as opposed to others. 054@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true) 055public final class JeroMqAppender extends AbstractAppender { 056 057 /** 058 * System property to enable shutdown hook. 059 */ 060 static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; 061 062 /** 063 * System property to control JeroMQ I/O thread count. 064 */ 065 static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; 066 067 // Per ZMQ docs, there should usually only be one ZMQ context per process. 068 private static volatile ZMQ.Context context; 069 070 private static final int DEFAULT_BACKLOG = 100; 071 072 private static final int DEFAULT_IVL = 100; 073 074 private static final int DEFAULT_RCV_HWM = 1000; 075 076 private static final int DEFAULT_SND_HWM = 1000; 077 078 private static Logger logger; 079 080 // ZMQ sockets are not thread safe. 081 private static ZMQ.Socket publisher; 082 083 private static final long serialVersionUID = 1L; 084 085 private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName(); 086 087 static { 088 logger = StatusLogger.getLogger(); 089 final PropertiesUtil managerProps = PropertiesUtil.getProperties(); 090 final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); 091 final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); 092 final String simpleName = SIMPLE_NAME; 093 logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString()); 094 logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads); 095 context = ZMQ.context(ioThreads); 096 logger.trace("{} created ZMQ context {}", simpleName, context); 097 if (enableShutdownHook) { 098 final Thread hook = new Log4jThread(simpleName + "-shutdown") { 099 @Override 100 public void run() { 101 shutdown(); 102 } 103 }; 104 logger.trace("{} adding shutdown hook {}", simpleName, hook); 105 Runtime.getRuntime().addShutdownHook(hook); 106 } 107 } 108 109 private final long affinity; 110 private final long backlog; 111 private final boolean delayAttachOnConnect; 112 private final List<String> endpoints; 113 private final byte[] identity; 114 private final int ioThreads = 1; 115 private final boolean ipv4Only; 116 private final long linger; 117 private final long maxMsgSize; 118 private final long rcvHwm; 119 private final long receiveBufferSize; 120 private final int receiveTimeOut; 121 private final long reconnectIVL; 122 private final long reconnectIVLMax; 123 private final long sendBufferSize; 124 private int sendRcFalse; 125 private int sendRcTrue; 126 private final int sendTimeOut; 127 private final long sndHwm; 128 private final int tcpKeepAlive; 129 private final long tcpKeepAliveCount; 130 private final long tcpKeepAliveIdle; 131 private final long tcpKeepAliveInterval; 132 private final boolean xpubVerbose; 133 134 private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, 135 final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog, 136 final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger, 137 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut, 138 final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut, 139 final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, 140 final long tcpKeepAliveInterval, final boolean xpubVerbose) { 141 super(name, filter, layout, ignoreExceptions); 142 this.endpoints = endpoints; 143 this.affinity = affinity; 144 this.backlog = backlog; 145 this.delayAttachOnConnect = delayAttachOnConnect; 146 this.identity = identity; 147 this.ipv4Only = ipv4Only; 148 this.linger = linger; 149 this.maxMsgSize = maxMsgSize; 150 this.rcvHwm = rcvHwm; 151 this.receiveBufferSize = receiveBufferSize; 152 this.receiveTimeOut = receiveTimeOut; 153 this.reconnectIVL = reconnectIVL; 154 this.reconnectIVLMax = reconnectIVLMax; 155 this.sendBufferSize = sendBufferSize; 156 this.sendTimeOut = sendTimeOut; 157 this.sndHwm = sndHWM; 158 this.tcpKeepAlive = tcpKeepAlive; 159 this.tcpKeepAliveCount = tcpKeepAliveCount; 160 this.tcpKeepAliveIdle = tcpKeepAliveIdle; 161 this.tcpKeepAliveInterval = tcpKeepAliveInterval; 162 this.xpubVerbose = xpubVerbose; 163 } 164 165 // The ZMQ.Socket class has other set methods that we do not cover because 166 // they throw unsupported operation exceptions. 167 @PluginFactory 168 public static JeroMqAppender createAppender( 169 // @formatter:off 170 @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name, 171 @PluginElement("Layout") Layout<?> layout, 172 @PluginElement("Filter") final Filter filter, 173 @PluginElement("Properties") final Property[] properties, 174 // Super attributes 175 @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions, 176 // ZMQ attributes; defaults picked from zmq.Options. 177 @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity, 178 @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog, 179 @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect, 180 @PluginAttribute(value = "identity") final byte[] identity, 181 @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only, 182 @PluginAttribute(value = "linger", defaultLong = -1) final long linger, 183 @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize, 184 @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm, 185 @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize, 186 @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut, 187 @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL, 188 @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax, 189 @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize, 190 @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut, 191 @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm, 192 @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive, 193 @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount, 194 @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle, 195 @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval, 196 @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose 197 // @formatter:on 198 ) { 199 if (layout == null) { 200 layout = PatternLayout.createDefaultLayout(); 201 } 202 List<String> endpoints; 203 if (properties == null) { 204 endpoints = new ArrayList<>(0); 205 } else { 206 endpoints = new ArrayList<>(properties.length); 207 for (final Property property : properties) { 208 if ("endpoint".equalsIgnoreCase(property.getName())) { 209 final String value = property.getValue(); 210 if (Strings.isNotEmpty(value)) { 211 endpoints.add(value); 212 } 213 } 214 } 215 } 216 logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", 217 name, filter, layout, ignoreExceptions, endpoints); 218 return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, 219 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, 220 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, 221 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose); 222 } 223 224 static ZMQ.Context getContext() { 225 return context; 226 } 227 228 private static ZMQ.Socket getPublisher() { 229 return publisher; 230 } 231 232 private static ZMQ.Socket newPublisher() { 233 logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context); 234 final Socket socketPub = context.socket(ZMQ.PUB); 235 logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub); 236 return socketPub; 237 } 238 239 static void shutdown() { 240 if (context != null) { 241 logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context); 242 context.term(); 243 context = null; 244 } 245 } 246 247 @Override 248 public synchronized void append(final LogEvent event) { 249 final String formattedMessage = event.getMessage().getFormattedMessage(); 250 if (getPublisher().send(formattedMessage, 0)) { 251 sendRcTrue++; 252 } else { 253 sendRcFalse++; 254 logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage); 255 } 256 } 257 258 // not public, handy for testing 259 int getSendRcFalse() { 260 return sendRcFalse; 261 } 262 263 // not public, handy for testing 264 int getSendRcTrue() { 265 return sendRcTrue; 266 } 267 268 // not public, handy for testing 269 void resetSendRcs() { 270 sendRcTrue = sendRcFalse = 0; 271 } 272 273 @Override 274 public synchronized void start() { 275 super.start(); 276 publisher = newPublisher(); 277 final String name = getName(); 278 final String prefix = "JeroMQ Appender"; 279 logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString()); 280 logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads); 281 // 282 final ZMQ.Socket socketPub = getPublisher(); 283 logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass() 284 .getName(), socketPub); 285 logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity); 286 socketPub.setAffinity(affinity); 287 logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog); 288 socketPub.setBacklog(backlog); 289 logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect); 290 socketPub.setDelayAttachOnConnect(delayAttachOnConnect); 291 if (identity != null) { 292 logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity)); 293 socketPub.setIdentity(identity); 294 } 295 logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only); 296 socketPub.setIPv4Only(ipv4Only); 297 logger.trace("{} {} publisher setLinger({})", prefix, name, linger); 298 socketPub.setLinger(linger); 299 logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize); 300 socketPub.setMaxMsgSize(maxMsgSize); 301 logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm); 302 socketPub.setRcvHWM(rcvHwm); 303 logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize); 304 socketPub.setReceiveBufferSize(receiveBufferSize); 305 logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut); 306 socketPub.setReceiveTimeOut(receiveTimeOut); 307 logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL); 308 socketPub.setReconnectIVL(reconnectIVL); 309 logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax); 310 socketPub.setReconnectIVLMax(reconnectIVLMax); 311 logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize); 312 socketPub.setSendBufferSize(sendBufferSize); 313 logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut); 314 socketPub.setSendTimeOut(sendTimeOut); 315 logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm); 316 socketPub.setSndHWM(sndHwm); 317 logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive); 318 socketPub.setTCPKeepAlive(tcpKeepAlive); 319 logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount); 320 socketPub.setTCPKeepAliveCount(tcpKeepAliveCount); 321 logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle); 322 socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle); 323 logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval); 324 socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval); 325 logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose); 326 socketPub.setXpubVerbose(xpubVerbose); 327 // 328 if (logger.isDebugEnabled()) { 329 logger.debug( 330 "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, " 331 + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, " 332 + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}", 333 name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(), 334 socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(), 335 socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), 336 socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), 337 socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), 338 socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), 339 socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), 340 socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(), 341 socketPub.getTCPKeepAliveSetting()); 342 } 343 for (final String endpoint : endpoints) { 344 logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint); 345 socketPub.bind(endpoint); 346 } 347 } 348 349 @Override 350 public synchronized void stop() { 351 super.stop(); 352 final Socket socketPub = getPublisher(); 353 if (socketPub != null) { 354 logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub); 355 socketPub.close(); 356 publisher = null; 357 } 358 } 359 360 @Override 361 public String toString() { 362 return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]"; 363 } 364 365}