001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
016 */
017package org.apache.logging.log4j.core.net;
018
019import java.io.ByteArrayOutputStream;
020import java.io.IOException;
021import java.io.OutputStream;
022import java.io.Serializable;
023import java.net.ConnectException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.net.Socket;
027import java.net.UnknownHostException;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.concurrent.CountDownLatch;
031
032import org.apache.logging.log4j.core.Layout;
033import org.apache.logging.log4j.core.appender.AppenderLoggingException;
034import org.apache.logging.log4j.core.appender.ManagerFactory;
035import org.apache.logging.log4j.core.appender.OutputStreamManager;
036import org.apache.logging.log4j.util.Strings;
037
038/**
039 * Manager of TCP Socket connections.
040 */
041public class TcpSocketManager extends AbstractSocketManager {
042    /**
043      The default reconnection delay (30000 milliseconds or 30 seconds).
044     */
045    public static final int DEFAULT_RECONNECTION_DELAY_MILLIS   = 30000;
046    /**
047      The default port number of remote logging server (4560).
048     */
049    private static final int DEFAULT_PORT = 4560;
050
051    private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
052
053    private final int reconnectionDelay;
054
055    private Reconnector connector;
056
057    private Socket socket;
058
059    private final boolean retry;
060
061    private final boolean immediateFail;
062    
063    private final int connectTimeoutMillis;
064
065    /**
066     * The Constructor.
067     * @param name The unique name of this connection.
068     * @param os The OutputStream.
069     * @param sock The Socket.
070     * @param inetAddress The Internet address of the host.
071     * @param host The name of the host.
072     * @param port The port number on the host.
073     * @param connectTimeoutMillis the connect timeout in milliseconds.
074     * @param delay Reconnection interval.
075     * @param immediateFail
076     * @param layout The Layout.
077     */
078    public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
079                            final String host, final int port, final int connectTimeoutMillis, final int delay,
080                            final boolean immediateFail, final Layout<? extends Serializable> layout) {
081        super(name, os, inetAddress, host, port, layout, true);
082        this.connectTimeoutMillis = connectTimeoutMillis;
083        this.reconnectionDelay = delay;
084        this.socket = sock;
085        this.immediateFail = immediateFail;
086        retry = delay > 0;
087        if (sock == null) {
088            connector = new Reconnector(this);
089            connector.setDaemon(true);
090            connector.setPriority(Thread.MIN_PRIORITY);
091            connector.start();
092        }
093    }
094
095    /**
096     * Obtain a TcpSocketManager.
097     * @param host The host to connect to.
098     * @param port The port on the host.
099     * @param connectTimeoutMillis the connect timeout in milliseconds
100     * @param delayMillis The interval to pause between retries.
101     * @return A TcpSocketManager.
102     */
103    public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
104            int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) {
105        if (Strings.isEmpty(host)) {
106            throw new IllegalArgumentException("A host name is required");
107        }
108        if (port <= 0) {
109            port = DEFAULT_PORT;
110        }
111        if (delayMillis == 0) {
112            delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
113        }
114        return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
115                connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY);
116    }
117
118    @Override
119    protected void write(final byte[] bytes, final int offset, final int length, boolean immediateFlush)  {
120        if (socket == null) {
121            if (connector != null && !immediateFail) {
122                connector.latch();
123            }
124            if (socket == null) {
125                final String msg = "Error writing to " + getName() + " socket not available";
126                throw new AppenderLoggingException(msg);
127            }
128        }
129        synchronized (this) {
130            try {
131                final OutputStream outputStream = getOutputStream();
132                outputStream.write(bytes, offset, length);
133                if (immediateFlush) {
134                    outputStream.flush();
135                }
136            } catch (final IOException ex) {
137                if (retry && connector == null) {
138                    connector = new Reconnector(this);
139                    connector.setDaemon(true);
140                    connector.setPriority(Thread.MIN_PRIORITY);
141                    connector.start();
142                }
143                final String msg = "Error writing to " + getName();
144                throw new AppenderLoggingException(msg, ex);
145            }
146        }
147    }
148
149    @Override
150    protected synchronized void close() {
151        super.close();
152        if (connector != null) {
153            connector.shutdown();
154            connector.interrupt();
155            connector = null;
156        }
157    }
158
159    public int getConnectTimeoutMillis() {
160        return connectTimeoutMillis;
161    }
162
163    /**
164     * Gets this TcpSocketManager's content format. Specified by:
165     * <ul>
166     * <li>Key: "protocol" Value: "tcp"</li>
167     * <li>Key: "direction" Value: "out"</li>
168     * </ul>
169     * 
170     * @return Map of content format keys supporting TcpSocketManager
171     */
172    @Override
173    public Map<String, String> getContentFormat() {
174        final Map<String, String> result = new HashMap<>(super.getContentFormat());
175        result.put("protocol", "tcp");
176        result.put("direction", "out");
177        return result;
178    }
179
180    /**
181     * Handles reconnecting to a Thread.
182     */
183    private class Reconnector extends Thread {
184
185        private final CountDownLatch latch = new CountDownLatch(1);
186
187        private boolean shutdown = false;
188
189        private final Object owner;
190
191        public Reconnector(final OutputStreamManager owner) {
192            this.owner = owner;
193        }
194
195        public void latch()  {
196            try {
197                latch.await();
198            } catch (final InterruptedException ex) {
199                // Ignore the exception.
200            }
201        }
202
203        public void shutdown() {
204            shutdown = true;
205        }
206
207        @Override
208        public void run() {
209            while (!shutdown) {
210                try {
211                    sleep(reconnectionDelay);
212                    final Socket sock = createSocket(inetAddress, port);
213                    final OutputStream newOS = sock.getOutputStream();
214                    synchronized (owner) {
215                        try {
216                            getOutputStream().close();
217                        } catch (final IOException ioe) {
218                            // Ignore this.
219                        }
220
221                        setOutputStream(newOS);
222                        socket = sock;
223                        connector = null;
224                        shutdown = true;
225                    }
226                    LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
227                } catch (final InterruptedException ie) {
228                    LOGGER.debug("Reconnection interrupted.");
229                } catch (final ConnectException ex) {
230                    LOGGER.debug(host + ':' + port + " refused connection");
231                } catch (final IOException ioe) {
232                    LOGGER.debug("Unable to reconnect to " + host + ':' + port);
233                } finally {
234                    latch.countDown();
235                }
236            }
237        }
238    }
239
240    protected Socket createSocket(final InetAddress host, final int port) throws IOException {
241        return createSocket(host.getHostName(), port);
242    }
243
244    protected Socket createSocket(final String host, final int port) throws IOException {
245        final InetSocketAddress address = new InetSocketAddress(host, port);
246        final Socket newSocket = new Socket();
247        newSocket.connect(address, connectTimeoutMillis);
248        return newSocket;
249    }
250
251    /**
252     * Data for the factory.
253     */
254    private static class FactoryData {
255        private final String host;
256        private final int port;
257        private final int connectTimeoutMillis;
258        private final int delayMillis;
259        private final boolean immediateFail;
260        private final Layout<? extends Serializable> layout;
261
262        public FactoryData(final String host, final int port, final int connectTimeoutMillis, final int delayMillis,
263                           final boolean immediateFail, final Layout<? extends Serializable> layout) {
264            this.host = host;
265            this.port = port;
266            this.connectTimeoutMillis = connectTimeoutMillis;
267            this.delayMillis = delayMillis;
268            this.immediateFail = immediateFail;
269            this.layout = layout;
270        }
271    }
272
273    /**
274     * Factory to create a TcpSocketManager.
275     */
276    protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
277        @Override
278        public TcpSocketManager createManager(final String name, final FactoryData data) {
279
280            InetAddress inetAddress;
281            OutputStream os;
282            try {
283                inetAddress = InetAddress.getByName(data.host);
284            } catch (final UnknownHostException ex) {
285                LOGGER.error("Could not find address of " + data.host, ex, ex);
286                return null;
287            }
288            try {
289                // LOG4J2-1042
290                final Socket socket = new Socket();
291                socket.connect(new InetSocketAddress(data.host, data.port), data.connectTimeoutMillis);
292                os = socket.getOutputStream();
293                return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
294                        data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout);
295            } catch (final IOException ex) {
296                LOGGER.error("TcpSocketManager (" + name + ") " + ex, ex);
297                os = new ByteArrayOutputStream();
298            }
299            if (data.delayMillis == 0) {
300                return null;
301            }
302            return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
303                    data.delayMillis, data.immediateFail, data.layout);
304        }
305    }
306
307}