View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
20  import org.apache.logging.log4j.core.appender.ManagerFactory;
21  import org.apache.logging.log4j.core.appender.OutputStreamManager;
22  
23  import java.io.ByteArrayOutputStream;
24  import java.io.IOException;
25  import java.io.OutputStream;
26  import java.net.ConnectException;
27  import java.net.InetAddress;
28  import java.net.Socket;
29  import java.net.UnknownHostException;
30  import java.util.concurrent.CountDownLatch;
31  
32  /**
33   * Manager of TCP Socket connections.
34   */
35  public class TCPSocketManager extends AbstractSocketManager {
36      /**
37        The default reconnection delay (30000 milliseconds or 30 seconds).
38       */
39      public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
40      /**
41        The default port number of remote logging server (4560).
42       */
43      private static final int DEFAULT_PORT = 4560;
44  
45      private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
46  
47      private final int reconnectionDelay;
48  
49      private Reconnector connector = null;
50  
51      private Socket socket;
52  
53      private final boolean retry;
54  
55      /**
56       * The Constructor.
57       * @param name The unique name of this connection.
58       * @param os The OutputStream.
59       * @param sock The Socket.
60       * @param addr The internet address of the host.
61       * @param host The name of the host.
62       * @param port The port number on the host.
63       * @param delay Reconnection interval.
64       */
65      public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
66                              final String host, final int port, final int delay) {
67          super(name, os, addr, host, port);
68          this.reconnectionDelay = delay;
69          this.socket = sock;
70          retry = delay > 0;
71          if (sock == null) {
72              connector = new Reconnector(this);
73              connector.setDaemon(true);
74              connector.setPriority(Thread.MIN_PRIORITY);
75              connector.start();
76          }
77      }
78  
79      /**
80       * Obtain a TCPSocketManager.
81       * @param host The host to connect to.
82       * @param port The port on the host.
83       * @param delay The interval to pause between retries.
84       * @return A TCPSocketManager.
85       */
86      public static TCPSocketManager getSocketManager(final String host, int port, int delay) {
87          if (host == null || host.length() == 0) {
88              throw new IllegalArgumentException("A host name is required");
89          }
90          if (port <= 0) {
91              port = DEFAULT_PORT;
92          }
93          if (delay == 0) {
94              delay = DEFAULT_RECONNECTION_DELAY;
95          }
96          return (TCPSocketManager) getManager("TCP:" + host + ":" + port, new FactoryData(host, port, delay), FACTORY);
97      }
98  
99      @Override
100     protected synchronized void write(final byte[] bytes, final int offset, final int length)  {
101         if (socket == null) {
102             if (connector != null) {
103                 connector.latch();
104             }
105             if (socket == null) {
106                 final String msg = "Error writing to " + getName() + " socket not available";
107                 throw new AppenderRuntimeException(msg);
108             }
109         }
110         try {
111             getOutputStream().write(bytes, offset, length);
112         } catch (final IOException ex) {
113             if (retry && connector == null) {
114                 connector = new Reconnector(this);
115                 connector.setDaemon(true);
116                 connector.setPriority(Thread.MIN_PRIORITY);
117                 connector.start();
118             }
119             final String msg = "Error writing to " + getName();
120             throw new AppenderRuntimeException(msg, ex);
121         }
122     }
123 
124     @Override
125     protected synchronized void close() {
126         super.close();
127         if (connector != null) {
128             connector.shutdown();
129             connector.interrupt();
130             connector = null;
131         }
132     }
133 
134     /**
135      * Handles recoonecting to a Thread.
136      */
137     private class Reconnector extends Thread {
138 
139         private CountDownLatch latch = new CountDownLatch(1);
140 
141         private boolean shutdown = false;
142 
143         private final Object owner;
144 
145         public Reconnector(final OutputStreamManager owner) {
146             this.owner = owner;
147         }
148 
149         public void latch()  {
150             try {
151                 latch.await();
152             } catch (final InterruptedException ex) {
153                 // Ignore the exception.
154             }
155         }
156 
157         public void shutdown() {
158             shutdown = true;
159         }
160 
161         @Override
162         public void run() {
163             while (!shutdown) {
164                 try {
165                     sleep(reconnectionDelay);
166                     final Socket sock = new Socket(address, port);
167                     final OutputStream newOS = sock.getOutputStream();
168                     synchronized (owner) {
169                         try {
170                             getOutputStream().close();
171                         } catch (final IOException ioe) {
172                             // Ignore this.
173                         }
174 
175                         setOutputStream(newOS);
176                         socket = sock;
177                         connector = null;
178                         shutdown = true;
179                     }
180                     LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
181                 } catch (final InterruptedException ie) {
182                     LOGGER.debug("Reconnection interrupted.");
183                 } catch (final ConnectException ex) {
184                     LOGGER.debug(host + ":" + port + " refused connection");
185                 } catch (final IOException ioe) {
186                     LOGGER.debug("Unable to reconnect to " + host + ":" + port);
187                 } finally {
188                     latch.countDown();
189                 }
190             }
191         }
192     }
193 
194     /**
195      * Data for the factory.
196      */
197     private static class FactoryData {
198         private final String host;
199         private final int port;
200         private final int delay;
201 
202         public FactoryData(final String host, final int port, final int delay) {
203             this.host = host;
204             this.port = port;
205             this.delay = delay;
206         }
207     }
208 
209     /**
210      * Factory to create a TCPSocketManager.
211      */
212     private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
213 
214         public TCPSocketManager createManager(final String name, final FactoryData data) {
215 
216             InetAddress address;
217             OutputStream os = null;
218             try {
219                 address = InetAddress.getByName(data.host);
220             } catch (final UnknownHostException ex) {
221                 LOGGER.error("Could not find address of " + data.host, ex);
222                 return null;
223             }
224             try {
225                 final Socket socket = new Socket(data.host, data.port);
226                 os = socket.getOutputStream();
227                 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay);
228             } catch (final IOException ex) {
229                 LOGGER.error("TCPSocketManager (" + name + ") " + ex);
230                 os = new ByteArrayOutputStream();
231             }
232             if (data.delay == 0) {
233                 return null;
234             }
235             return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay);
236         }
237     }
238 }