001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.net;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.logging.log4j.core.Layout;
023    import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
024    import org.apache.logging.log4j.core.appender.ManagerFactory;
025    import org.apache.logging.log4j.core.appender.OutputStreamManager;
026    
027    import java.io.ByteArrayOutputStream;
028    import java.io.IOException;
029    import java.io.OutputStream;
030    import java.net.ConnectException;
031    import java.net.InetAddress;
032    import java.net.Socket;
033    import java.net.UnknownHostException;
034    import java.util.concurrent.CountDownLatch;
035    
036    /**
037     * Manager of TCP Socket connections.
038     */
039    public class TCPSocketManager extends AbstractSocketManager {
040        /**
041          The default reconnection delay (30000 milliseconds or 30 seconds).
042         */
043        public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
044        /**
045          The default port number of remote logging server (4560).
046         */
047        private static final int DEFAULT_PORT = 4560;
048    
049        private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
050    
051        private final int reconnectionDelay;
052    
053        private Reconnector connector = null;
054    
055        private Socket socket;
056    
057        private final boolean retry;
058    
059        private final boolean immediateFail;
060    
061        /**
062         * The Constructor.
063         * @param name The unique name of this connection.
064         * @param os The OutputStream.
065         * @param sock The Socket.
066         * @param addr The internet address of the host.
067         * @param host The name of the host.
068         * @param port The port number on the host.
069         * @param delay Reconnection interval.
070         * @param immediateFail
071         * @param layout The Layout.
072         */
073        public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
074                                final String host, final int port, final int delay, final boolean immediateFail,
075                                final Layout layout) {
076            super(name, os, addr, host, port, layout);
077            this.reconnectionDelay = delay;
078            this.socket = sock;
079            this.immediateFail = immediateFail;
080            retry = delay > 0;
081            if (sock == null) {
082                connector = new Reconnector(this);
083                connector.setDaemon(true);
084                connector.setPriority(Thread.MIN_PRIORITY);
085                connector.start();
086            }
087        }
088    
089        /**
090         * Obtain a TCPSocketManager.
091         * @param host The host to connect to.
092         * @param port The port on the host.
093         * @param delay The interval to pause between retries.
094         * @return A TCPSocketManager.
095         */
096        public static TCPSocketManager getSocketManager(final String host, int port, int delay,
097                                                        final boolean immediateFail, final Layout layout ) {
098            if (host == null || host.length() == 0) {
099                throw new IllegalArgumentException("A host name is required");
100            }
101            if (port <= 0) {
102                port = DEFAULT_PORT;
103            }
104            if (delay == 0) {
105                delay = DEFAULT_RECONNECTION_DELAY;
106            }
107            return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
108                new FactoryData(host, port, delay, immediateFail, layout), FACTORY);
109        }
110    
111        @Override
112        protected void write(final byte[] bytes, final int offset, final int length)  {
113            if (socket == null) {
114                if (connector != null && !immediateFail) {
115                    connector.latch();
116                }
117                if (socket == null) {
118                    final String msg = "Error writing to " + getName() + " socket not available";
119                    throw new AppenderRuntimeException(msg);
120                }
121            }
122            synchronized (this) {
123                try {
124                    getOutputStream().write(bytes, offset, length);
125                } catch (final IOException ex) {
126                    if (retry && connector == null) {
127                        connector = new Reconnector(this);
128                        connector.setDaemon(true);
129                        connector.setPriority(Thread.MIN_PRIORITY);
130                        connector.start();
131                    }
132                    final String msg = "Error writing to " + getName();
133                    throw new AppenderRuntimeException(msg, ex);
134                }
135            }
136        }
137    
138        @Override
139        protected synchronized void close() {
140            super.close();
141            if (connector != null) {
142                connector.shutdown();
143                connector.interrupt();
144                connector = null;
145            }
146        }
147    
148        /**
149         * TCPSocketManager's content format is specified by:<p/>
150         * Key: "protocol" Value: "tcp"<p/>
151         * Key: "direction" Value: "out"
152         * @return Map of content format keys supporting TCPSocketManager
153         */
154        @Override
155        public Map<String, String> getContentFormat()
156        {
157            Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
158            result.put("protocol", "tcp");
159            result.put("direction", "out");
160            return result;
161        }
162    
163        /**
164         * Handles reconnecting to a Thread.
165         */
166        private class Reconnector extends Thread {
167    
168            private CountDownLatch latch = new CountDownLatch(1);
169    
170            private boolean shutdown = false;
171    
172            private final Object owner;
173    
174            public Reconnector(final OutputStreamManager owner) {
175                this.owner = owner;
176            }
177    
178            public void latch()  {
179                try {
180                    latch.await();
181                } catch (final InterruptedException ex) {
182                    // Ignore the exception.
183                }
184            }
185    
186            public void shutdown() {
187                shutdown = true;
188            }
189    
190            @Override
191            public void run() {
192                while (!shutdown) {
193                    try {
194                        sleep(reconnectionDelay);
195                        final Socket sock = new Socket(address, port);
196                        final OutputStream newOS = sock.getOutputStream();
197                        synchronized (owner) {
198                            try {
199                                getOutputStream().close();
200                            } catch (final IOException ioe) {
201                                // Ignore this.
202                            }
203    
204                            setOutputStream(newOS);
205                            socket = sock;
206                            connector = null;
207                            shutdown = true;
208                        }
209                        LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
210                    } catch (final InterruptedException ie) {
211                        LOGGER.debug("Reconnection interrupted.");
212                    } catch (final ConnectException ex) {
213                        LOGGER.debug(host + ":" + port + " refused connection");
214                    } catch (final IOException ioe) {
215                        LOGGER.debug("Unable to reconnect to " + host + ":" + port);
216                    } finally {
217                        latch.countDown();
218                    }
219                }
220            }
221        }
222    
223        /**
224         * Data for the factory.
225         */
226        private static class FactoryData {
227            private final String host;
228            private final int port;
229            private final int delay;
230            private final boolean immediateFail;
231            private final Layout layout;
232    
233            public FactoryData(final String host, final int port, final int delay, final boolean immediateFail,
234                               final Layout layout) {
235                this.host = host;
236                this.port = port;
237                this.delay = delay;
238                this.immediateFail = immediateFail;
239                this.layout = layout;
240            }
241        }
242    
243        /**
244         * Factory to create a TCPSocketManager.
245         */
246        private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
247    
248            @Override
249            public TCPSocketManager createManager(final String name, final FactoryData data) {
250    
251                InetAddress address;
252                OutputStream os;
253                try {
254                    address = InetAddress.getByName(data.host);
255                } catch (final UnknownHostException ex) {
256                    LOGGER.error("Could not find address of " + data.host, ex);
257                    return null;
258                }
259                try {
260                    final Socket socket = new Socket(data.host, data.port);
261                    os = socket.getOutputStream();
262                    return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
263                        data.immediateFail, data.layout);
264                } catch (final IOException ex) {
265                    LOGGER.error("TCPSocketManager (" + name + ") " + ex);
266                    os = new ByteArrayOutputStream();
267                }
268                if (data.delay == 0) {
269                    return null;
270                }
271                return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail,
272                    data.layout);
273            }
274        }
275    }