1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
20 import org.apache.logging.log4j.core.appender.ManagerFactory;
21 import org.apache.logging.log4j.core.appender.OutputStreamManager;
22
23 import java.io.ByteArrayOutputStream;
24 import java.io.IOException;
25 import java.io.OutputStream;
26 import java.net.ConnectException;
27 import java.net.InetAddress;
28 import java.net.Socket;
29 import java.net.UnknownHostException;
30 import java.util.concurrent.CountDownLatch;
31
32
33
34
35 public class TCPSocketManager extends AbstractSocketManager {
36
37
38
39 public static final int DEFAULT_RECONNECTION_DELAY = 30000;
40
41
42
43 private static final int DEFAULT_PORT = 4560;
44
45 private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
46
47 private final int reconnectionDelay;
48
49 private Reconnector connector = null;
50
51 private Socket socket;
52
53 private final boolean retry;
54
55
56
57
58
59
60
61
62
63
64
65 public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
66 final String host, final int port, final int delay) {
67 super(name, os, addr, host, port);
68 this.reconnectionDelay = delay;
69 this.socket = sock;
70 retry = delay > 0;
71 if (sock == null) {
72 connector = new Reconnector(this);
73 connector.setDaemon(true);
74 connector.setPriority(Thread.MIN_PRIORITY);
75 connector.start();
76 }
77 }
78
79
80
81
82
83
84
85
86 public static TCPSocketManager getSocketManager(final String host, int port, int delay) {
87 if (host == null || host.length() == 0) {
88 throw new IllegalArgumentException("A host name is required");
89 }
90 if (port <= 0) {
91 port = DEFAULT_PORT;
92 }
93 if (delay == 0) {
94 delay = DEFAULT_RECONNECTION_DELAY;
95 }
96 return (TCPSocketManager) getManager("TCP:" + host + ":" + port, new FactoryData(host, port, delay), FACTORY);
97 }
98
99 @Override
100 protected synchronized void write(final byte[] bytes, final int offset, final int length) {
101 if (socket == null) {
102 if (connector != null) {
103 connector.latch();
104 }
105 if (socket == null) {
106 final String msg = "Error writing to " + getName() + " socket not available";
107 throw new AppenderRuntimeException(msg);
108 }
109 }
110 try {
111 getOutputStream().write(bytes, offset, length);
112 } catch (final IOException ex) {
113 if (retry && connector == null) {
114 connector = new Reconnector(this);
115 connector.setDaemon(true);
116 connector.setPriority(Thread.MIN_PRIORITY);
117 connector.start();
118 }
119 final String msg = "Error writing to " + getName();
120 throw new AppenderRuntimeException(msg, ex);
121 }
122 }
123
124 @Override
125 protected synchronized void close() {
126 super.close();
127 if (connector != null) {
128 connector.shutdown();
129 connector.interrupt();
130 connector = null;
131 }
132 }
133
134
135
136
137 private class Reconnector extends Thread {
138
139 private CountDownLatch latch = new CountDownLatch(1);
140
141 private boolean shutdown = false;
142
143 private final Object owner;
144
145 public Reconnector(final OutputStreamManager owner) {
146 this.owner = owner;
147 }
148
149 public void latch() {
150 try {
151 latch.await();
152 } catch (final InterruptedException ex) {
153
154 }
155 }
156
157 public void shutdown() {
158 shutdown = true;
159 }
160
161 @Override
162 public void run() {
163 while (!shutdown) {
164 try {
165 sleep(reconnectionDelay);
166 final Socket sock = new Socket(address, port);
167 final OutputStream newOS = sock.getOutputStream();
168 synchronized (owner) {
169 try {
170 getOutputStream().close();
171 } catch (final IOException ioe) {
172
173 }
174
175 setOutputStream(newOS);
176 socket = sock;
177 connector = null;
178 shutdown = true;
179 }
180 LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
181 } catch (final InterruptedException ie) {
182 LOGGER.debug("Reconnection interrupted.");
183 } catch (final ConnectException ex) {
184 LOGGER.debug(host + ":" + port + " refused connection");
185 } catch (final IOException ioe) {
186 LOGGER.debug("Unable to reconnect to " + host + ":" + port);
187 } finally {
188 latch.countDown();
189 }
190 }
191 }
192 }
193
194
195
196
197 private static class FactoryData {
198 private final String host;
199 private final int port;
200 private final int delay;
201
202 public FactoryData(final String host, final int port, final int delay) {
203 this.host = host;
204 this.port = port;
205 this.delay = delay;
206 }
207 }
208
209
210
211
212 private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
213
214 public TCPSocketManager createManager(final String name, final FactoryData data) {
215
216 InetAddress address;
217 OutputStream os = null;
218 try {
219 address = InetAddress.getByName(data.host);
220 } catch (final UnknownHostException ex) {
221 LOGGER.error("Could not find address of " + data.host, ex);
222 return null;
223 }
224 try {
225 final Socket socket = new Socket(data.host, data.port);
226 os = socket.getOutputStream();
227 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay);
228 } catch (final IOException ex) {
229 LOGGER.error("TCPSocketManager (" + name + ") " + ex);
230 os = new ByteArrayOutputStream();
231 }
232 if (data.delay == 0) {
233 return null;
234 }
235 return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay);
236 }
237 }
238 }