001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.jeromq; 019 020import java.io.Serializable; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024 025import org.apache.logging.log4j.Logger; 026import org.apache.logging.log4j.core.Filter; 027import org.apache.logging.log4j.core.Layout; 028import org.apache.logging.log4j.core.LogEvent; 029import org.apache.logging.log4j.core.appender.AbstractAppender; 030import org.apache.logging.log4j.core.config.Property; 031import org.apache.logging.log4j.core.config.plugins.Plugin; 032import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 033import org.apache.logging.log4j.core.config.plugins.PluginElement; 034import org.apache.logging.log4j.core.config.plugins.PluginFactory; 035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; 036import org.apache.logging.log4j.core.layout.PatternLayout; 037import org.apache.logging.log4j.status.StatusLogger; 038import org.apache.logging.log4j.util.PropertiesUtil; 039import org.apache.logging.log4j.util.Strings; 040import org.zeromq.ZMQ; 041import org.zeromq.ZMQ.Socket; 042 043/** 044 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. 045 * <p> 046 * Requires the JeroMQ jar (LGPL as of 0.3.5) 047 * </p> 048 */ 049// TODO 050// Some methods are synchronized because a ZMQ.Socket is not thread-safe 051// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be 052// some issue on threads owning certain resources as opposed to others. 053@Plugin(name = "JeroMQ", category = "Core", elementType = "appender", printObject = true) 054public final class JeroMqAppender extends AbstractAppender { 055 056 /** 057 * System property to enable shutdown hook. 058 */ 059 static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; 060 061 /** 062 * System property to control JeroMQ I/O thread count. 063 */ 064 static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; 065 066 // Per ZMQ docs, there should usually only be one ZMQ context per process. 067 private static volatile ZMQ.Context context; 068 069 private static final int DEFAULT_BACKLOG = 100; 070 071 private static final int DEFAULT_IVL = 100; 072 073 private static final int DEFAULT_RCV_HWM = 1000; 074 075 private static final int DEFAULT_SND_HWM = 1000; 076 077 private static Logger logger; 078 079 // ZMQ sockets are not thread safe. 080 private static ZMQ.Socket publisher; 081 082 private static final long serialVersionUID = 1L; 083 084 private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName(); 085 086 static { 087 logger = StatusLogger.getLogger(); 088 final PropertiesUtil managerProps = PropertiesUtil.getProperties(); 089 final Integer ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); 090 final Boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); 091 final String simpleName = SIMPLE_NAME; 092 logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString()); 093 logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads); 094 context = ZMQ.context(ioThreads); 095 logger.trace("{} created ZMQ context {}", simpleName, context); 096 if (enableShutdownHook) { 097 final Thread hook = new Thread(simpleName + "-shutdown") { 098 @Override 099 public void run() { 100 shutdown(); 101 } 102 }; 103 logger.trace("{} adding shutdown hook {}", simpleName, hook); 104 Runtime.getRuntime().addShutdownHook(hook); 105 } 106 } 107 108 private final long affinity; 109 private final long backlog; 110 private final boolean delayAttachOnConnect; 111 private final List<String> endpoints; 112 private final byte[] identity; 113 private final int ioThreads = 1; 114 private final boolean ipv4Only; 115 private final long linger; 116 private final long maxMsgSize; 117 private final long rcvHwm; 118 private final long receiveBufferSize; 119 private final int receiveTimeOut; 120 private final long reconnectIVL; 121 private final long reconnectIVLMax; 122 private final long sendBufferSize; 123 private int sendRcFalse; 124 private int sendRcTrue; 125 private final int sendTimeOut; 126 private final long sndHwm; 127 private final int tcpKeepAlive; 128 private final long tcpKeepAliveCount; 129 private final long tcpKeepAliveIdle; 130 private final long tcpKeepAliveInterval; 131 private final boolean xpubVerbose; 132 133 private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, 134 final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog, 135 final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger, 136 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut, 137 final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut, 138 final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, 139 final long tcpKeepAliveInterval, final boolean xpubVerbose) { 140 super(name, filter, layout, ignoreExceptions); 141 this.endpoints = endpoints; 142 this.affinity = affinity; 143 this.backlog = backlog; 144 this.delayAttachOnConnect = delayAttachOnConnect; 145 this.identity = identity; 146 this.ipv4Only = ipv4Only; 147 this.linger = linger; 148 this.maxMsgSize = maxMsgSize; 149 this.rcvHwm = rcvHwm; 150 this.receiveBufferSize = receiveBufferSize; 151 this.receiveTimeOut = receiveTimeOut; 152 this.reconnectIVL = reconnectIVL; 153 this.reconnectIVLMax = reconnectIVLMax; 154 this.sendBufferSize = sendBufferSize; 155 this.sendTimeOut = sendTimeOut; 156 this.sndHwm = sndHWM; 157 this.tcpKeepAlive = tcpKeepAlive; 158 this.tcpKeepAliveCount = tcpKeepAliveCount; 159 this.tcpKeepAliveIdle = tcpKeepAliveIdle; 160 this.tcpKeepAliveInterval = tcpKeepAliveInterval; 161 this.xpubVerbose = xpubVerbose; 162 } 163 164 // The ZMQ.Socket class has other set methods that we do not cover because 165 // they throw unsupported operation exceptions. 166 @PluginFactory 167 public static JeroMqAppender createAppender( 168 // @formatter:off 169 @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name, 170 @PluginElement("Layout") Layout<?> layout, 171 @PluginElement("Filter") final Filter filter, 172 @PluginElement("Properties") final Property[] properties, 173 // Super attributes 174 @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions, 175 // ZMQ attributes; defaults picked from zmq.Options. 176 @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity, 177 @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog, 178 @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect, 179 @PluginAttribute(value = "identity") final byte[] identity, 180 @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only, 181 @PluginAttribute(value = "linger", defaultLong = -1) final long linger, 182 @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize, 183 @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm, 184 @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize, 185 @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut, 186 @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL, 187 @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax, 188 @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize, 189 @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut, 190 @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm, 191 @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive, 192 @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount, 193 @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle, 194 @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval, 195 @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose 196 // @formatter:on 197 ) { 198 if (layout == null) { 199 layout = PatternLayout.createDefaultLayout(); 200 } 201 List<String> endpoints; 202 if (properties == null) { 203 endpoints = new ArrayList<>(0); 204 } else { 205 endpoints = new ArrayList<>(properties.length); 206 for (final Property property : properties) { 207 if ("endpoint".equalsIgnoreCase(property.getName())) { 208 final String value = property.getValue(); 209 if (Strings.isNotEmpty(value)) { 210 endpoints.add(value); 211 } 212 } 213 } 214 } 215 logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", 216 name, filter, layout, ignoreExceptions, endpoints); 217 return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, 218 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, 219 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, 220 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose); 221 } 222 223 static ZMQ.Context getContext() { 224 return context; 225 } 226 227 private static ZMQ.Socket getPublisher() { 228 return publisher; 229 } 230 231 private static ZMQ.Socket newPublisher() { 232 logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context); 233 final Socket socketPub = context.socket(ZMQ.PUB); 234 logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub); 235 return socketPub; 236 } 237 238 static void shutdown() { 239 if (context != null) { 240 logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context); 241 context.term(); 242 context = null; 243 } 244 } 245 246 @Override 247 public synchronized void append(final LogEvent event) { 248 final String formattedMessage = event.getMessage().getFormattedMessage(); 249 if (getPublisher().send(formattedMessage, 0)) { 250 sendRcTrue++; 251 } else { 252 sendRcFalse++; 253 logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage); 254 } 255 } 256 257 // not public, handy for testing 258 int getSendRcFalse() { 259 return sendRcFalse; 260 } 261 262 // not public, handy for testing 263 int getSendRcTrue() { 264 return sendRcTrue; 265 } 266 267 // not public, handy for testing 268 void resetSendRcs() { 269 sendRcTrue = sendRcFalse = 0; 270 } 271 272 @Override 273 public synchronized void start() { 274 super.start(); 275 publisher = newPublisher(); 276 final String name = getName(); 277 final String prefix = "JeroMQ Appender"; 278 logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString()); 279 logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads); 280 // 281 final ZMQ.Socket socketPub = getPublisher(); 282 logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass() 283 .getName(), socketPub); 284 logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity); 285 socketPub.setAffinity(affinity); 286 logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog); 287 socketPub.setBacklog(backlog); 288 logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect); 289 socketPub.setDelayAttachOnConnect(delayAttachOnConnect); 290 if (identity != null) { 291 logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity)); 292 socketPub.setIdentity(identity); 293 } 294 logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only); 295 socketPub.setIPv4Only(ipv4Only); 296 logger.trace("{} {} publisher setLinger({})", prefix, name, linger); 297 socketPub.setLinger(linger); 298 logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize); 299 socketPub.setMaxMsgSize(maxMsgSize); 300 logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm); 301 socketPub.setRcvHWM(rcvHwm); 302 logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize); 303 socketPub.setReceiveBufferSize(receiveBufferSize); 304 logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut); 305 socketPub.setReceiveTimeOut(receiveTimeOut); 306 logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL); 307 socketPub.setReconnectIVL(reconnectIVL); 308 logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax); 309 socketPub.setReconnectIVLMax(reconnectIVLMax); 310 logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize); 311 socketPub.setSendBufferSize(sendBufferSize); 312 logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut); 313 socketPub.setSendTimeOut(sendTimeOut); 314 logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm); 315 socketPub.setSndHWM(sndHwm); 316 logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive); 317 socketPub.setTCPKeepAlive(tcpKeepAlive); 318 logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount); 319 socketPub.setTCPKeepAliveCount(tcpKeepAliveCount); 320 logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle); 321 socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle); 322 logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval); 323 socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval); 324 logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose); 325 socketPub.setXpubVerbose(xpubVerbose); 326 // 327 if (logger.isDebugEnabled()) { 328 logger.debug( 329 "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, " 330 + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, " 331 + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}", 332 name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(), 333 socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(), 334 socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), 335 socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), 336 socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), 337 socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), 338 socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), 339 socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(), 340 socketPub.getTCPKeepAliveSetting()); 341 } 342 for (final String endpoint : endpoints) { 343 logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint); 344 socketPub.bind(endpoint); 345 } 346 } 347 348 @Override 349 public synchronized void stop() { 350 super.stop(); 351 final Socket socketPub = getPublisher(); 352 if (socketPub != null) { 353 logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub); 354 socketPub.close(); 355 publisher = null; 356 } 357 } 358 359 @Override 360 public String toString() { 361 return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]"; 362 } 363 364}