001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.net.server;
018
019import java.io.BufferedReader;
020import java.io.EOFException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.io.ObjectInputStream;
025import java.io.OptionalDataException;
026import java.net.ServerSocket;
027import java.net.Socket;
028import java.nio.charset.Charset;
029import java.util.Map;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032
033import org.apache.logging.log4j.core.config.ConfigurationFactory;
034import org.apache.logging.log4j.core.util.Log4jThread;
035import org.apache.logging.log4j.message.EntryMessage;
036
037/**
038 * Listens for events over a socket connection.
039 * 
040 * @param <T>
041 *        The kind of input stream read
042 */
043public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
044
045    /**
046     * Thread that processes the events.
047     */
048    private class SocketHandler extends Thread {
049
050        private final T inputStream;
051
052        private volatile boolean shutdown = false;
053
054        public SocketHandler(final Socket socket) throws IOException {
055            this.inputStream = logEventInput.wrapStream(socket.getInputStream());
056        }
057
058        @Override
059        public void run() {
060            final EntryMessage entry = logger.traceEntry();
061            boolean closed = false;
062            try {
063                try {
064                    while (!shutdown) {
065                        logEventInput.logEvents(inputStream, TcpSocketServer.this);
066                    }
067                } catch (final EOFException e) {
068                    closed = true;
069                } catch (final OptionalDataException e) {
070                    logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
071                } catch (final IOException e) {
072                    logger.error("IOException encountered while reading from socket", e);
073                }
074                if (!closed) {
075                    try {
076                        inputStream.close();
077                    } catch (final Exception ex) {
078                        // Ignore the exception;
079                    }
080                }
081            } finally {
082                handlers.remove(Long.valueOf(getId()));
083            }
084            logger.traceExit(entry);
085        }
086
087        public void shutdown() {
088            this.shutdown = true;
089            interrupt();
090        }
091    }
092
093    private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<>();
094
095    private final ServerSocket serverSocket;
096
097    /**
098     * Constructor.
099     * 
100     * @param port
101     *        to listen.
102     * @param logEventInput
103     *        the log even input
104     * @throws IOException
105     *         if an I/O error occurs when opening the socket.
106     */
107    public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
108        this(port, logEventInput, new ServerSocket(port));
109    }
110
111    /**
112     * Constructor.
113     * 
114     * @param port
115     *        to listen.
116     * @param logEventInput
117     *        the log even input
118     * @param serverSocket
119     *        the socket server
120     * @throws IOException
121     *         if an I/O error occurs when opening the socket.
122     */
123    public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket)
124            throws IOException {
125        super(port, logEventInput);
126        this.serverSocket = serverSocket;
127    }
128
129    /**
130     * Creates a socket server that reads JSON log events.
131     * 
132     * @param port
133     *        the port to listen
134     * @return a new a socket server
135     * @throws IOException
136     *         if an I/O error occurs when opening the socket.
137     */
138    public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
139        LOGGER.entry("createJsonSocketServer", port);
140        final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new JsonInputStreamLogEventBridge());
141        return LOGGER.exit(socketServer);
142    }
143
144    /**
145     * Creates a socket server that reads serialized log events.
146     * 
147     * @param port
148     *        the port to listen
149     * @return a new a socket server
150     * @throws IOException
151     *         if an I/O error occurs when opening the socket.
152     */
153    public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
154        LOGGER.entry(port);
155        final TcpSocketServer<ObjectInputStream> socketServer = new TcpSocketServer<>(port, new ObjectInputStreamLogEventBridge());
156        return LOGGER.exit(socketServer);
157    }
158
159    /**
160     * Creates a socket server that reads XML log events.
161     * 
162     * @param port
163     *        the port to listen
164     * @return a new a socket server
165     * @throws IOException
166     *         if an I/O error occurs when opening the socket.
167     */
168    public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
169        LOGGER.entry(port);
170        final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new XmlInputStreamLogEventBridge());
171        return LOGGER.exit(socketServer);
172    }
173
174    /**
175     * Main startup for the server.
176     * 
177     * @param args
178     *        The command line arguments.
179     * @throws Exception
180     *         if an error occurs.
181     */
182    public static void main(final String[] args) throws Exception {
183        if (args.length < 1 || args.length > 2) {
184            System.err.println("Incorrect number of arguments");
185            printUsage();
186            return;
187        }
188        final int port = Integer.parseInt(args[0]);
189        if (port <= 0 || port >= MAX_PORT) {
190            System.err.println("Invalid port number");
191            printUsage();
192            return;
193        }
194        if (args.length == 2 && args[1].length() > 0) {
195            ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1]));
196        }
197        final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port);
198        final Thread serverThread = new Log4jThread(socketServer);
199        serverThread.start();
200        final Charset enc = Charset.defaultCharset();
201        final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc));
202        while (true) {
203            final String line = reader.readLine();
204            if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
205                    || line.equalsIgnoreCase("Exit")) {
206                socketServer.shutdown();
207                serverThread.join();
208                break;
209            }
210        }
211    }
212
213    private static void printUsage() {
214        System.out.println("Usage: ServerSocket port configFilePath");
215    }
216
217    /**
218     * Accept incoming events and processes them.
219     */
220    @Override
221    public void run() {
222        final EntryMessage entry = logger.traceEntry();
223        while (isActive()) {
224            if (serverSocket.isClosed()) {
225                return;
226            }
227            try {
228                // Accept incoming connections.
229                logger.debug("Socket accept()...");
230                final Socket clientSocket = serverSocket.accept();
231                logger.debug("Socket accepted: {}", clientSocket);
232                clientSocket.setSoLinger(true, 0);
233
234                // accept() will block until a client connects to the server.
235                // If execution reaches this point, then it means that a client
236                // socket has been accepted.
237
238                final SocketHandler handler = new SocketHandler(clientSocket);
239                handlers.put(Long.valueOf(handler.getId()), handler);
240                handler.start();
241            } catch (final IOException e) {
242                if (serverSocket.isClosed()) {
243                    // OK we're done.
244                    logger.traceExit(entry);
245                    return;
246                }
247                logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e);
248            }
249        }
250        for (final Map.Entry<Long, SocketHandler> handlerEntry : handlers.entrySet()) {
251            final SocketHandler handler = handlerEntry.getValue();
252            handler.shutdown();
253            try {
254                handler.join();
255            } catch (final InterruptedException ie) {
256                // Ignore the exception
257            }
258        }
259        logger.traceExit(entry);
260    }
261
262    /**
263     * Shutdown the server.
264     * 
265     * @throws IOException if the server socket could not be closed
266     */
267    public void shutdown() throws IOException {
268        final EntryMessage entry = logger.traceEntry();
269        setActive(false);
270        Thread.currentThread().interrupt();
271        serverSocket.close();
272        logger.traceExit(entry);
273    }
274}