001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.net;
018    
019    import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
020    import org.apache.logging.log4j.core.appender.ManagerFactory;
021    import org.apache.logging.log4j.core.appender.OutputStreamManager;
022    
023    import java.io.ByteArrayOutputStream;
024    import java.io.IOException;
025    import java.io.OutputStream;
026    import java.net.ConnectException;
027    import java.net.InetAddress;
028    import java.net.Socket;
029    import java.net.UnknownHostException;
030    import java.util.concurrent.CountDownLatch;
031    
032    /**
033     * Manager of TCP Socket connections.
034     */
035    public class TCPSocketManager extends AbstractSocketManager {
036        /**
037          The default reconnection delay (30000 milliseconds or 30 seconds).
038         */
039        public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
040        /**
041          The default port number of remote logging server (4560).
042         */
043        private static final int DEFAULT_PORT = 4560;
044    
045        private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
046    
047        private final int reconnectionDelay;
048    
049        private Reconnector connector = null;
050    
051        private Socket socket;
052    
053        private final boolean retry;
054    
055        /**
056         * The Constructor.
057         * @param name The unique name of this connection.
058         * @param os The OutputStream.
059         * @param sock The Socket.
060         * @param addr The internet address of the host.
061         * @param host The name of the host.
062         * @param port The port number on the host.
063         * @param delay Reconnection interval.
064         */
065        public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
066                                final String host, final int port, final int delay) {
067            super(name, os, addr, host, port);
068            this.reconnectionDelay = delay;
069            this.socket = sock;
070            retry = delay > 0;
071            if (sock == null) {
072                connector = new Reconnector(this);
073                connector.setDaemon(true);
074                connector.setPriority(Thread.MIN_PRIORITY);
075                connector.start();
076            }
077        }
078    
079        /**
080         * Obtain a TCPSocketManager.
081         * @param host The host to connect to.
082         * @param port The port on the host.
083         * @param delay The interval to pause between retries.
084         * @return A TCPSocketManager.
085         */
086        public static TCPSocketManager getSocketManager(final String host, int port, int delay) {
087            if (host == null || host.length() == 0) {
088                throw new IllegalArgumentException("A host name is required");
089            }
090            if (port <= 0) {
091                port = DEFAULT_PORT;
092            }
093            if (delay == 0) {
094                delay = DEFAULT_RECONNECTION_DELAY;
095            }
096            return (TCPSocketManager) getManager("TCP:" + host + ":" + port, new FactoryData(host, port, delay), FACTORY);
097        }
098    
099        @Override
100        protected synchronized void write(final byte[] bytes, final int offset, final int length)  {
101            if (socket == null) {
102                if (connector != null) {
103                    connector.latch();
104                }
105                if (socket == null) {
106                    final String msg = "Error writing to " + getName() + " socket not available";
107                    throw new AppenderRuntimeException(msg);
108                }
109            }
110            try {
111                getOutputStream().write(bytes, offset, length);
112            } catch (final IOException ex) {
113                if (retry && connector == null) {
114                    connector = new Reconnector(this);
115                    connector.setDaemon(true);
116                    connector.setPriority(Thread.MIN_PRIORITY);
117                    connector.start();
118                }
119                final String msg = "Error writing to " + getName();
120                throw new AppenderRuntimeException(msg, ex);
121            }
122        }
123    
124        @Override
125        protected synchronized void close() {
126            super.close();
127            if (connector != null) {
128                connector.shutdown();
129                connector.interrupt();
130                connector = null;
131            }
132        }
133    
134        /**
135         * Handles recoonecting to a Thread.
136         */
137        private class Reconnector extends Thread {
138    
139            private CountDownLatch latch = new CountDownLatch(1);
140    
141            private boolean shutdown = false;
142    
143            private final Object owner;
144    
145            public Reconnector(final OutputStreamManager owner) {
146                this.owner = owner;
147            }
148    
149            public void latch()  {
150                try {
151                    latch.await();
152                } catch (final InterruptedException ex) {
153                    // Ignore the exception.
154                }
155            }
156    
157            public void shutdown() {
158                shutdown = true;
159            }
160    
161            @Override
162            public void run() {
163                while (!shutdown) {
164                    try {
165                        sleep(reconnectionDelay);
166                        final Socket sock = new Socket(address, port);
167                        final OutputStream newOS = sock.getOutputStream();
168                        synchronized (owner) {
169                            try {
170                                getOutputStream().close();
171                            } catch (final IOException ioe) {
172                                // Ignore this.
173                            }
174    
175                            setOutputStream(newOS);
176                            socket = sock;
177                            connector = null;
178                            shutdown = true;
179                        }
180                        LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
181                    } catch (final InterruptedException ie) {
182                        LOGGER.debug("Reconnection interrupted.");
183                    } catch (final ConnectException ex) {
184                        LOGGER.debug(host + ":" + port + " refused connection");
185                    } catch (final IOException ioe) {
186                        LOGGER.debug("Unable to reconnect to " + host + ":" + port);
187                    } finally {
188                        latch.countDown();
189                    }
190                }
191            }
192        }
193    
194        /**
195         * Data for the factory.
196         */
197        private static class FactoryData {
198            private final String host;
199            private final int port;
200            private final int delay;
201    
202            public FactoryData(final String host, final int port, final int delay) {
203                this.host = host;
204                this.port = port;
205                this.delay = delay;
206            }
207        }
208    
209        /**
210         * Factory to create a TCPSocketManager.
211         */
212        private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
213    
214            public TCPSocketManager createManager(final String name, final FactoryData data) {
215    
216                InetAddress address;
217                OutputStream os = null;
218                try {
219                    address = InetAddress.getByName(data.host);
220                } catch (final UnknownHostException ex) {
221                    LOGGER.error("Could not find address of " + data.host, ex);
222                    return null;
223                }
224                try {
225                    final Socket socket = new Socket(data.host, data.port);
226                    os = socket.getOutputStream();
227                    return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay);
228                } catch (final IOException ex) {
229                    LOGGER.error("TCPSocketManager (" + name + ") " + ex);
230                    os = new ByteArrayOutputStream();
231                }
232                if (data.delay == 0) {
233                    return null;
234                }
235                return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay);
236            }
237        }
238    }