001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.net.server;
018
019import java.io.BufferedReader;
020import java.io.EOFException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.io.ObjectInputStream;
025import java.io.OptionalDataException;
026import java.net.ServerSocket;
027import java.net.Socket;
028import java.nio.charset.Charset;
029import java.util.Map;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032
033import org.apache.logging.log4j.core.config.ConfigurationFactory;
034import org.apache.logging.log4j.core.util.Log4jThread;
035
036/**
037 * Listens for events over a socket connection.
038 * 
039 * @param <T>
040 *        The kind of input stream read
041 */
042public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
043
044    /**
045     * Thread that processes the events.
046     */
047    private class SocketHandler extends Thread {
048
049        private final T inputStream;
050
051        private volatile boolean shutdown = false;
052
053        public SocketHandler(final Socket socket) throws IOException {
054            this.inputStream = logEventInput.wrapStream(socket.getInputStream());
055        }
056
057        @Override
058        public void run() {
059            logger.entry();
060            boolean closed = false;
061            try {
062                try {
063                    while (!shutdown) {
064                        logEventInput.logEvents(inputStream, TcpSocketServer.this);
065                    }
066                } catch (final EOFException e) {
067                    closed = true;
068                } catch (final OptionalDataException e) {
069                    logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
070                } catch (final IOException e) {
071                    logger.error("IOException encountered while reading from socket", e);
072                }
073                if (!closed) {
074                    try {
075                        inputStream.close();
076                    } catch (final Exception ex) {
077                        // Ignore the exception;
078                    }
079                }
080            } finally {
081                handlers.remove(Long.valueOf(getId()));
082            }
083            logger.exit();
084        }
085
086        public void shutdown() {
087            this.shutdown = true;
088            interrupt();
089        }
090    }
091
092    private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<>();
093
094    private final ServerSocket serverSocket;
095
096    /**
097     * Constructor.
098     * 
099     * @param port
100     *        to listen.
101     * @param logEventInput
102     *        the log even input
103     * @throws IOException
104     *         if an I/O error occurs when opening the socket.
105     */
106    public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
107        this(port, logEventInput, new ServerSocket(port));
108    }
109
110    /**
111     * Constructor.
112     * 
113     * @param port
114     *        to listen.
115     * @param logEventInput
116     *        the log even input
117     * @param serverSocket
118     *        the socket server
119     * @throws IOException
120     *         if an I/O error occurs when opening the socket.
121     */
122    public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket)
123            throws IOException {
124        super(port, logEventInput);
125        this.serverSocket = serverSocket;
126    }
127
128    /**
129     * Creates a socket server that reads JSON log events.
130     * 
131     * @param port
132     *        the port to listen
133     * @return a new a socket server
134     * @throws IOException
135     *         if an I/O error occurs when opening the socket.
136     */
137    public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
138        LOGGER.entry("createJsonSocketServer", port);
139        final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new JsonInputStreamLogEventBridge());
140        return LOGGER.exit(socketServer);
141    }
142
143    /**
144     * Creates a socket server that reads serialized log events.
145     * 
146     * @param port
147     *        the port to listen
148     * @return a new a socket server
149     * @throws IOException
150     *         if an I/O error occurs when opening the socket.
151     */
152    public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
153        LOGGER.entry(port);
154        final TcpSocketServer<ObjectInputStream> socketServer = new TcpSocketServer<>(port, new ObjectInputStreamLogEventBridge());
155        return LOGGER.exit(socketServer);
156    }
157
158    /**
159     * Creates a socket server that reads XML log events.
160     * 
161     * @param port
162     *        the port to listen
163     * @return a new a socket server
164     * @throws IOException
165     *         if an I/O error occurs when opening the socket.
166     */
167    public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
168        LOGGER.entry(port);
169        final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new XmlInputStreamLogEventBridge());
170        return LOGGER.exit(socketServer);
171    }
172
173    /**
174     * Main startup for the server.
175     * 
176     * @param args
177     *        The command line arguments.
178     * @throws Exception
179     *         if an error occurs.
180     */
181    public static void main(final String[] args) throws Exception {
182        if (args.length < 1 || args.length > 2) {
183            System.err.println("Incorrect number of arguments");
184            printUsage();
185            return;
186        }
187        final int port = Integer.parseInt(args[0]);
188        if (port <= 0 || port >= MAX_PORT) {
189            System.err.println("Invalid port number");
190            printUsage();
191            return;
192        }
193        if (args.length == 2 && args[1].length() > 0) {
194            ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1]));
195        }
196        final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port);
197        final Thread serverThread = new Log4jThread(socketServer);
198        serverThread.start();
199        final Charset enc = Charset.defaultCharset();
200        final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc));
201        while (true) {
202            final String line = reader.readLine();
203            if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
204                    || line.equalsIgnoreCase("Exit")) {
205                socketServer.shutdown();
206                serverThread.join();
207                break;
208            }
209        }
210    }
211
212    private static void printUsage() {
213        System.out.println("Usage: ServerSocket port configFilePath");
214    }
215
216    /**
217     * Accept incoming events and processes them.
218     */
219    @Override
220    public void run() {
221        logger.entry();
222        while (isActive()) {
223            if (serverSocket.isClosed()) {
224                return;
225            }
226            try {
227                // Accept incoming connections.
228                logger.debug("Socket accept()...");
229                final Socket clientSocket = serverSocket.accept();
230                logger.debug("Socket accepted: {}", clientSocket);
231                clientSocket.setSoLinger(true, 0);
232
233                // accept() will block until a client connects to the server.
234                // If execution reaches this point, then it means that a client
235                // socket has been accepted.
236
237                final SocketHandler handler = new SocketHandler(clientSocket);
238                handlers.put(Long.valueOf(handler.getId()), handler);
239                handler.start();
240            } catch (final IOException e) {
241                if (serverSocket.isClosed()) {
242                    // OK we're done.
243                    logger.exit();
244                    return;
245                }
246                logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e);
247            }
248        }
249        for (final Map.Entry<Long, SocketHandler> entry : handlers.entrySet()) {
250            final SocketHandler handler = entry.getValue();
251            handler.shutdown();
252            try {
253                handler.join();
254            } catch (final InterruptedException ie) {
255                // Ignore the exception
256            }
257        }
258        logger.exit();
259    }
260
261    /**
262     * Shutdown the server.
263     * 
264     * @throws IOException if the server socket could not be closed
265     */
266    public void shutdown() throws IOException {
267        logger.entry();
268        setActive(false);
269        Thread.currentThread().interrupt();
270        serverSocket.close();
271        logger.exit();
272    }
273}