001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.net; 018 019 import org.apache.logging.log4j.core.appender.AppenderRuntimeException; 020 import org.apache.logging.log4j.core.appender.ManagerFactory; 021 import org.apache.logging.log4j.core.appender.OutputStreamManager; 022 023 import java.io.ByteArrayOutputStream; 024 import java.io.IOException; 025 import java.io.OutputStream; 026 import java.net.ConnectException; 027 import java.net.InetAddress; 028 import java.net.Socket; 029 import java.net.UnknownHostException; 030 import java.util.concurrent.CountDownLatch; 031 032 /** 033 * Manager of TCP Socket connections. 034 */ 035 public class TCPSocketManager extends AbstractSocketManager { 036 /** 037 The default reconnection delay (30000 milliseconds or 30 seconds). 038 */ 039 public static final int DEFAULT_RECONNECTION_DELAY = 30000; 040 /** 041 The default port number of remote logging server (4560). 042 */ 043 private static final int DEFAULT_PORT = 4560; 044 045 private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory(); 046 047 private final int reconnectionDelay; 048 049 private Reconnector connector = null; 050 051 private Socket socket; 052 053 private final boolean retry; 054 055 /** 056 * The Constructor. 057 * @param name The unique name of this connection. 058 * @param os The OutputStream. 059 * @param sock The Socket. 060 * @param addr The internet address of the host. 061 * @param host The name of the host. 062 * @param port The port number on the host. 063 * @param delay Reconnection interval. 064 */ 065 public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr, 066 final String host, final int port, final int delay) { 067 super(name, os, addr, host, port); 068 this.reconnectionDelay = delay; 069 this.socket = sock; 070 retry = delay > 0; 071 if (sock == null) { 072 connector = new Reconnector(this); 073 connector.setDaemon(true); 074 connector.setPriority(Thread.MIN_PRIORITY); 075 connector.start(); 076 } 077 } 078 079 /** 080 * Obtain a TCPSocketManager. 081 * @param host The host to connect to. 082 * @param port The port on the host. 083 * @param delay The interval to pause between retries. 084 * @return A TCPSocketManager. 085 */ 086 public static TCPSocketManager getSocketManager(final String host, int port, int delay) { 087 if (host == null || host.length() == 0) { 088 throw new IllegalArgumentException("A host name is required"); 089 } 090 if (port <= 0) { 091 port = DEFAULT_PORT; 092 } 093 if (delay == 0) { 094 delay = DEFAULT_RECONNECTION_DELAY; 095 } 096 return (TCPSocketManager) getManager("TCP:" + host + ":" + port, new FactoryData(host, port, delay), FACTORY); 097 } 098 099 @Override 100 protected synchronized void write(final byte[] bytes, final int offset, final int length) { 101 if (socket == null) { 102 if (connector != null) { 103 connector.latch(); 104 } 105 if (socket == null) { 106 final String msg = "Error writing to " + getName() + " socket not available"; 107 throw new AppenderRuntimeException(msg); 108 } 109 } 110 try { 111 getOutputStream().write(bytes, offset, length); 112 } catch (final IOException ex) { 113 if (retry && connector == null) { 114 connector = new Reconnector(this); 115 connector.setDaemon(true); 116 connector.setPriority(Thread.MIN_PRIORITY); 117 connector.start(); 118 } 119 final String msg = "Error writing to " + getName(); 120 throw new AppenderRuntimeException(msg, ex); 121 } 122 } 123 124 @Override 125 protected synchronized void close() { 126 super.close(); 127 if (connector != null) { 128 connector.shutdown(); 129 connector.interrupt(); 130 connector = null; 131 } 132 } 133 134 /** 135 * Handles recoonecting to a Thread. 136 */ 137 private class Reconnector extends Thread { 138 139 private CountDownLatch latch = new CountDownLatch(1); 140 141 private boolean shutdown = false; 142 143 private final Object owner; 144 145 public Reconnector(final OutputStreamManager owner) { 146 this.owner = owner; 147 } 148 149 public void latch() { 150 try { 151 latch.await(); 152 } catch (final InterruptedException ex) { 153 // Ignore the exception. 154 } 155 } 156 157 public void shutdown() { 158 shutdown = true; 159 } 160 161 @Override 162 public void run() { 163 while (!shutdown) { 164 try { 165 sleep(reconnectionDelay); 166 final Socket sock = new Socket(address, port); 167 final OutputStream newOS = sock.getOutputStream(); 168 synchronized (owner) { 169 try { 170 getOutputStream().close(); 171 } catch (final IOException ioe) { 172 // Ignore this. 173 } 174 175 setOutputStream(newOS); 176 socket = sock; 177 connector = null; 178 shutdown = true; 179 } 180 LOGGER.debug("Connection to " + host + ":" + port + " reestablished."); 181 } catch (final InterruptedException ie) { 182 LOGGER.debug("Reconnection interrupted."); 183 } catch (final ConnectException ex) { 184 LOGGER.debug(host + ":" + port + " refused connection"); 185 } catch (final IOException ioe) { 186 LOGGER.debug("Unable to reconnect to " + host + ":" + port); 187 } finally { 188 latch.countDown(); 189 } 190 } 191 } 192 } 193 194 /** 195 * Data for the factory. 196 */ 197 private static class FactoryData { 198 private final String host; 199 private final int port; 200 private final int delay; 201 202 public FactoryData(final String host, final int port, final int delay) { 203 this.host = host; 204 this.port = port; 205 this.delay = delay; 206 } 207 } 208 209 /** 210 * Factory to create a TCPSocketManager. 211 */ 212 private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> { 213 214 public TCPSocketManager createManager(final String name, final FactoryData data) { 215 216 InetAddress address; 217 OutputStream os = null; 218 try { 219 address = InetAddress.getByName(data.host); 220 } catch (final UnknownHostException ex) { 221 LOGGER.error("Could not find address of " + data.host, ex); 222 return null; 223 } 224 try { 225 final Socket socket = new Socket(data.host, data.port); 226 os = socket.getOutputStream(); 227 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay); 228 } catch (final IOException ex) { 229 LOGGER.error("TCPSocketManager (" + name + ") " + ex); 230 os = new ByteArrayOutputStream(); 231 } 232 if (data.delay == 0) { 233 return null; 234 } 235 return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay); 236 } 237 } 238 }