1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.util.HashMap;
20 import java.util.Map;
21
22 import org.apache.logging.log4j.core.Layout;
23 import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
24 import org.apache.logging.log4j.core.appender.ManagerFactory;
25 import org.apache.logging.log4j.core.appender.OutputStreamManager;
26
27 import java.io.ByteArrayOutputStream;
28 import java.io.IOException;
29 import java.io.OutputStream;
30 import java.net.ConnectException;
31 import java.net.InetAddress;
32 import java.net.Socket;
33 import java.net.UnknownHostException;
34 import java.util.concurrent.CountDownLatch;
35
36
37
38
39 public class TCPSocketManager extends AbstractSocketManager {
40
41
42
43 public static final int DEFAULT_RECONNECTION_DELAY = 30000;
44
45
46
47 private static final int DEFAULT_PORT = 4560;
48
49 private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
50
51 private final int reconnectionDelay;
52
53 private Reconnector connector = null;
54
55 private Socket socket;
56
57 private final boolean retry;
58
59 private final boolean immediateFail;
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
74 final String host, final int port, final int delay, final boolean immediateFail,
75 final Layout layout) {
76 super(name, os, addr, host, port, layout);
77 this.reconnectionDelay = delay;
78 this.socket = sock;
79 this.immediateFail = immediateFail;
80 retry = delay > 0;
81 if (sock == null) {
82 connector = new Reconnector(this);
83 connector.setDaemon(true);
84 connector.setPriority(Thread.MIN_PRIORITY);
85 connector.start();
86 }
87 }
88
89
90
91
92
93
94
95
96 public static TCPSocketManager getSocketManager(final String host, int port, int delay,
97 final boolean immediateFail, final Layout layout ) {
98 if (host == null || host.length() == 0) {
99 throw new IllegalArgumentException("A host name is required");
100 }
101 if (port <= 0) {
102 port = DEFAULT_PORT;
103 }
104 if (delay == 0) {
105 delay = DEFAULT_RECONNECTION_DELAY;
106 }
107 return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
108 new FactoryData(host, port, delay, immediateFail, layout), FACTORY);
109 }
110
111 @Override
112 protected void write(final byte[] bytes, final int offset, final int length) {
113 if (socket == null) {
114 if (connector != null && !immediateFail) {
115 connector.latch();
116 }
117 if (socket == null) {
118 final String msg = "Error writing to " + getName() + " socket not available";
119 throw new AppenderRuntimeException(msg);
120 }
121 }
122 synchronized (this) {
123 try {
124 getOutputStream().write(bytes, offset, length);
125 } catch (final IOException ex) {
126 if (retry && connector == null) {
127 connector = new Reconnector(this);
128 connector.setDaemon(true);
129 connector.setPriority(Thread.MIN_PRIORITY);
130 connector.start();
131 }
132 final String msg = "Error writing to " + getName();
133 throw new AppenderRuntimeException(msg, ex);
134 }
135 }
136 }
137
138 @Override
139 protected synchronized void close() {
140 super.close();
141 if (connector != null) {
142 connector.shutdown();
143 connector.interrupt();
144 connector = null;
145 }
146 }
147
148
149
150
151
152
153
154 @Override
155 public Map<String, String> getContentFormat()
156 {
157 Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
158 result.put("protocol", "tcp");
159 result.put("direction", "out");
160 return result;
161 }
162
163
164
165
166 private class Reconnector extends Thread {
167
168 private CountDownLatch latch = new CountDownLatch(1);
169
170 private boolean shutdown = false;
171
172 private final Object owner;
173
174 public Reconnector(final OutputStreamManager owner) {
175 this.owner = owner;
176 }
177
178 public void latch() {
179 try {
180 latch.await();
181 } catch (final InterruptedException ex) {
182
183 }
184 }
185
186 public void shutdown() {
187 shutdown = true;
188 }
189
190 @Override
191 public void run() {
192 while (!shutdown) {
193 try {
194 sleep(reconnectionDelay);
195 final Socket sock = new Socket(address, port);
196 final OutputStream newOS = sock.getOutputStream();
197 synchronized (owner) {
198 try {
199 getOutputStream().close();
200 } catch (final IOException ioe) {
201
202 }
203
204 setOutputStream(newOS);
205 socket = sock;
206 connector = null;
207 shutdown = true;
208 }
209 LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
210 } catch (final InterruptedException ie) {
211 LOGGER.debug("Reconnection interrupted.");
212 } catch (final ConnectException ex) {
213 LOGGER.debug(host + ":" + port + " refused connection");
214 } catch (final IOException ioe) {
215 LOGGER.debug("Unable to reconnect to " + host + ":" + port);
216 } finally {
217 latch.countDown();
218 }
219 }
220 }
221 }
222
223
224
225
226 private static class FactoryData {
227 private final String host;
228 private final int port;
229 private final int delay;
230 private final boolean immediateFail;
231 private final Layout layout;
232
233 public FactoryData(final String host, final int port, final int delay, final boolean immediateFail,
234 final Layout layout) {
235 this.host = host;
236 this.port = port;
237 this.delay = delay;
238 this.immediateFail = immediateFail;
239 this.layout = layout;
240 }
241 }
242
243
244
245
246 private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
247
248 @Override
249 public TCPSocketManager createManager(final String name, final FactoryData data) {
250
251 InetAddress address;
252 OutputStream os;
253 try {
254 address = InetAddress.getByName(data.host);
255 } catch (final UnknownHostException ex) {
256 LOGGER.error("Could not find address of " + data.host, ex);
257 return null;
258 }
259 try {
260 final Socket socket = new Socket(data.host, data.port);
261 os = socket.getOutputStream();
262 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
263 data.immediateFail, data.layout);
264 } catch (final IOException ex) {
265 LOGGER.error("TCPSocketManager (" + name + ") " + ex);
266 os = new ByteArrayOutputStream();
267 }
268 if (data.delay == 0) {
269 return null;
270 }
271 return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail,
272 data.layout);
273 }
274 }
275 }