001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.net; 018 019import java.io.ByteArrayOutputStream; 020import java.io.IOException; 021import java.io.OutputStream; 022import java.io.Serializable; 023import java.net.ConnectException; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.net.Socket; 027import java.net.UnknownHostException; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.concurrent.CountDownLatch; 031 032import org.apache.logging.log4j.core.Layout; 033import org.apache.logging.log4j.core.appender.AppenderLoggingException; 034import org.apache.logging.log4j.core.appender.ManagerFactory; 035import org.apache.logging.log4j.core.appender.OutputStreamManager; 036import org.apache.logging.log4j.util.Strings; 037 038/** 039 * Manager of TCP Socket connections. 040 */ 041public class TcpSocketManager extends AbstractSocketManager { 042 /** 043 The default reconnection delay (30000 milliseconds or 30 seconds). 044 */ 045 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000; 046 /** 047 The default port number of remote logging server (4560). 048 */ 049 private static final int DEFAULT_PORT = 4560; 050 051 private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory(); 052 053 private final int reconnectionDelay; 054 055 private Reconnector connector; 056 057 private Socket socket; 058 059 private final boolean retry; 060 061 private final boolean immediateFail; 062 063 private final int connectTimeoutMillis; 064 065 /** 066 * The Constructor. 067 * @param name The unique name of this connection. 068 * @param os The OutputStream. 069 * @param sock The Socket. 070 * @param inetAddress The Internet address of the host. 071 * @param host The name of the host. 072 * @param port The port number on the host. 073 * @param connectTimeoutMillis the connect timeout in milliseconds. 074 * @param delay Reconnection interval. 075 * @param immediateFail 076 * @param layout The Layout. 077 */ 078 public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress, 079 final String host, final int port, final int connectTimeoutMillis, final int delay, 080 final boolean immediateFail, final Layout<? extends Serializable> layout) { 081 super(name, os, inetAddress, host, port, layout, true); 082 this.connectTimeoutMillis = connectTimeoutMillis; 083 this.reconnectionDelay = delay; 084 this.socket = sock; 085 this.immediateFail = immediateFail; 086 retry = delay > 0; 087 if (sock == null) { 088 connector = new Reconnector(this); 089 connector.setDaemon(true); 090 connector.setPriority(Thread.MIN_PRIORITY); 091 connector.start(); 092 } 093 } 094 095 /** 096 * Obtain a TcpSocketManager. 097 * @param host The host to connect to. 098 * @param port The port on the host. 099 * @param connectTimeoutMillis the connect timeout in milliseconds 100 * @param delayMillis The interval to pause between retries. 101 * @return A TcpSocketManager. 102 */ 103 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis, 104 int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) { 105 if (Strings.isEmpty(host)) { 106 throw new IllegalArgumentException("A host name is required"); 107 } 108 if (port <= 0) { 109 port = DEFAULT_PORT; 110 } 111 if (delayMillis == 0) { 112 delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS; 113 } 114 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port, 115 connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY); 116 } 117 118 @Override 119 protected void write(final byte[] bytes, final int offset, final int length, boolean immediateFlush) { 120 if (socket == null) { 121 if (connector != null && !immediateFail) { 122 connector.latch(); 123 } 124 if (socket == null) { 125 final String msg = "Error writing to " + getName() + " socket not available"; 126 throw new AppenderLoggingException(msg); 127 } 128 } 129 synchronized (this) { 130 try { 131 final OutputStream outputStream = getOutputStream(); 132 outputStream.write(bytes, offset, length); 133 if (immediateFlush) { 134 outputStream.flush(); 135 } 136 } catch (final IOException ex) { 137 if (retry && connector == null) { 138 connector = new Reconnector(this); 139 connector.setDaemon(true); 140 connector.setPriority(Thread.MIN_PRIORITY); 141 connector.start(); 142 } 143 final String msg = "Error writing to " + getName(); 144 throw new AppenderLoggingException(msg, ex); 145 } 146 } 147 } 148 149 @Override 150 protected synchronized void close() { 151 super.close(); 152 if (connector != null) { 153 connector.shutdown(); 154 connector.interrupt(); 155 connector = null; 156 } 157 } 158 159 public int getConnectTimeoutMillis() { 160 return connectTimeoutMillis; 161 } 162 163 /** 164 * Gets this TcpSocketManager's content format. Specified by: 165 * <ul> 166 * <li>Key: "protocol" Value: "tcp"</li> 167 * <li>Key: "direction" Value: "out"</li> 168 * </ul> 169 * 170 * @return Map of content format keys supporting TcpSocketManager 171 */ 172 @Override 173 public Map<String, String> getContentFormat() { 174 final Map<String, String> result = new HashMap<>(super.getContentFormat()); 175 result.put("protocol", "tcp"); 176 result.put("direction", "out"); 177 return result; 178 } 179 180 /** 181 * Handles reconnecting to a Thread. 182 */ 183 private class Reconnector extends Thread { 184 185 private final CountDownLatch latch = new CountDownLatch(1); 186 187 private boolean shutdown = false; 188 189 private final Object owner; 190 191 public Reconnector(final OutputStreamManager owner) { 192 this.owner = owner; 193 } 194 195 public void latch() { 196 try { 197 latch.await(); 198 } catch (final InterruptedException ex) { 199 // Ignore the exception. 200 } 201 } 202 203 public void shutdown() { 204 shutdown = true; 205 } 206 207 @Override 208 public void run() { 209 while (!shutdown) { 210 try { 211 sleep(reconnectionDelay); 212 final Socket sock = createSocket(inetAddress, port); 213 final OutputStream newOS = sock.getOutputStream(); 214 synchronized (owner) { 215 try { 216 getOutputStream().close(); 217 } catch (final IOException ioe) { 218 // Ignore this. 219 } 220 221 setOutputStream(newOS); 222 socket = sock; 223 connector = null; 224 shutdown = true; 225 } 226 LOGGER.debug("Connection to " + host + ':' + port + " reestablished."); 227 } catch (final InterruptedException ie) { 228 LOGGER.debug("Reconnection interrupted."); 229 } catch (final ConnectException ex) { 230 LOGGER.debug(host + ':' + port + " refused connection"); 231 } catch (final IOException ioe) { 232 LOGGER.debug("Unable to reconnect to " + host + ':' + port); 233 } finally { 234 latch.countDown(); 235 } 236 } 237 } 238 } 239 240 protected Socket createSocket(final InetAddress host, final int port) throws IOException { 241 return createSocket(host.getHostName(), port); 242 } 243 244 protected Socket createSocket(final String host, final int port) throws IOException { 245 final InetSocketAddress address = new InetSocketAddress(host, port); 246 final Socket newSocket = new Socket(); 247 newSocket.connect(address, connectTimeoutMillis); 248 return newSocket; 249 } 250 251 /** 252 * Data for the factory. 253 */ 254 private static class FactoryData { 255 private final String host; 256 private final int port; 257 private final int connectTimeoutMillis; 258 private final int delayMillis; 259 private final boolean immediateFail; 260 private final Layout<? extends Serializable> layout; 261 262 public FactoryData(final String host, final int port, final int connectTimeoutMillis, final int delayMillis, 263 final boolean immediateFail, final Layout<? extends Serializable> layout) { 264 this.host = host; 265 this.port = port; 266 this.connectTimeoutMillis = connectTimeoutMillis; 267 this.delayMillis = delayMillis; 268 this.immediateFail = immediateFail; 269 this.layout = layout; 270 } 271 } 272 273 /** 274 * Factory to create a TcpSocketManager. 275 */ 276 protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> { 277 @Override 278 public TcpSocketManager createManager(final String name, final FactoryData data) { 279 280 InetAddress inetAddress; 281 OutputStream os; 282 try { 283 inetAddress = InetAddress.getByName(data.host); 284 } catch (final UnknownHostException ex) { 285 LOGGER.error("Could not find address of " + data.host, ex, ex); 286 return null; 287 } 288 try { 289 // LOG4J2-1042 290 final Socket socket = new Socket(); 291 socket.connect(new InetSocketAddress(data.host, data.port), data.connectTimeoutMillis); 292 os = socket.getOutputStream(); 293 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, 294 data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout); 295 } catch (final IOException ex) { 296 LOGGER.error("TcpSocketManager (" + name + ") " + ex, ex); 297 os = new ByteArrayOutputStream(); 298 } 299 if (data.delayMillis == 0) { 300 return null; 301 } 302 return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis, 303 data.delayMillis, data.immediateFail, data.layout); 304 } 305 } 306 307}