001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.net;
018
019import org.apache.logging.log4j.core.Layout;
020import org.apache.logging.log4j.core.appender.AppenderLoggingException;
021import org.apache.logging.log4j.core.appender.ManagerFactory;
022import org.apache.logging.log4j.core.appender.OutputStreamManager;
023import org.apache.logging.log4j.core.helpers.Strings;
024
025import java.io.ByteArrayOutputStream;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.Serializable;
029import java.net.ConnectException;
030import java.net.InetAddress;
031import java.net.Socket;
032import java.net.UnknownHostException;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.concurrent.CountDownLatch;
036
037/**
038 * Manager of TCP Socket connections.
039 */
040public class TCPSocketManager extends AbstractSocketManager {
041    /**
042      The default reconnection delay (30000 milliseconds or 30 seconds).
043     */
044    public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
045    /**
046      The default port number of remote logging server (4560).
047     */
048    private static final int DEFAULT_PORT = 4560;
049
050    private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
051
052    private final int reconnectionDelay;
053
054    private Reconnector connector = null;
055
056    private Socket socket;
057
058    private final boolean retry;
059
060    private final boolean immediateFail;
061
062    /**
063     * The Constructor.
064     * @param name The unique name of this connection.
065     * @param os The OutputStream.
066     * @param sock The Socket.
067     * @param addr The internet address of the host.
068     * @param host The name of the host.
069     * @param port The port number on the host.
070     * @param delay Reconnection interval.
071     * @param immediateFail
072     * @param layout The Layout.
073     */
074    public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
075                            final String host, final int port, final int delay, final boolean immediateFail,
076                            final Layout<? extends Serializable> layout) {
077        super(name, os, addr, host, port, layout);
078        this.reconnectionDelay = delay;
079        this.socket = sock;
080        this.immediateFail = immediateFail;
081        retry = delay > 0;
082        if (sock == null) {
083            connector = new Reconnector(this);
084            connector.setDaemon(true);
085            connector.setPriority(Thread.MIN_PRIORITY);
086            connector.start();
087        }
088    }
089
090    /**
091     * Obtain a TCPSocketManager.
092     * @param host The host to connect to.
093     * @param port The port on the host.
094     * @param delay The interval to pause between retries.
095     * @return A TCPSocketManager.
096     */
097    public static TCPSocketManager getSocketManager(final String host, int port, int delay,
098                                                    final boolean immediateFail, final Layout<? extends Serializable> layout ) {
099        if (Strings.isEmpty(host)) {
100            throw new IllegalArgumentException("A host name is required");
101        }
102        if (port <= 0) {
103            port = DEFAULT_PORT;
104        }
105        if (delay == 0) {
106            delay = DEFAULT_RECONNECTION_DELAY;
107        }
108        return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
109            new FactoryData(host, port, delay, immediateFail, layout), FACTORY);
110    }
111
112    @Override
113    protected void write(final byte[] bytes, final int offset, final int length)  {
114        if (socket == null) {
115            if (connector != null && !immediateFail) {
116                connector.latch();
117            }
118            if (socket == null) {
119                final String msg = "Error writing to " + getName() + " socket not available";
120                throw new AppenderLoggingException(msg);
121            }
122        }
123        synchronized (this) {
124            try {
125                getOutputStream().write(bytes, offset, length);
126            } catch (final IOException ex) {
127                if (retry && connector == null) {
128                    connector = new Reconnector(this);
129                    connector.setDaemon(true);
130                    connector.setPriority(Thread.MIN_PRIORITY);
131                    connector.start();
132                }
133                final String msg = "Error writing to " + getName();
134                throw new AppenderLoggingException(msg, ex);
135            }
136        }
137    }
138
139    @Override
140    protected synchronized void close() {
141        super.close();
142        if (connector != null) {
143            connector.shutdown();
144            connector.interrupt();
145            connector = null;
146        }
147    }
148
149    /**
150     * TCPSocketManager's content format is specified by:<p/>
151     * Key: "protocol" Value: "tcp"<p/>
152     * Key: "direction" Value: "out"
153     * @return Map of content format keys supporting TCPSocketManager
154     */
155    @Override
156    public Map<String, String> getContentFormat()
157    {
158        final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
159        result.put("protocol", "tcp");
160        result.put("direction", "out");
161        return result;
162    }
163
164    /**
165     * Handles reconnecting to a Thread.
166     */
167    private class Reconnector extends Thread {
168
169        private final CountDownLatch latch = new CountDownLatch(1);
170
171        private boolean shutdown = false;
172
173        private final Object owner;
174
175        public Reconnector(final OutputStreamManager owner) {
176            this.owner = owner;
177        }
178
179        public void latch()  {
180            try {
181                latch.await();
182            } catch (final InterruptedException ex) {
183                // Ignore the exception.
184            }
185        }
186
187        public void shutdown() {
188            shutdown = true;
189        }
190
191        @Override
192        public void run() {
193            while (!shutdown) {
194                try {
195                    sleep(reconnectionDelay);
196                    final Socket sock = createSocket(address, port);
197                    final OutputStream newOS = sock.getOutputStream();
198                    synchronized (owner) {
199                        try {
200                            getOutputStream().close();
201                        } catch (final IOException ioe) {
202                            // Ignore this.
203                        }
204
205                        setOutputStream(newOS);
206                        socket = sock;
207                        connector = null;
208                        shutdown = true;
209                    }
210                    LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
211                } catch (final InterruptedException ie) {
212                    LOGGER.debug("Reconnection interrupted.");
213                } catch (final ConnectException ex) {
214                    LOGGER.debug(host + ":" + port + " refused connection");
215                } catch (final IOException ioe) {
216                    LOGGER.debug("Unable to reconnect to " + host + ":" + port);
217                } finally {
218                    latch.countDown();
219                }
220            }
221        }
222    }
223
224    protected Socket createSocket(InetAddress host, int port) throws IOException {
225        return createSocket(host.getHostName(), port);
226    }
227
228    protected Socket createSocket(String host, int port) throws IOException {
229        return new Socket(host, port);
230    }
231
232    /**
233     * Data for the factory.
234     */
235    private static class FactoryData {
236        private final String host;
237        private final int port;
238        private final int delay;
239        private final boolean immediateFail;
240        private final Layout<? extends Serializable> layout;
241
242        public FactoryData(final String host, final int port, final int delay, final boolean immediateFail,
243                           final Layout<? extends Serializable> layout) {
244            this.host = host;
245            this.port = port;
246            this.delay = delay;
247            this.immediateFail = immediateFail;
248            this.layout = layout;
249        }
250    }
251
252    /**
253     * Factory to create a TCPSocketManager.
254     */
255    protected static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
256        @Override
257        public TCPSocketManager createManager(final String name, final FactoryData data) {
258
259            InetAddress address;
260            OutputStream os;
261            try {
262                address = InetAddress.getByName(data.host);
263            } catch (final UnknownHostException ex) {
264                LOGGER.error("Could not find address of " + data.host, ex);
265                return null;
266            }
267            try {
268                final Socket socket = new Socket(data.host, data.port);
269                os = socket.getOutputStream();
270                return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
271                    data.immediateFail, data.layout);
272            } catch (final IOException ex) {
273                LOGGER.error("TCPSocketManager (" + name + ") " + ex);
274                os = new ByteArrayOutputStream();
275            }
276            if (data.delay == 0) {
277                return null;
278            }
279            return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail,
280                data.layout);
281        }
282    }
283}