001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.net;
018    
019    import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
020    import org.apache.logging.log4j.core.appender.ManagerFactory;
021    import org.apache.logging.log4j.core.appender.OutputStreamManager;
022    
023    import java.io.IOException;
024    import java.io.OutputStream;
025    import java.net.ConnectException;
026    import java.net.InetAddress;
027    import java.net.Socket;
028    import java.net.UnknownHostException;
029    
030    /**
031     * Manager of TCP Socket connections.
032     */
033    public class TCPSocketManager extends AbstractSocketManager {
034        /**
035          The default reconnection delay (30000 milliseconds or 30 seconds).
036         */
037        public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
038        /**
039          The default port number of remote logging server (4560).
040         */
041        private static final int DEFAULT_PORT = 4560;
042    
043        private static final TCPSocketManagerFactory factory = new TCPSocketManagerFactory();
044    
045        private final int reconnectionDelay;
046    
047        private Reconnector connector = null;
048    
049        private Socket socket;
050    
051        private final boolean retry;
052    
053        /**
054         * The Constructor.
055         * @param name The unique name of this connection.
056         * @param os The OutputStream.
057         * @param sock The Socket.
058         * @param addr The internet address of the host.
059         * @param host The name of the host.
060         * @param port The port number on the host.
061         * @param delay Reconnection interval.
062         */
063        public TCPSocketManager(String name, OutputStream os, Socket sock, InetAddress addr, String host, int port,
064                                int delay) {
065            super(name, os, addr, host, port);
066            this.reconnectionDelay = delay;
067            this.socket = sock;
068            retry = delay > 0;
069        }
070    
071        /**
072         * Obtain a TCPSocketManager.
073         * @param host The host to connect to.
074         * @param port The port on the host.
075         * @param delay The interval to pause between retries.
076         * @return A TCPSocketManager.
077         */
078        public static TCPSocketManager getSocketManager(String host, int port, int delay) {
079            if (host == null || host.length() == 0) {
080                throw new IllegalArgumentException("A host name is required");
081            }
082            if (port <= 0) {
083                port = DEFAULT_PORT;
084            }
085            if (delay == 0) {
086                delay = DEFAULT_RECONNECTION_DELAY;
087            }
088            return (TCPSocketManager) getManager("TCP:" + host + ":" + port, new FactoryData(host, port, delay), factory);
089        }
090    
091        @Override
092        protected synchronized void write(byte[] bytes, int offset, int length)  {
093            try {
094                getOutputStream().write(bytes, offset, length);
095                socket.setSendBufferSize(length);
096            } catch (IOException ex) {
097                if (retry && connector == null) {
098                    connector = new Reconnector(this);
099                    connector.setDaemon(true);
100                    connector.setPriority(Thread.MIN_PRIORITY);
101                    connector.start();
102                }
103                String msg = "Error writing to " + getName();
104                throw new AppenderRuntimeException(msg, ex);
105            }
106        }
107    
108        @Override
109        protected synchronized void close() {
110            super.close();
111            if (connector != null) {
112                connector.shutdown();
113                connector.interrupt();
114                connector = null;
115            }
116        }
117    
118        /**
119         * Handles recoonecting to a Thread.
120         */
121        private class Reconnector extends Thread {
122    
123            private boolean shutdown = false;
124    
125            private final Object owner;
126    
127            public Reconnector(OutputStreamManager owner) {
128                this.owner = owner;
129            }
130    
131            public void shutdown() {
132                shutdown = true;
133            }
134    
135            @Override
136            public void run() {
137                while (!shutdown) {
138                    try {
139                        sleep(reconnectionDelay);
140                        Socket sock = new Socket(address, port);
141                        OutputStream newOS = sock.getOutputStream();
142                        synchronized (owner) {
143                            try {
144                                getOutputStream().close();
145                            } catch (IOException ioe) {
146                                // Ignore this.
147                            }
148    
149                            setOutputStream(newOS);
150                            socket = sock;
151                            connector = null;
152                        }
153                        LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
154                    } catch (InterruptedException ie) {
155                        LOGGER.debug("Reconnection interrupted.");
156                    } catch (ConnectException ex) {
157                        LOGGER.debug(host + ":" + port + " refused connection");
158                    } catch (IOException ioe) {
159                        LOGGER.debug("Unable to reconnect to " + host + ":" + port);
160                    }
161                }
162            }
163        }
164    
165        /**
166         * Data for the factory.
167         */
168        private static class FactoryData {
169            private String host;
170            private int port;
171            private int delay;
172    
173            public FactoryData(String host, int port, int delay) {
174                this.host = host;
175                this.port = port;
176                this.delay = delay;
177            }
178        }
179    
180        /**
181         * Factory to create a TCPSocketManager.
182         */
183        private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
184    
185            public TCPSocketManager createManager(String name, FactoryData data) {
186                try {
187                    InetAddress address = InetAddress.getByName(data.host);
188                    Socket socket = new Socket(data.host, data.port);
189                    OutputStream os = socket.getOutputStream();
190                    return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay);
191                } catch (UnknownHostException ex) {
192                    LOGGER.error("Could not find address of " + data.host, ex);
193                } catch (IOException ex) {
194                    LOGGER.error("TCPSocketManager (" + name + ") " + ex);
195                }
196                return null;
197            }
198        }
199    }