1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.io.OutputStream;
22 import java.io.Serializable;
23 import java.net.ConnectException;
24 import java.net.InetAddress;
25 import java.net.Socket;
26 import java.net.UnknownHostException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.CountDownLatch;
30
31 import org.apache.logging.log4j.core.Layout;
32 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
33 import org.apache.logging.log4j.core.appender.ManagerFactory;
34 import org.apache.logging.log4j.core.appender.OutputStreamManager;
35 import org.apache.logging.log4j.util.Strings;
36
37
38
39
40 public class TcpSocketManager extends AbstractSocketManager {
41
42
43
44 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
45
46
47
48 private static final int DEFAULT_PORT = 4560;
49
50 private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
51
52 private final int reconnectionDelay;
53
54 private Reconnector connector = null;
55
56 private Socket socket;
57
58 private final boolean retry;
59
60 private final boolean immediateFail;
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
75 final String host, final int port, final int delay, final boolean immediateFail,
76 final Layout<? extends Serializable> layout) {
77 super(name, os, inetAddress, host, port, layout);
78 this.reconnectionDelay = delay;
79 this.socket = sock;
80 this.immediateFail = immediateFail;
81 retry = delay > 0;
82 if (sock == null) {
83 connector = new Reconnector(this);
84 connector.setDaemon(true);
85 connector.setPriority(Thread.MIN_PRIORITY);
86 connector.start();
87 }
88 }
89
90
91
92
93
94
95
96
97 public static TcpSocketManager getSocketManager(final String host, int port, int delayMillis,
98 final boolean immediateFail, final Layout<? extends Serializable> layout ) {
99 if (Strings.isEmpty(host)) {
100 throw new IllegalArgumentException("A host name is required");
101 }
102 if (port <= 0) {
103 port = DEFAULT_PORT;
104 }
105 if (delayMillis == 0) {
106 delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
107 }
108 return (TcpSocketManager) getManager("TCP:" + host + ':' + port,
109 new FactoryData(host, port, delayMillis, immediateFail, layout), FACTORY);
110 }
111
112 @Override
113 protected void write(final byte[] bytes, final int offset, final int length) {
114 if (socket == null) {
115 if (connector != null && !immediateFail) {
116 connector.latch();
117 }
118 if (socket == null) {
119 final String msg = "Error writing to " + getName() + " socket not available";
120 throw new AppenderLoggingException(msg);
121 }
122 }
123 synchronized (this) {
124 try {
125 getOutputStream().write(bytes, offset, length);
126 } catch (final IOException ex) {
127 if (retry && connector == null) {
128 connector = new Reconnector(this);
129 connector.setDaemon(true);
130 connector.setPriority(Thread.MIN_PRIORITY);
131 connector.start();
132 }
133 final String msg = "Error writing to " + getName();
134 throw new AppenderLoggingException(msg, ex);
135 }
136 }
137 }
138
139 @Override
140 protected synchronized void close() {
141 super.close();
142 if (connector != null) {
143 connector.shutdown();
144 connector.interrupt();
145 connector = null;
146 }
147 }
148
149
150
151
152
153
154
155
156
157
158 @Override
159 public Map<String, String> getContentFormat() {
160 final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
161 result.put("protocol", "tcp");
162 result.put("direction", "out");
163 return result;
164 }
165
166
167
168
169 private class Reconnector extends Thread {
170
171 private final CountDownLatch latch = new CountDownLatch(1);
172
173 private boolean shutdown = false;
174
175 private final Object owner;
176
177 public Reconnector(final OutputStreamManager owner) {
178 this.owner = owner;
179 }
180
181 public void latch() {
182 try {
183 latch.await();
184 } catch (final InterruptedException ex) {
185
186 }
187 }
188
189 public void shutdown() {
190 shutdown = true;
191 }
192
193 @Override
194 public void run() {
195 while (!shutdown) {
196 try {
197 sleep(reconnectionDelay);
198 final Socket sock = createSocket(inetAddress, port);
199 final OutputStream newOS = sock.getOutputStream();
200 synchronized (owner) {
201 try {
202 getOutputStream().close();
203 } catch (final IOException ioe) {
204
205 }
206
207 setOutputStream(newOS);
208 socket = sock;
209 connector = null;
210 shutdown = true;
211 }
212 LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
213 } catch (final InterruptedException ie) {
214 LOGGER.debug("Reconnection interrupted.");
215 } catch (final ConnectException ex) {
216 LOGGER.debug(host + ':' + port + " refused connection");
217 } catch (final IOException ioe) {
218 LOGGER.debug("Unable to reconnect to " + host + ':' + port);
219 } finally {
220 latch.countDown();
221 }
222 }
223 }
224 }
225
226 protected Socket createSocket(final InetAddress host, final int port) throws IOException {
227 return createSocket(host.getHostName(), port);
228 }
229
230 protected Socket createSocket(final String host, final int port) throws IOException {
231 return new Socket(host, port);
232 }
233
234
235
236
237 private static class FactoryData {
238 private final String host;
239 private final int port;
240 private final int delayMillis;
241 private final boolean immediateFail;
242 private final Layout<? extends Serializable> layout;
243
244 public FactoryData(final String host, final int port, final int delayMillis, final boolean immediateFail,
245 final Layout<? extends Serializable> layout) {
246 this.host = host;
247 this.port = port;
248 this.delayMillis = delayMillis;
249 this.immediateFail = immediateFail;
250 this.layout = layout;
251 }
252 }
253
254
255
256
257 protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
258 @Override
259 public TcpSocketManager createManager(final String name, final FactoryData data) {
260
261 InetAddress inetAddress;
262 OutputStream os;
263 try {
264 inetAddress = InetAddress.getByName(data.host);
265 } catch (final UnknownHostException ex) {
266 LOGGER.error("Could not find address of " + data.host, ex);
267 return null;
268 }
269 try {
270 final Socket socket = new Socket(data.host, data.port);
271 os = socket.getOutputStream();
272 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, data.delayMillis,
273 data.immediateFail, data.layout);
274 } catch (final IOException ex) {
275 LOGGER.error("TcpSocketManager (" + name + ") " + ex);
276 os = new ByteArrayOutputStream();
277 }
278 if (data.delayMillis == 0) {
279 return null;
280 }
281 return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.delayMillis, data.immediateFail,
282 data.layout);
283 }
284 }
285 }