001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.jeromq; 019 020import java.util.Arrays; 021import java.util.List; 022 023import org.apache.logging.log4j.LogManager; 024import org.apache.logging.log4j.core.appender.AbstractManager; 025import org.apache.logging.log4j.core.appender.ManagerFactory; 026import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry; 027import org.apache.logging.log4j.util.PropertiesUtil; 028import org.zeromq.ZMQ; 029 030/** 031 * Manager for publishing messages via JeroMq. 032 * 033 * @since 2.6 034 */ 035public class JeroMqManager extends AbstractManager { 036 037 /** 038 * System property to enable shutdown hook. 039 */ 040 public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; 041 042 /** 043 * System property to control JeroMQ I/O thread count. 044 */ 045 public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; 046 047 private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory(); 048 private static final ZMQ.Context CONTEXT; 049 050 static { 051 LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString()); 052 053 final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); 054 LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads); 055 CONTEXT = ZMQ.context(ioThreads); 056 057 final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty( 058 SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); 059 if (enableShutdownHook) { 060 ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() { 061 @Override 062 public void run() { 063 CONTEXT.close(); 064 } 065 }); 066 } 067 } 068 069 private final ZMQ.Socket publisher; 070 071 private JeroMqManager(final String name, final JeroMqConfiguration config) { 072 super(name); 073 publisher = CONTEXT.socket(ZMQ.PUB); 074 publisher.setAffinity(config.affinity); 075 publisher.setBacklog(config.backlog); 076 publisher.setDelayAttachOnConnect(config.delayAttachOnConnect); 077 if (config.identity != null) { 078 publisher.setIdentity(config.identity); 079 } 080 publisher.setIPv4Only(config.ipv4Only); 081 publisher.setLinger(config.linger); 082 publisher.setMaxMsgSize(config.maxMsgSize); 083 publisher.setRcvHWM(config.rcvHwm); 084 publisher.setReceiveBufferSize(config.receiveBufferSize); 085 publisher.setReceiveTimeOut(config.receiveTimeOut); 086 publisher.setReconnectIVL(config.reconnectIVL); 087 publisher.setReconnectIVLMax(config.reconnectIVLMax); 088 publisher.setSendBufferSize(config.sendBufferSize); 089 publisher.setSendTimeOut(config.sendTimeOut); 090 publisher.setSndHWM(config.sndHwm); 091 publisher.setTCPKeepAlive(config.tcpKeepAlive); 092 publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount); 093 publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle); 094 publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval); 095 publisher.setXpubVerbose(config.xpubVerbose); 096 for (final String endpoint : config.endpoints) { 097 publisher.bind(endpoint); 098 } 099 LOGGER.debug("Created JeroMqManager with {}", config); 100 } 101 102 public boolean send(final byte[] data) { 103 return publisher.send(data); 104 } 105 106 @Override 107 protected void releaseSub() { 108 publisher.close(); 109 } 110 111 public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog, 112 final boolean delayAttachOnConnect, final byte[] identity, 113 final boolean ipv4Only, final long linger, final long maxMsgSize, 114 final long rcvHwm, final long receiveBufferSize, 115 final int receiveTimeOut, final long reconnectIVL, 116 final long reconnectIVLMax, final long sendBufferSize, 117 final int sendTimeOut, final long sndHwm, final int tcpKeepAlive, 118 final long tcpKeepAliveCount, final long tcpKeepAliveIdle, 119 final long tcpKeepAliveInterval, final boolean xpubVerbose, 120 final List<String> endpoints) { 121 return getManager(name, FACTORY, 122 new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, 123 rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, 124 sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose, 125 endpoints)); 126 } 127 128 public static ZMQ.Context getContext() { 129 return CONTEXT; 130 } 131 132 private static class JeroMqConfiguration { 133 private final long affinity; 134 private final long backlog; 135 private final boolean delayAttachOnConnect; 136 private final byte[] identity; 137 private final boolean ipv4Only; 138 private final long linger; 139 private final long maxMsgSize; 140 private final long rcvHwm; 141 private final long receiveBufferSize; 142 private final int receiveTimeOut; 143 private final long reconnectIVL; 144 private final long reconnectIVLMax; 145 private final long sendBufferSize; 146 private final int sendTimeOut; 147 private final long sndHwm; 148 private final int tcpKeepAlive; 149 private final long tcpKeepAliveCount; 150 private final long tcpKeepAliveIdle; 151 private final long tcpKeepAliveInterval; 152 private final boolean xpubVerbose; 153 private final List<String> endpoints; 154 155 private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect, 156 final byte[] identity, final boolean ipv4Only, final long linger, 157 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, 158 final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax, 159 final long sendBufferSize, final int sendTimeOut, final long sndHwm, 160 final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, 161 final long tcpKeepAliveInterval, final boolean xpubVerbose, 162 final List<String> endpoints) { 163 this.affinity = affinity; 164 this.backlog = backlog; 165 this.delayAttachOnConnect = delayAttachOnConnect; 166 this.identity = identity; 167 this.ipv4Only = ipv4Only; 168 this.linger = linger; 169 this.maxMsgSize = maxMsgSize; 170 this.rcvHwm = rcvHwm; 171 this.receiveBufferSize = receiveBufferSize; 172 this.receiveTimeOut = receiveTimeOut; 173 this.reconnectIVL = reconnectIVL; 174 this.reconnectIVLMax = reconnectIVLMax; 175 this.sendBufferSize = sendBufferSize; 176 this.sendTimeOut = sendTimeOut; 177 this.sndHwm = sndHwm; 178 this.tcpKeepAlive = tcpKeepAlive; 179 this.tcpKeepAliveCount = tcpKeepAliveCount; 180 this.tcpKeepAliveIdle = tcpKeepAliveIdle; 181 this.tcpKeepAliveInterval = tcpKeepAliveInterval; 182 this.xpubVerbose = xpubVerbose; 183 this.endpoints = endpoints; 184 } 185 186 @Override 187 public String toString() { 188 return "JeroMqConfiguration{" + 189 "affinity=" + affinity + 190 ", backlog=" + backlog + 191 ", delayAttachOnConnect=" + delayAttachOnConnect + 192 ", identity=" + Arrays.toString(identity) + 193 ", ipv4Only=" + ipv4Only + 194 ", linger=" + linger + 195 ", maxMsgSize=" + maxMsgSize + 196 ", rcvHwm=" + rcvHwm + 197 ", receiveBufferSize=" + receiveBufferSize + 198 ", receiveTimeOut=" + receiveTimeOut + 199 ", reconnectIVL=" + reconnectIVL + 200 ", reconnectIVLMax=" + reconnectIVLMax + 201 ", sendBufferSize=" + sendBufferSize + 202 ", sendTimeOut=" + sendTimeOut + 203 ", sndHwm=" + sndHwm + 204 ", tcpKeepAlive=" + tcpKeepAlive + 205 ", tcpKeepAliveCount=" + tcpKeepAliveCount + 206 ", tcpKeepAliveIdle=" + tcpKeepAliveIdle + 207 ", tcpKeepAliveInterval=" + tcpKeepAliveInterval + 208 ", xpubVerbose=" + xpubVerbose + 209 ", endpoints=" + endpoints + 210 '}'; 211 } 212 } 213 214 private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> { 215 @Override 216 public JeroMqManager createManager(final String name, final JeroMqConfiguration data) { 217 return new JeroMqManager(name, data); 218 } 219 } 220}