001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.net; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import org.apache.logging.log4j.core.appender.AppenderRuntimeException; 022 import org.apache.logging.log4j.core.appender.ManagerFactory; 023 import org.apache.logging.log4j.core.appender.OutputStreamManager; 024 025 import java.io.ByteArrayOutputStream; 026 import java.io.IOException; 027 import java.io.OutputStream; 028 import java.net.ConnectException; 029 import java.net.InetAddress; 030 import java.net.Socket; 031 import java.net.UnknownHostException; 032 import java.util.concurrent.CountDownLatch; 033 034 /** 035 * Manager of TCP Socket connections. 036 */ 037 public class TCPSocketManager extends AbstractSocketManager { 038 /** 039 The default reconnection delay (30000 milliseconds or 30 seconds). 040 */ 041 public static final int DEFAULT_RECONNECTION_DELAY = 30000; 042 /** 043 The default port number of remote logging server (4560). 044 */ 045 private static final int DEFAULT_PORT = 4560; 046 047 private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory(); 048 049 private final int reconnectionDelay; 050 051 private Reconnector connector = null; 052 053 private Socket socket; 054 055 private final boolean retry; 056 057 private final boolean immediateFail; 058 059 /** 060 * The Constructor. 061 * @param name The unique name of this connection. 062 * @param os The OutputStream. 063 * @param sock The Socket. 064 * @param addr The internet address of the host. 065 * @param host The name of the host. 066 * @param port The port number on the host. 067 * @param delay Reconnection interval. 068 */ 069 public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr, 070 final String host, final int port, final int delay, final boolean immediateFail) { 071 super(name, os, addr, host, port); 072 this.reconnectionDelay = delay; 073 this.socket = sock; 074 this.immediateFail = immediateFail; 075 retry = delay > 0; 076 if (sock == null) { 077 connector = new Reconnector(this); 078 connector.setDaemon(true); 079 connector.setPriority(Thread.MIN_PRIORITY); 080 connector.start(); 081 } 082 } 083 084 /** 085 * Obtain a TCPSocketManager. 086 * @param host The host to connect to. 087 * @param port The port on the host. 088 * @param delay The interval to pause between retries. 089 * @return A TCPSocketManager. 090 */ 091 public static TCPSocketManager getSocketManager(final String host, int port, int delay, boolean immediateFail) { 092 if (host == null || host.length() == 0) { 093 throw new IllegalArgumentException("A host name is required"); 094 } 095 if (port <= 0) { 096 port = DEFAULT_PORT; 097 } 098 if (delay == 0) { 099 delay = DEFAULT_RECONNECTION_DELAY; 100 } 101 return (TCPSocketManager) getManager("TCP:" + host + ":" + port, 102 new FactoryData(host, port, delay, immediateFail), FACTORY); 103 } 104 105 @Override 106 protected void write(final byte[] bytes, final int offset, final int length) { 107 if (socket == null) { 108 if (connector != null && !immediateFail) { 109 connector.latch(); 110 } 111 if (socket == null) { 112 final String msg = "Error writing to " + getName() + " socket not available"; 113 throw new AppenderRuntimeException(msg); 114 } 115 } 116 synchronized (this) { 117 try { 118 getOutputStream().write(bytes, offset, length); 119 } catch (final IOException ex) { 120 if (retry && connector == null) { 121 connector = new Reconnector(this); 122 connector.setDaemon(true); 123 connector.setPriority(Thread.MIN_PRIORITY); 124 connector.start(); 125 } 126 final String msg = "Error writing to " + getName(); 127 throw new AppenderRuntimeException(msg, ex); 128 } 129 } 130 } 131 132 @Override 133 protected synchronized void close() { 134 super.close(); 135 if (connector != null) { 136 connector.shutdown(); 137 connector.interrupt(); 138 connector = null; 139 } 140 } 141 142 /** 143 * TCPSocketManager's content format is specified by:<p/> 144 * Key: "protocol" Value: "tcp"<p/> 145 * Key: "direction" Value: "out" 146 * @return Map of content format keys supporting TCPSocketManager 147 */ 148 public Map<String, String> getContentFormat() 149 { 150 Map<String, String> result = new HashMap<String, String>(super.getContentFormat()); 151 result.put("protocol", "tcp"); 152 result.put("direction", "out"); 153 return result; 154 } 155 156 /** 157 * Handles reconnecting to a Thread. 158 */ 159 private class Reconnector extends Thread { 160 161 private CountDownLatch latch = new CountDownLatch(1); 162 163 private boolean shutdown = false; 164 165 private final Object owner; 166 167 public Reconnector(final OutputStreamManager owner) { 168 this.owner = owner; 169 } 170 171 public void latch() { 172 try { 173 latch.await(); 174 } catch (final InterruptedException ex) { 175 // Ignore the exception. 176 } 177 } 178 179 public void shutdown() { 180 shutdown = true; 181 } 182 183 @Override 184 public void run() { 185 while (!shutdown) { 186 try { 187 sleep(reconnectionDelay); 188 final Socket sock = new Socket(address, port); 189 final OutputStream newOS = sock.getOutputStream(); 190 synchronized (owner) { 191 try { 192 getOutputStream().close(); 193 } catch (final IOException ioe) { 194 // Ignore this. 195 } 196 197 setOutputStream(newOS); 198 socket = sock; 199 connector = null; 200 shutdown = true; 201 } 202 LOGGER.debug("Connection to " + host + ":" + port + " reestablished."); 203 } catch (final InterruptedException ie) { 204 LOGGER.debug("Reconnection interrupted."); 205 } catch (final ConnectException ex) { 206 LOGGER.debug(host + ":" + port + " refused connection"); 207 } catch (final IOException ioe) { 208 LOGGER.debug("Unable to reconnect to " + host + ":" + port); 209 } finally { 210 latch.countDown(); 211 } 212 } 213 } 214 } 215 216 /** 217 * Data for the factory. 218 */ 219 private static class FactoryData { 220 private final String host; 221 private final int port; 222 private final int delay; 223 private final boolean immediateFail; 224 225 public FactoryData(final String host, final int port, final int delay, final boolean immediateFail) { 226 this.host = host; 227 this.port = port; 228 this.delay = delay; 229 this.immediateFail = immediateFail; 230 } 231 } 232 233 /** 234 * Factory to create a TCPSocketManager. 235 */ 236 private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> { 237 238 public TCPSocketManager createManager(final String name, final FactoryData data) { 239 240 InetAddress address; 241 OutputStream os; 242 try { 243 address = InetAddress.getByName(data.host); 244 } catch (final UnknownHostException ex) { 245 LOGGER.error("Could not find address of " + data.host, ex); 246 return null; 247 } 248 try { 249 final Socket socket = new Socket(data.host, data.port); 250 os = socket.getOutputStream(); 251 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay, 252 data.immediateFail); 253 } catch (final IOException ex) { 254 LOGGER.error("TCPSocketManager (" + name + ") " + ex); 255 os = new ByteArrayOutputStream(); 256 } 257 if (data.delay == 0) { 258 return null; 259 } 260 return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail); 261 } 262 } 263 }