001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.net; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 022 import org.apache.logging.log4j.core.Layout; 023 import org.apache.logging.log4j.core.appender.AppenderRuntimeException; 024 import org.apache.logging.log4j.core.appender.ManagerFactory; 025 import org.apache.logging.log4j.core.appender.OutputStreamManager; 026 027 import java.io.ByteArrayOutputStream; 028 import java.io.IOException; 029 import java.io.OutputStream; 030 import java.net.ConnectException; 031 import java.net.InetAddress; 032 import java.net.Socket; 033 import java.net.UnknownHostException; 034 import java.util.concurrent.CountDownLatch; 035 036 /** 037 * Manager of TCP Socket connections. 038 */ 039 public class TCPSocketManager extends AbstractSocketManager { 040 /** 041 The default reconnection delay (30000 milliseconds or 30 seconds). 042 */ 043 public static final int DEFAULT_RECONNECTION_DELAY = 30000; 044 /** 045 The default port number of remote logging server (4560). 046 */ 047 private static final int DEFAULT_PORT = 4560; 048 049 private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory(); 050 051 private final int reconnectionDelay; 052 053 private Reconnector connector = null; 054 055 private Socket socket; 056 057 private final boolean retry; 058 059 private final boolean immediateFail; 060 061 /** 062 * The Constructor. 063 * @param name The unique name of this connection. 064 * @param os The OutputStream. 065 * @param sock The Socket. 066 * @param addr The internet address of the host. 067 * @param host The name of the host. 068 * @param port The port number on the host. 069 * @param delay Reconnection interval. 070 * @param immediateFail 071 * @param layout The Layout. 072 */ 073 public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr, 074 final String host, final int port, final int delay, final boolean immediateFail, 075 final Layout layout) { 076 super(name, os, addr, host, port, layout); 077 this.reconnectionDelay = delay; 078 this.socket = sock; 079 this.immediateFail = immediateFail; 080 retry = delay > 0; 081 if (sock == null) { 082 connector = new Reconnector(this); 083 connector.setDaemon(true); 084 connector.setPriority(Thread.MIN_PRIORITY); 085 connector.start(); 086 } 087 } 088 089 /** 090 * Obtain a TCPSocketManager. 091 * @param host The host to connect to. 092 * @param port The port on the host. 093 * @param delay The interval to pause between retries. 094 * @return A TCPSocketManager. 095 */ 096 public static TCPSocketManager getSocketManager(final String host, int port, int delay, 097 final boolean immediateFail, final Layout layout ) { 098 if (host == null || host.length() == 0) { 099 throw new IllegalArgumentException("A host name is required"); 100 } 101 if (port <= 0) { 102 port = DEFAULT_PORT; 103 } 104 if (delay == 0) { 105 delay = DEFAULT_RECONNECTION_DELAY; 106 } 107 return (TCPSocketManager) getManager("TCP:" + host + ":" + port, 108 new FactoryData(host, port, delay, immediateFail, layout), FACTORY); 109 } 110 111 @Override 112 protected void write(final byte[] bytes, final int offset, final int length) { 113 if (socket == null) { 114 if (connector != null && !immediateFail) { 115 connector.latch(); 116 } 117 if (socket == null) { 118 final String msg = "Error writing to " + getName() + " socket not available"; 119 throw new AppenderRuntimeException(msg); 120 } 121 } 122 synchronized (this) { 123 try { 124 getOutputStream().write(bytes, offset, length); 125 } catch (final IOException ex) { 126 if (retry && connector == null) { 127 connector = new Reconnector(this); 128 connector.setDaemon(true); 129 connector.setPriority(Thread.MIN_PRIORITY); 130 connector.start(); 131 } 132 final String msg = "Error writing to " + getName(); 133 throw new AppenderRuntimeException(msg, ex); 134 } 135 } 136 } 137 138 @Override 139 protected synchronized void close() { 140 super.close(); 141 if (connector != null) { 142 connector.shutdown(); 143 connector.interrupt(); 144 connector = null; 145 } 146 } 147 148 /** 149 * TCPSocketManager's content format is specified by:<p/> 150 * Key: "protocol" Value: "tcp"<p/> 151 * Key: "direction" Value: "out" 152 * @return Map of content format keys supporting TCPSocketManager 153 */ 154 @Override 155 public Map<String, String> getContentFormat() 156 { 157 Map<String, String> result = new HashMap<String, String>(super.getContentFormat()); 158 result.put("protocol", "tcp"); 159 result.put("direction", "out"); 160 return result; 161 } 162 163 /** 164 * Handles reconnecting to a Thread. 165 */ 166 private class Reconnector extends Thread { 167 168 private CountDownLatch latch = new CountDownLatch(1); 169 170 private boolean shutdown = false; 171 172 private final Object owner; 173 174 public Reconnector(final OutputStreamManager owner) { 175 this.owner = owner; 176 } 177 178 public void latch() { 179 try { 180 latch.await(); 181 } catch (final InterruptedException ex) { 182 // Ignore the exception. 183 } 184 } 185 186 public void shutdown() { 187 shutdown = true; 188 } 189 190 @Override 191 public void run() { 192 while (!shutdown) { 193 try { 194 sleep(reconnectionDelay); 195 final Socket sock = new Socket(address, port); 196 final OutputStream newOS = sock.getOutputStream(); 197 synchronized (owner) { 198 try { 199 getOutputStream().close(); 200 } catch (final IOException ioe) { 201 // Ignore this. 202 } 203 204 setOutputStream(newOS); 205 socket = sock; 206 connector = null; 207 shutdown = true; 208 } 209 LOGGER.debug("Connection to " + host + ":" + port + " reestablished."); 210 } catch (final InterruptedException ie) { 211 LOGGER.debug("Reconnection interrupted."); 212 } catch (final ConnectException ex) { 213 LOGGER.debug(host + ":" + port + " refused connection"); 214 } catch (final IOException ioe) { 215 LOGGER.debug("Unable to reconnect to " + host + ":" + port); 216 } finally { 217 latch.countDown(); 218 } 219 } 220 } 221 } 222 223 /** 224 * Data for the factory. 225 */ 226 private static class FactoryData { 227 private final String host; 228 private final int port; 229 private final int delay; 230 private final boolean immediateFail; 231 private final Layout layout; 232 233 public FactoryData(final String host, final int port, final int delay, final boolean immediateFail, 234 final Layout layout) { 235 this.host = host; 236 this.port = port; 237 this.delay = delay; 238 this.immediateFail = immediateFail; 239 this.layout = layout; 240 } 241 } 242 243 /** 244 * Factory to create a TCPSocketManager. 245 */ 246 private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> { 247 248 @Override 249 public TCPSocketManager createManager(final String name, final FactoryData data) { 250 251 InetAddress address; 252 OutputStream os; 253 try { 254 address = InetAddress.getByName(data.host); 255 } catch (final UnknownHostException ex) { 256 LOGGER.error("Could not find address of " + data.host, ex); 257 return null; 258 } 259 try { 260 final Socket socket = new Socket(data.host, data.port); 261 os = socket.getOutputStream(); 262 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay, 263 data.immediateFail, data.layout); 264 } catch (final IOException ex) { 265 LOGGER.error("TCPSocketManager (" + name + ") " + ex); 266 os = new ByteArrayOutputStream(); 267 } 268 if (data.delay == 0) { 269 return null; 270 } 271 return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail, 272 data.layout); 273 } 274 } 275 }