001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.net;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.IOException;
021    import java.io.OutputStream;
022    import java.io.Serializable;
023    import java.net.ConnectException;
024    import java.net.InetAddress;
025    import java.net.Socket;
026    import java.net.UnknownHostException;
027    import java.util.HashMap;
028    import java.util.Map;
029    import java.util.concurrent.CountDownLatch;
030    
031    import org.apache.logging.log4j.core.Layout;
032    import org.apache.logging.log4j.core.appender.AppenderLoggingException;
033    import org.apache.logging.log4j.core.appender.ManagerFactory;
034    import org.apache.logging.log4j.core.appender.OutputStreamManager;
035    import org.apache.logging.log4j.util.Strings;
036    
037    /**
038     * Manager of TCP Socket connections.
039     */
040    public class TcpSocketManager extends AbstractSocketManager {
041        /**
042          The default reconnection delay (30000 milliseconds or 30 seconds).
043         */
044        public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
045        /**
046          The default port number of remote logging server (4560).
047         */
048        private static final int DEFAULT_PORT = 4560;
049    
050        private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
051    
052        private final int reconnectionDelay;
053    
054        private Reconnector connector = null;
055    
056        private Socket socket;
057    
058        private final boolean retry;
059    
060        private final boolean immediateFail;
061    
062        /**
063         * The Constructor.
064         * @param name The unique name of this connection.
065         * @param os The OutputStream.
066         * @param sock The Socket.
067         * @param inetAddress The internet address of the host.
068         * @param host The name of the host.
069         * @param port The port number on the host.
070         * @param delay Reconnection interval.
071         * @param immediateFail
072         * @param layout The Layout.
073         */
074        public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
075                                final String host, final int port, final int delay, final boolean immediateFail,
076                                final Layout<? extends Serializable> layout) {
077            super(name, os, inetAddress, host, port, layout);
078            this.reconnectionDelay = delay;
079            this.socket = sock;
080            this.immediateFail = immediateFail;
081            retry = delay > 0;
082            if (sock == null) {
083                connector = new Reconnector(this);
084                connector.setDaemon(true);
085                connector.setPriority(Thread.MIN_PRIORITY);
086                connector.start();
087            }
088        }
089    
090        /**
091         * Obtain a TcpSocketManager.
092         * @param host The host to connect to.
093         * @param port The port on the host.
094         * @param delay The interval to pause between retries.
095         * @return A TcpSocketManager.
096         */
097        public static TcpSocketManager getSocketManager(final String host, int port, int delay,
098                                                        final boolean immediateFail, final Layout<? extends Serializable> layout ) {
099            if (Strings.isEmpty(host)) {
100                throw new IllegalArgumentException("A host name is required");
101            }
102            if (port <= 0) {
103                port = DEFAULT_PORT;
104            }
105            if (delay == 0) {
106                delay = DEFAULT_RECONNECTION_DELAY;
107            }
108            return (TcpSocketManager) getManager("TCP:" + host + ':' + port,
109                new FactoryData(host, port, delay, immediateFail, layout), FACTORY);
110        }
111    
112        @Override
113        protected void write(final byte[] bytes, final int offset, final int length)  {
114            if (socket == null) {
115                if (connector != null && !immediateFail) {
116                    connector.latch();
117                }
118                if (socket == null) {
119                    final String msg = "Error writing to " + getName() + " socket not available";
120                    throw new AppenderLoggingException(msg);
121                }
122            }
123            synchronized (this) {
124                try {
125                    getOutputStream().write(bytes, offset, length);
126                } catch (final IOException ex) {
127                    if (retry && connector == null) {
128                        connector = new Reconnector(this);
129                        connector.setDaemon(true);
130                        connector.setPriority(Thread.MIN_PRIORITY);
131                        connector.start();
132                    }
133                    final String msg = "Error writing to " + getName();
134                    throw new AppenderLoggingException(msg, ex);
135                }
136            }
137        }
138    
139        @Override
140        protected synchronized void close() {
141            super.close();
142            if (connector != null) {
143                connector.shutdown();
144                connector.interrupt();
145                connector = null;
146            }
147        }
148    
149        /**
150         * TcpSocketManager's content format is specified by:<p/>
151         * Key: "protocol" Value: "tcp"<p/>
152         * Key: "direction" Value: "out"
153         * @return Map of content format keys supporting TcpSocketManager
154         */
155        @Override
156        public Map<String, String> getContentFormat()
157        {
158            final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
159            result.put("protocol", "tcp");
160            result.put("direction", "out");
161            return result;
162        }
163    
164        /**
165         * Handles reconnecting to a Thread.
166         */
167        private class Reconnector extends Thread {
168    
169            private final CountDownLatch latch = new CountDownLatch(1);
170    
171            private boolean shutdown = false;
172    
173            private final Object owner;
174    
175            public Reconnector(final OutputStreamManager owner) {
176                this.owner = owner;
177            }
178    
179            public void latch()  {
180                try {
181                    latch.await();
182                } catch (final InterruptedException ex) {
183                    // Ignore the exception.
184                }
185            }
186    
187            public void shutdown() {
188                shutdown = true;
189            }
190    
191            @Override
192            public void run() {
193                while (!shutdown) {
194                    try {
195                        sleep(reconnectionDelay);
196                        final Socket sock = createSocket(inetAddress, port);
197                        final OutputStream newOS = sock.getOutputStream();
198                        synchronized (owner) {
199                            try {
200                                getOutputStream().close();
201                            } catch (final IOException ioe) {
202                                // Ignore this.
203                            }
204    
205                            setOutputStream(newOS);
206                            socket = sock;
207                            connector = null;
208                            shutdown = true;
209                        }
210                        LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
211                    } catch (final InterruptedException ie) {
212                        LOGGER.debug("Reconnection interrupted.");
213                    } catch (final ConnectException ex) {
214                        LOGGER.debug(host + ':' + port + " refused connection");
215                    } catch (final IOException ioe) {
216                        LOGGER.debug("Unable to reconnect to " + host + ':' + port);
217                    } finally {
218                        latch.countDown();
219                    }
220                }
221            }
222        }
223    
224        protected Socket createSocket(final InetAddress host, final int port) throws IOException {
225            return createSocket(host.getHostName(), port);
226        }
227    
228        protected Socket createSocket(final String host, final int port) throws IOException {
229            return new Socket(host, port);
230        }
231    
232        /**
233         * Data for the factory.
234         */
235        private static class FactoryData {
236            private final String host;
237            private final int port;
238            private final int delay;
239            private final boolean immediateFail;
240            private final Layout<? extends Serializable> layout;
241    
242            public FactoryData(final String host, final int port, final int delay, final boolean immediateFail,
243                               final Layout<? extends Serializable> layout) {
244                this.host = host;
245                this.port = port;
246                this.delay = delay;
247                this.immediateFail = immediateFail;
248                this.layout = layout;
249            }
250        }
251    
252        /**
253         * Factory to create a TcpSocketManager.
254         */
255        protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
256            @Override
257            public TcpSocketManager createManager(final String name, final FactoryData data) {
258    
259                InetAddress inetAddress;
260                OutputStream os;
261                try {
262                    inetAddress = InetAddress.getByName(data.host);
263                } catch (final UnknownHostException ex) {
264                    LOGGER.error("Could not find address of " + data.host, ex);
265                    return null;
266                }
267                try {
268                    final Socket socket = new Socket(data.host, data.port);
269                    os = socket.getOutputStream();
270                    return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, data.delay,
271                        data.immediateFail, data.layout);
272                } catch (final IOException ex) {
273                    LOGGER.error("TcpSocketManager (" + name + ") " + ex);
274                    os = new ByteArrayOutputStream();
275                }
276                if (data.delay == 0) {
277                    return null;
278                }
279                return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.delay, data.immediateFail,
280                    data.layout);
281            }
282        }
283    }