001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.net;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
022    import org.apache.logging.log4j.core.appender.ManagerFactory;
023    import org.apache.logging.log4j.core.appender.OutputStreamManager;
024    
025    import java.io.ByteArrayOutputStream;
026    import java.io.IOException;
027    import java.io.OutputStream;
028    import java.net.ConnectException;
029    import java.net.InetAddress;
030    import java.net.Socket;
031    import java.net.UnknownHostException;
032    import java.util.concurrent.CountDownLatch;
033    
034    /**
035     * Manager of TCP Socket connections.
036     */
037    public class TCPSocketManager extends AbstractSocketManager {
038        /**
039          The default reconnection delay (30000 milliseconds or 30 seconds).
040         */
041        public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
042        /**
043          The default port number of remote logging server (4560).
044         */
045        private static final int DEFAULT_PORT = 4560;
046    
047        private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
048    
049        private final int reconnectionDelay;
050    
051        private Reconnector connector = null;
052    
053        private Socket socket;
054    
055        private final boolean retry;
056    
057        private final boolean immediateFail;
058    
059        /**
060         * The Constructor.
061         * @param name The unique name of this connection.
062         * @param os The OutputStream.
063         * @param sock The Socket.
064         * @param addr The internet address of the host.
065         * @param host The name of the host.
066         * @param port The port number on the host.
067         * @param delay Reconnection interval.
068         */
069        public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
070                                final String host, final int port, final int delay, final boolean immediateFail) {
071            super(name, os, addr, host, port);
072            this.reconnectionDelay = delay;
073            this.socket = sock;
074            this.immediateFail = immediateFail;
075            retry = delay > 0;
076            if (sock == null) {
077                connector = new Reconnector(this);
078                connector.setDaemon(true);
079                connector.setPriority(Thread.MIN_PRIORITY);
080                connector.start();
081            }
082        }
083    
084        /**
085         * Obtain a TCPSocketManager.
086         * @param host The host to connect to.
087         * @param port The port on the host.
088         * @param delay The interval to pause between retries.
089         * @return A TCPSocketManager.
090         */
091        public static TCPSocketManager getSocketManager(final String host, int port, int delay, boolean immediateFail) {
092            if (host == null || host.length() == 0) {
093                throw new IllegalArgumentException("A host name is required");
094            }
095            if (port <= 0) {
096                port = DEFAULT_PORT;
097            }
098            if (delay == 0) {
099                delay = DEFAULT_RECONNECTION_DELAY;
100            }
101            return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
102                new FactoryData(host, port, delay, immediateFail), FACTORY);
103        }
104    
105        @Override
106        protected void write(final byte[] bytes, final int offset, final int length)  {
107            if (socket == null) {
108                if (connector != null && !immediateFail) {
109                    connector.latch();
110                }
111                if (socket == null) {
112                    final String msg = "Error writing to " + getName() + " socket not available";
113                    throw new AppenderRuntimeException(msg);
114                }
115            }
116            synchronized (this) {
117                try {
118                    getOutputStream().write(bytes, offset, length);
119                } catch (final IOException ex) {
120                    if (retry && connector == null) {
121                        connector = new Reconnector(this);
122                        connector.setDaemon(true);
123                        connector.setPriority(Thread.MIN_PRIORITY);
124                        connector.start();
125                    }
126                    final String msg = "Error writing to " + getName();
127                    throw new AppenderRuntimeException(msg, ex);
128                }
129            }
130        }
131    
132        @Override
133        protected synchronized void close() {
134            super.close();
135            if (connector != null) {
136                connector.shutdown();
137                connector.interrupt();
138                connector = null;
139            }
140        }
141    
142        /**
143         * TCPSocketManager's content format is specified by:<p/>
144         * Key: "protocol" Value: "tcp"<p/>
145         * Key: "direction" Value: "out"
146         * @return Map of content format keys supporting TCPSocketManager
147         */
148        public Map<String, String> getContentFormat()
149        {
150            Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
151            result.put("protocol", "tcp");
152            result.put("direction", "out");
153            return result;
154        }
155    
156        /**
157         * Handles reconnecting to a Thread.
158         */
159        private class Reconnector extends Thread {
160    
161            private CountDownLatch latch = new CountDownLatch(1);
162    
163            private boolean shutdown = false;
164    
165            private final Object owner;
166    
167            public Reconnector(final OutputStreamManager owner) {
168                this.owner = owner;
169            }
170    
171            public void latch()  {
172                try {
173                    latch.await();
174                } catch (final InterruptedException ex) {
175                    // Ignore the exception.
176                }
177            }
178    
179            public void shutdown() {
180                shutdown = true;
181            }
182    
183            @Override
184            public void run() {
185                while (!shutdown) {
186                    try {
187                        sleep(reconnectionDelay);
188                        final Socket sock = new Socket(address, port);
189                        final OutputStream newOS = sock.getOutputStream();
190                        synchronized (owner) {
191                            try {
192                                getOutputStream().close();
193                            } catch (final IOException ioe) {
194                                // Ignore this.
195                            }
196    
197                            setOutputStream(newOS);
198                            socket = sock;
199                            connector = null;
200                            shutdown = true;
201                        }
202                        LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
203                    } catch (final InterruptedException ie) {
204                        LOGGER.debug("Reconnection interrupted.");
205                    } catch (final ConnectException ex) {
206                        LOGGER.debug(host + ":" + port + " refused connection");
207                    } catch (final IOException ioe) {
208                        LOGGER.debug("Unable to reconnect to " + host + ":" + port);
209                    } finally {
210                        latch.countDown();
211                    }
212                }
213            }
214        }
215    
216        /**
217         * Data for the factory.
218         */
219        private static class FactoryData {
220            private final String host;
221            private final int port;
222            private final int delay;
223            private final boolean immediateFail;
224    
225            public FactoryData(final String host, final int port, final int delay, final boolean immediateFail) {
226                this.host = host;
227                this.port = port;
228                this.delay = delay;
229                this.immediateFail = immediateFail;
230            }
231        }
232    
233        /**
234         * Factory to create a TCPSocketManager.
235         */
236        private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
237    
238            public TCPSocketManager createManager(final String name, final FactoryData data) {
239    
240                InetAddress address;
241                OutputStream os;
242                try {
243                    address = InetAddress.getByName(data.host);
244                } catch (final UnknownHostException ex) {
245                    LOGGER.error("Could not find address of " + data.host, ex);
246                    return null;
247                }
248                try {
249                    final Socket socket = new Socket(data.host, data.port);
250                    os = socket.getOutputStream();
251                    return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
252                        data.immediateFail);
253                } catch (final IOException ex) {
254                    LOGGER.error("TCPSocketManager (" + name + ") " + ex);
255                    os = new ByteArrayOutputStream();
256                }
257                if (data.delay == 0) {
258                    return null;
259                }
260                return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail);
261            }
262        }
263    }