001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.net; 018 019 import org.apache.logging.log4j.core.appender.AppenderRuntimeException; 020 import org.apache.logging.log4j.core.appender.ManagerFactory; 021 import org.apache.logging.log4j.core.appender.OutputStreamManager; 022 023 import java.io.IOException; 024 import java.io.OutputStream; 025 import java.net.ConnectException; 026 import java.net.InetAddress; 027 import java.net.Socket; 028 import java.net.UnknownHostException; 029 030 /** 031 * Manager of TCP Socket connections. 032 */ 033 public class TCPSocketManager extends AbstractSocketManager { 034 /** 035 The default reconnection delay (30000 milliseconds or 30 seconds). 036 */ 037 public static final int DEFAULT_RECONNECTION_DELAY = 30000; 038 /** 039 The default port number of remote logging server (4560). 040 */ 041 private static final int DEFAULT_PORT = 4560; 042 043 private static final TCPSocketManagerFactory factory = new TCPSocketManagerFactory(); 044 045 private final int reconnectionDelay; 046 047 private Reconnector connector = null; 048 049 private Socket socket; 050 051 private final boolean retry; 052 053 /** 054 * The Constructor. 055 * @param name The unique name of this connection. 056 * @param os The OutputStream. 057 * @param sock The Socket. 058 * @param addr The internet address of the host. 059 * @param host The name of the host. 060 * @param port The port number on the host. 061 * @param delay Reconnection interval. 062 */ 063 public TCPSocketManager(String name, OutputStream os, Socket sock, InetAddress addr, String host, int port, 064 int delay) { 065 super(name, os, addr, host, port); 066 this.reconnectionDelay = delay; 067 this.socket = sock; 068 retry = delay > 0; 069 } 070 071 /** 072 * Obtain a TCPSocketManager. 073 * @param host The host to connect to. 074 * @param port The port on the host. 075 * @param delay The interval to pause between retries. 076 * @return A TCPSocketManager. 077 */ 078 public static TCPSocketManager getSocketManager(String host, int port, int delay) { 079 if (host == null || host.length() == 0) { 080 throw new IllegalArgumentException("A host name is required"); 081 } 082 if (port <= 0) { 083 port = DEFAULT_PORT; 084 } 085 if (delay == 0) { 086 delay = DEFAULT_RECONNECTION_DELAY; 087 } 088 return (TCPSocketManager) getManager("TCP:" + host + ":" + port, new FactoryData(host, port, delay), factory); 089 } 090 091 @Override 092 protected synchronized void write(byte[] bytes, int offset, int length) { 093 try { 094 getOutputStream().write(bytes, offset, length); 095 socket.setSendBufferSize(length); 096 } catch (IOException ex) { 097 if (retry && connector == null) { 098 connector = new Reconnector(this); 099 connector.setDaemon(true); 100 connector.setPriority(Thread.MIN_PRIORITY); 101 connector.start(); 102 } 103 String msg = "Error writing to " + getName(); 104 throw new AppenderRuntimeException(msg, ex); 105 } 106 } 107 108 @Override 109 protected synchronized void close() { 110 super.close(); 111 if (connector != null) { 112 connector.shutdown(); 113 connector.interrupt(); 114 connector = null; 115 } 116 } 117 118 /** 119 * Handles recoonecting to a Thread. 120 */ 121 private class Reconnector extends Thread { 122 123 private boolean shutdown = false; 124 125 private final Object owner; 126 127 public Reconnector(OutputStreamManager owner) { 128 this.owner = owner; 129 } 130 131 public void shutdown() { 132 shutdown = true; 133 } 134 135 @Override 136 public void run() { 137 while (!shutdown) { 138 try { 139 sleep(reconnectionDelay); 140 Socket sock = new Socket(address, port); 141 OutputStream newOS = sock.getOutputStream(); 142 synchronized (owner) { 143 try { 144 getOutputStream().close(); 145 } catch (IOException ioe) { 146 // Ignore this. 147 } 148 149 setOutputStream(newOS); 150 socket = sock; 151 connector = null; 152 } 153 LOGGER.debug("Connection to " + host + ":" + port + " reestablished."); 154 } catch (InterruptedException ie) { 155 LOGGER.debug("Reconnection interrupted."); 156 } catch (ConnectException ex) { 157 LOGGER.debug(host + ":" + port + " refused connection"); 158 } catch (IOException ioe) { 159 LOGGER.debug("Unable to reconnect to " + host + ":" + port); 160 } 161 } 162 } 163 } 164 165 /** 166 * Data for the factory. 167 */ 168 private static class FactoryData { 169 private String host; 170 private int port; 171 private int delay; 172 173 public FactoryData(String host, int port, int delay) { 174 this.host = host; 175 this.port = port; 176 this.delay = delay; 177 } 178 } 179 180 /** 181 * Factory to create a TCPSocketManager. 182 */ 183 private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> { 184 185 public TCPSocketManager createManager(String name, FactoryData data) { 186 try { 187 InetAddress address = InetAddress.getByName(data.host); 188 Socket socket = new Socket(data.host, data.port); 189 OutputStream os = socket.getOutputStream(); 190 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay); 191 } catch (UnknownHostException ex) { 192 LOGGER.error("Could not find address of " + data.host, ex); 193 } catch (IOException ex) { 194 LOGGER.error("TCPSocketManager (" + name + ") " + ex); 195 } 196 return null; 197 } 198 } 199 }