001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom.jeromq; 019 020import java.io.Serializable; 021import java.util.ArrayList; 022import java.util.List; 023 024import org.apache.logging.log4j.core.Appender; 025import org.apache.logging.log4j.core.Filter; 026import org.apache.logging.log4j.core.Layout; 027import org.apache.logging.log4j.core.LogEvent; 028import org.apache.logging.log4j.core.appender.AbstractAppender; 029import org.apache.logging.log4j.core.config.Node; 030import org.apache.logging.log4j.core.config.Property; 031import org.apache.logging.log4j.core.config.plugins.Plugin; 032import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 033import org.apache.logging.log4j.core.config.plugins.PluginElement; 034import org.apache.logging.log4j.core.config.plugins.PluginFactory; 035import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; 036import org.apache.logging.log4j.core.layout.PatternLayout; 037import org.apache.logging.log4j.util.Strings; 038 039/** 040 * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. 041 * <p> 042 * Requires the JeroMQ jar (LGPL as of 0.3.5) 043 * </p> 044 */ 045// TODO 046// Some methods are synchronized because a ZMQ.Socket is not thread-safe 047// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be 048// some issue on threads owning certain resources as opposed to others. 049@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) 050public final class JeroMqAppender extends AbstractAppender { 051 052 private static final int DEFAULT_BACKLOG = 100; 053 054 private static final int DEFAULT_IVL = 100; 055 056 private static final int DEFAULT_RCV_HWM = 1000; 057 058 private static final int DEFAULT_SND_HWM = 1000; 059 060 private final JeroMqManager manager; 061 private final List<String> endpoints; 062 private int sendRcFalse; 063 private int sendRcTrue; 064 065 private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, 066 final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog, 067 final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger, 068 final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut, 069 final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut, 070 final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, 071 final long tcpKeepAliveInterval, final boolean xpubVerbose) { 072 super(name, filter, layout, ignoreExceptions); 073 this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only, 074 linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, 075 sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, 076 tcpKeepAliveInterval, xpubVerbose, endpoints); 077 this.endpoints = endpoints; 078 } 079 080 // The ZMQ.Socket class has other set methods that we do not cover because 081 // they throw unsupported operation exceptions. 082 @PluginFactory 083 public static JeroMqAppender createAppender( 084 // @formatter:off 085 @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name, 086 @PluginElement("Layout") Layout<?> layout, 087 @PluginElement("Filter") final Filter filter, 088 @PluginElement("Properties") final Property[] properties, 089 // Super attributes 090 @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions, 091 // ZMQ attributes; defaults picked from zmq.Options. 092 @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity, 093 @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog, 094 @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect, 095 @PluginAttribute(value = "identity") final byte[] identity, 096 @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only, 097 @PluginAttribute(value = "linger", defaultLong = -1) final long linger, 098 @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize, 099 @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm, 100 @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize, 101 @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut, 102 @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL, 103 @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax, 104 @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize, 105 @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut, 106 @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm, 107 @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive, 108 @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount, 109 @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle, 110 @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval, 111 @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose 112 // @formatter:on 113 ) { 114 if (layout == null) { 115 layout = PatternLayout.createDefaultLayout(); 116 } 117 List<String> endpoints; 118 if (properties == null) { 119 endpoints = new ArrayList<>(0); 120 } else { 121 endpoints = new ArrayList<>(properties.length); 122 for (final Property property : properties) { 123 if ("endpoint".equalsIgnoreCase(property.getName())) { 124 final String value = property.getValue(); 125 if (Strings.isNotEmpty(value)) { 126 endpoints.add(value); 127 } 128 } 129 } 130 } 131 LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", 132 name, filter, layout, ignoreExceptions, endpoints); 133 return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, 134 delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, 135 receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, 136 tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose); 137 } 138 139 @Override 140 public synchronized void append(final LogEvent event) { 141 final Layout<? extends Serializable> layout = getLayout(); 142 final byte[] formattedMessage = layout.toByteArray(event); 143 if (manager.send(getLayout().toByteArray(event))) { 144 sendRcTrue++; 145 } else { 146 sendRcFalse++; 147 LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage); 148 } 149 } 150 151 @Override 152 public void stop() { 153 manager.release(); 154 super.stop(); 155 } 156 157 // not public, handy for testing 158 int getSendRcFalse() { 159 return sendRcFalse; 160 } 161 162 // not public, handy for testing 163 int getSendRcTrue() { 164 return sendRcTrue; 165 } 166 167 // not public, handy for testing 168 void resetSendRcs() { 169 sendRcTrue = sendRcFalse = 0; 170 } 171 172 @Override 173 public String toString() { 174 return "JeroMqAppender{" + 175 "name=" + getName() + 176 ", state=" + getState() + 177 ", manager=" + manager + 178 ", endpoints=" + endpoints + 179 '}'; 180 } 181}