001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.net; 018 019 import java.io.ByteArrayOutputStream; 020 import java.io.IOException; 021 import java.io.OutputStream; 022 import java.io.Serializable; 023 import java.net.ConnectException; 024 import java.net.InetAddress; 025 import java.net.Socket; 026 import java.net.UnknownHostException; 027 import java.util.HashMap; 028 import java.util.Map; 029 import java.util.concurrent.CountDownLatch; 030 031 import org.apache.logging.log4j.core.Layout; 032 import org.apache.logging.log4j.core.appender.AppenderLoggingException; 033 import org.apache.logging.log4j.core.appender.ManagerFactory; 034 import org.apache.logging.log4j.core.appender.OutputStreamManager; 035 import org.apache.logging.log4j.util.Strings; 036 037 /** 038 * Manager of TCP Socket connections. 039 */ 040 public class TcpSocketManager extends AbstractSocketManager { 041 /** 042 The default reconnection delay (30000 milliseconds or 30 seconds). 043 */ 044 public static final int DEFAULT_RECONNECTION_DELAY = 30000; 045 /** 046 The default port number of remote logging server (4560). 047 */ 048 private static final int DEFAULT_PORT = 4560; 049 050 private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory(); 051 052 private final int reconnectionDelay; 053 054 private Reconnector connector = null; 055 056 private Socket socket; 057 058 private final boolean retry; 059 060 private final boolean immediateFail; 061 062 /** 063 * The Constructor. 064 * @param name The unique name of this connection. 065 * @param os The OutputStream. 066 * @param sock The Socket. 067 * @param inetAddress The internet address of the host. 068 * @param host The name of the host. 069 * @param port The port number on the host. 070 * @param delay Reconnection interval. 071 * @param immediateFail 072 * @param layout The Layout. 073 */ 074 public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress, 075 final String host, final int port, final int delay, final boolean immediateFail, 076 final Layout<? extends Serializable> layout) { 077 super(name, os, inetAddress, host, port, layout); 078 this.reconnectionDelay = delay; 079 this.socket = sock; 080 this.immediateFail = immediateFail; 081 retry = delay > 0; 082 if (sock == null) { 083 connector = new Reconnector(this); 084 connector.setDaemon(true); 085 connector.setPriority(Thread.MIN_PRIORITY); 086 connector.start(); 087 } 088 } 089 090 /** 091 * Obtain a TcpSocketManager. 092 * @param host The host to connect to. 093 * @param port The port on the host. 094 * @param delay The interval to pause between retries. 095 * @return A TcpSocketManager. 096 */ 097 public static TcpSocketManager getSocketManager(final String host, int port, int delay, 098 final boolean immediateFail, final Layout<? extends Serializable> layout ) { 099 if (Strings.isEmpty(host)) { 100 throw new IllegalArgumentException("A host name is required"); 101 } 102 if (port <= 0) { 103 port = DEFAULT_PORT; 104 } 105 if (delay == 0) { 106 delay = DEFAULT_RECONNECTION_DELAY; 107 } 108 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, 109 new FactoryData(host, port, delay, immediateFail, layout), FACTORY); 110 } 111 112 @Override 113 protected void write(final byte[] bytes, final int offset, final int length) { 114 if (socket == null) { 115 if (connector != null && !immediateFail) { 116 connector.latch(); 117 } 118 if (socket == null) { 119 final String msg = "Error writing to " + getName() + " socket not available"; 120 throw new AppenderLoggingException(msg); 121 } 122 } 123 synchronized (this) { 124 try { 125 getOutputStream().write(bytes, offset, length); 126 } catch (final IOException ex) { 127 if (retry && connector == null) { 128 connector = new Reconnector(this); 129 connector.setDaemon(true); 130 connector.setPriority(Thread.MIN_PRIORITY); 131 connector.start(); 132 } 133 final String msg = "Error writing to " + getName(); 134 throw new AppenderLoggingException(msg, ex); 135 } 136 } 137 } 138 139 @Override 140 protected synchronized void close() { 141 super.close(); 142 if (connector != null) { 143 connector.shutdown(); 144 connector.interrupt(); 145 connector = null; 146 } 147 } 148 149 /** 150 * TcpSocketManager's content format is specified by:<p/> 151 * Key: "protocol" Value: "tcp"<p/> 152 * Key: "direction" Value: "out" 153 * @return Map of content format keys supporting TcpSocketManager 154 */ 155 @Override 156 public Map<String, String> getContentFormat() 157 { 158 final Map<String, String> result = new HashMap<String, String>(super.getContentFormat()); 159 result.put("protocol", "tcp"); 160 result.put("direction", "out"); 161 return result; 162 } 163 164 /** 165 * Handles reconnecting to a Thread. 166 */ 167 private class Reconnector extends Thread { 168 169 private final CountDownLatch latch = new CountDownLatch(1); 170 171 private boolean shutdown = false; 172 173 private final Object owner; 174 175 public Reconnector(final OutputStreamManager owner) { 176 this.owner = owner; 177 } 178 179 public void latch() { 180 try { 181 latch.await(); 182 } catch (final InterruptedException ex) { 183 // Ignore the exception. 184 } 185 } 186 187 public void shutdown() { 188 shutdown = true; 189 } 190 191 @Override 192 public void run() { 193 while (!shutdown) { 194 try { 195 sleep(reconnectionDelay); 196 final Socket sock = createSocket(inetAddress, port); 197 final OutputStream newOS = sock.getOutputStream(); 198 synchronized (owner) { 199 try { 200 getOutputStream().close(); 201 } catch (final IOException ioe) { 202 // Ignore this. 203 } 204 205 setOutputStream(newOS); 206 socket = sock; 207 connector = null; 208 shutdown = true; 209 } 210 LOGGER.debug("Connection to " + host + ':' + port + " reestablished."); 211 } catch (final InterruptedException ie) { 212 LOGGER.debug("Reconnection interrupted."); 213 } catch (final ConnectException ex) { 214 LOGGER.debug(host + ':' + port + " refused connection"); 215 } catch (final IOException ioe) { 216 LOGGER.debug("Unable to reconnect to " + host + ':' + port); 217 } finally { 218 latch.countDown(); 219 } 220 } 221 } 222 } 223 224 protected Socket createSocket(final InetAddress host, final int port) throws IOException { 225 return createSocket(host.getHostName(), port); 226 } 227 228 protected Socket createSocket(final String host, final int port) throws IOException { 229 return new Socket(host, port); 230 } 231 232 /** 233 * Data for the factory. 234 */ 235 private static class FactoryData { 236 private final String host; 237 private final int port; 238 private final int delay; 239 private final boolean immediateFail; 240 private final Layout<? extends Serializable> layout; 241 242 public FactoryData(final String host, final int port, final int delay, final boolean immediateFail, 243 final Layout<? extends Serializable> layout) { 244 this.host = host; 245 this.port = port; 246 this.delay = delay; 247 this.immediateFail = immediateFail; 248 this.layout = layout; 249 } 250 } 251 252 /** 253 * Factory to create a TcpSocketManager. 254 */ 255 protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> { 256 @Override 257 public TcpSocketManager createManager(final String name, final FactoryData data) { 258 259 InetAddress inetAddress; 260 OutputStream os; 261 try { 262 inetAddress = InetAddress.getByName(data.host); 263 } catch (final UnknownHostException ex) { 264 LOGGER.error("Could not find address of " + data.host, ex); 265 return null; 266 } 267 try { 268 final Socket socket = new Socket(data.host, data.port); 269 os = socket.getOutputStream(); 270 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, data.delay, 271 data.immediateFail, data.layout); 272 } catch (final IOException ex) { 273 LOGGER.error("TcpSocketManager (" + name + ") " + ex); 274 os = new ByteArrayOutputStream(); 275 } 276 if (data.delay == 0) { 277 return null; 278 } 279 return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.delay, data.immediateFail, 280 data.layout); 281 } 282 } 283 }