View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import java.util.HashMap;
20  import java.util.Map;
21  
22  import org.apache.logging.log4j.core.Layout;
23  import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
24  import org.apache.logging.log4j.core.appender.ManagerFactory;
25  import org.apache.logging.log4j.core.appender.OutputStreamManager;
26  
27  import java.io.ByteArrayOutputStream;
28  import java.io.IOException;
29  import java.io.OutputStream;
30  import java.net.ConnectException;
31  import java.net.InetAddress;
32  import java.net.Socket;
33  import java.net.UnknownHostException;
34  import java.util.concurrent.CountDownLatch;
35  
36  /**
37   * Manager of TCP Socket connections.
38   */
39  public class TCPSocketManager extends AbstractSocketManager {
40      /**
41        The default reconnection delay (30000 milliseconds or 30 seconds).
42       */
43      public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
44      /**
45        The default port number of remote logging server (4560).
46       */
47      private static final int DEFAULT_PORT = 4560;
48  
49      private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
50  
51      private final int reconnectionDelay;
52  
53      private Reconnector connector = null;
54  
55      private Socket socket;
56  
57      private final boolean retry;
58  
59      private final boolean immediateFail;
60  
61      /**
62       * The Constructor.
63       * @param name The unique name of this connection.
64       * @param os The OutputStream.
65       * @param sock The Socket.
66       * @param addr The internet address of the host.
67       * @param host The name of the host.
68       * @param port The port number on the host.
69       * @param delay Reconnection interval.
70       * @param immediateFail
71       * @param layout The Layout.
72       */
73      public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
74                              final String host, final int port, final int delay, final boolean immediateFail,
75                              final Layout layout) {
76          super(name, os, addr, host, port, layout);
77          this.reconnectionDelay = delay;
78          this.socket = sock;
79          this.immediateFail = immediateFail;
80          retry = delay > 0;
81          if (sock == null) {
82              connector = new Reconnector(this);
83              connector.setDaemon(true);
84              connector.setPriority(Thread.MIN_PRIORITY);
85              connector.start();
86          }
87      }
88  
89      /**
90       * Obtain a TCPSocketManager.
91       * @param host The host to connect to.
92       * @param port The port on the host.
93       * @param delay The interval to pause between retries.
94       * @return A TCPSocketManager.
95       */
96      public static TCPSocketManager getSocketManager(final String host, int port, int delay,
97                                                      final boolean immediateFail, final Layout layout ) {
98          if (host == null || host.length() == 0) {
99              throw new IllegalArgumentException("A host name is required");
100         }
101         if (port <= 0) {
102             port = DEFAULT_PORT;
103         }
104         if (delay == 0) {
105             delay = DEFAULT_RECONNECTION_DELAY;
106         }
107         return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
108             new FactoryData(host, port, delay, immediateFail, layout), FACTORY);
109     }
110 
111     @Override
112     protected void write(final byte[] bytes, final int offset, final int length)  {
113         if (socket == null) {
114             if (connector != null && !immediateFail) {
115                 connector.latch();
116             }
117             if (socket == null) {
118                 final String msg = "Error writing to " + getName() + " socket not available";
119                 throw new AppenderRuntimeException(msg);
120             }
121         }
122         synchronized (this) {
123             try {
124                 getOutputStream().write(bytes, offset, length);
125             } catch (final IOException ex) {
126                 if (retry && connector == null) {
127                     connector = new Reconnector(this);
128                     connector.setDaemon(true);
129                     connector.setPriority(Thread.MIN_PRIORITY);
130                     connector.start();
131                 }
132                 final String msg = "Error writing to " + getName();
133                 throw new AppenderRuntimeException(msg, ex);
134             }
135         }
136     }
137 
138     @Override
139     protected synchronized void close() {
140         super.close();
141         if (connector != null) {
142             connector.shutdown();
143             connector.interrupt();
144             connector = null;
145         }
146     }
147 
148     /**
149      * TCPSocketManager's content format is specified by:<p/>
150      * Key: "protocol" Value: "tcp"<p/>
151      * Key: "direction" Value: "out"
152      * @return Map of content format keys supporting TCPSocketManager
153      */
154     @Override
155     public Map<String, String> getContentFormat()
156     {
157         Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
158         result.put("protocol", "tcp");
159         result.put("direction", "out");
160         return result;
161     }
162 
163     /**
164      * Handles reconnecting to a Thread.
165      */
166     private class Reconnector extends Thread {
167 
168         private CountDownLatch latch = new CountDownLatch(1);
169 
170         private boolean shutdown = false;
171 
172         private final Object owner;
173 
174         public Reconnector(final OutputStreamManager owner) {
175             this.owner = owner;
176         }
177 
178         public void latch()  {
179             try {
180                 latch.await();
181             } catch (final InterruptedException ex) {
182                 // Ignore the exception.
183             }
184         }
185 
186         public void shutdown() {
187             shutdown = true;
188         }
189 
190         @Override
191         public void run() {
192             while (!shutdown) {
193                 try {
194                     sleep(reconnectionDelay);
195                     final Socket sock = new Socket(address, port);
196                     final OutputStream newOS = sock.getOutputStream();
197                     synchronized (owner) {
198                         try {
199                             getOutputStream().close();
200                         } catch (final IOException ioe) {
201                             // Ignore this.
202                         }
203 
204                         setOutputStream(newOS);
205                         socket = sock;
206                         connector = null;
207                         shutdown = true;
208                     }
209                     LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
210                 } catch (final InterruptedException ie) {
211                     LOGGER.debug("Reconnection interrupted.");
212                 } catch (final ConnectException ex) {
213                     LOGGER.debug(host + ":" + port + " refused connection");
214                 } catch (final IOException ioe) {
215                     LOGGER.debug("Unable to reconnect to " + host + ":" + port);
216                 } finally {
217                     latch.countDown();
218                 }
219             }
220         }
221     }
222 
223     /**
224      * Data for the factory.
225      */
226     private static class FactoryData {
227         private final String host;
228         private final int port;
229         private final int delay;
230         private final boolean immediateFail;
231         private final Layout layout;
232 
233         public FactoryData(final String host, final int port, final int delay, final boolean immediateFail,
234                            final Layout layout) {
235             this.host = host;
236             this.port = port;
237             this.delay = delay;
238             this.immediateFail = immediateFail;
239             this.layout = layout;
240         }
241     }
242 
243     /**
244      * Factory to create a TCPSocketManager.
245      */
246     private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
247 
248         @Override
249         public TCPSocketManager createManager(final String name, final FactoryData data) {
250 
251             InetAddress address;
252             OutputStream os;
253             try {
254                 address = InetAddress.getByName(data.host);
255             } catch (final UnknownHostException ex) {
256                 LOGGER.error("Could not find address of " + data.host, ex);
257                 return null;
258             }
259             try {
260                 final Socket socket = new Socket(data.host, data.port);
261                 os = socket.getOutputStream();
262                 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
263                     data.immediateFail, data.layout);
264             } catch (final IOException ex) {
265                 LOGGER.error("TCPSocketManager (" + name + ") " + ex);
266                 os = new ByteArrayOutputStream();
267             }
268             if (data.delay == 0) {
269                 return null;
270             }
271             return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail,
272                 data.layout);
273         }
274     }
275 }