001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.net.server; 018 019import java.io.BufferedReader; 020import java.io.EOFException; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.io.ObjectInputStream; 025import java.io.OptionalDataException; 026import java.net.ServerSocket; 027import java.net.Socket; 028import java.nio.charset.Charset; 029import java.util.Map; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032 033import org.apache.logging.log4j.core.config.ConfigurationFactory; 034import org.apache.logging.log4j.core.util.Log4jThread; 035 036/** 037 * Listens for events over a socket connection. 038 * 039 * @param <T> 040 * The kind of input stream read 041 */ 042public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> { 043 044 /** 045 * Thread that processes the events. 046 */ 047 private class SocketHandler extends Thread { 048 049 private final T inputStream; 050 051 private volatile boolean shutdown = false; 052 053 public SocketHandler(final Socket socket) throws IOException { 054 this.inputStream = logEventInput.wrapStream(socket.getInputStream()); 055 } 056 057 @Override 058 public void run() { 059 logger.entry(); 060 boolean closed = false; 061 try { 062 try { 063 while (!shutdown) { 064 logEventInput.logEvents(inputStream, TcpSocketServer.this); 065 } 066 } catch (final EOFException e) { 067 closed = true; 068 } catch (final OptionalDataException e) { 069 logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e); 070 } catch (final IOException e) { 071 logger.error("IOException encountered while reading from socket", e); 072 } 073 if (!closed) { 074 try { 075 inputStream.close(); 076 } catch (final Exception ex) { 077 // Ignore the exception; 078 } 079 } 080 } finally { 081 handlers.remove(Long.valueOf(getId())); 082 } 083 logger.exit(); 084 } 085 086 public void shutdown() { 087 this.shutdown = true; 088 interrupt(); 089 } 090 } 091 092 private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<>(); 093 094 private final ServerSocket serverSocket; 095 096 /** 097 * Constructor. 098 * 099 * @param port 100 * to listen. 101 * @param logEventInput 102 * the log even input 103 * @throws IOException 104 * if an I/O error occurs when opening the socket. 105 */ 106 public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException { 107 this(port, logEventInput, new ServerSocket(port)); 108 } 109 110 /** 111 * Constructor. 112 * 113 * @param port 114 * to listen. 115 * @param logEventInput 116 * the log even input 117 * @param serverSocket 118 * the socket server 119 * @throws IOException 120 * if an I/O error occurs when opening the socket. 121 */ 122 public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket) 123 throws IOException { 124 super(port, logEventInput); 125 this.serverSocket = serverSocket; 126 } 127 128 /** 129 * Creates a socket server that reads JSON log events. 130 * 131 * @param port 132 * the port to listen 133 * @return a new a socket server 134 * @throws IOException 135 * if an I/O error occurs when opening the socket. 136 */ 137 public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException { 138 LOGGER.entry("createJsonSocketServer", port); 139 final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new JsonInputStreamLogEventBridge()); 140 return LOGGER.exit(socketServer); 141 } 142 143 /** 144 * Creates a socket server that reads serialized log events. 145 * 146 * @param port 147 * the port to listen 148 * @return a new a socket server 149 * @throws IOException 150 * if an I/O error occurs when opening the socket. 151 */ 152 public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException { 153 LOGGER.entry(port); 154 final TcpSocketServer<ObjectInputStream> socketServer = new TcpSocketServer<>(port, new ObjectInputStreamLogEventBridge()); 155 return LOGGER.exit(socketServer); 156 } 157 158 /** 159 * Creates a socket server that reads XML log events. 160 * 161 * @param port 162 * the port to listen 163 * @return a new a socket server 164 * @throws IOException 165 * if an I/O error occurs when opening the socket. 166 */ 167 public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException { 168 LOGGER.entry(port); 169 final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new XmlInputStreamLogEventBridge()); 170 return LOGGER.exit(socketServer); 171 } 172 173 /** 174 * Main startup for the server. 175 * 176 * @param args 177 * The command line arguments. 178 * @throws Exception 179 * if an error occurs. 180 */ 181 public static void main(final String[] args) throws Exception { 182 if (args.length < 1 || args.length > 2) { 183 System.err.println("Incorrect number of arguments"); 184 printUsage(); 185 return; 186 } 187 final int port = Integer.parseInt(args[0]); 188 if (port <= 0 || port >= MAX_PORT) { 189 System.err.println("Invalid port number"); 190 printUsage(); 191 return; 192 } 193 if (args.length == 2 && args[1].length() > 0) { 194 ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1])); 195 } 196 final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port); 197 final Thread serverThread = new Log4jThread(socketServer); 198 serverThread.start(); 199 final Charset enc = Charset.defaultCharset(); 200 final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc)); 201 while (true) { 202 final String line = reader.readLine(); 203 if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop") 204 || line.equalsIgnoreCase("Exit")) { 205 socketServer.shutdown(); 206 serverThread.join(); 207 break; 208 } 209 } 210 } 211 212 private static void printUsage() { 213 System.out.println("Usage: ServerSocket port configFilePath"); 214 } 215 216 /** 217 * Accept incoming events and processes them. 218 */ 219 @Override 220 public void run() { 221 logger.entry(); 222 while (isActive()) { 223 if (serverSocket.isClosed()) { 224 return; 225 } 226 try { 227 // Accept incoming connections. 228 logger.debug("Socket accept()..."); 229 final Socket clientSocket = serverSocket.accept(); 230 logger.debug("Socket accepted: {}", clientSocket); 231 clientSocket.setSoLinger(true, 0); 232 233 // accept() will block until a client connects to the server. 234 // If execution reaches this point, then it means that a client 235 // socket has been accepted. 236 237 final SocketHandler handler = new SocketHandler(clientSocket); 238 handlers.put(Long.valueOf(handler.getId()), handler); 239 handler.start(); 240 } catch (final IOException e) { 241 if (serverSocket.isClosed()) { 242 // OK we're done. 243 logger.exit(); 244 return; 245 } 246 logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e); 247 } 248 } 249 for (final Map.Entry<Long, SocketHandler> entry : handlers.entrySet()) { 250 final SocketHandler handler = entry.getValue(); 251 handler.shutdown(); 252 try { 253 handler.join(); 254 } catch (final InterruptedException ie) { 255 // Ignore the exception 256 } 257 } 258 logger.exit(); 259 } 260 261 /** 262 * Shutdown the server. 263 * 264 * @throws IOException if the server socket could not be closed 265 */ 266 public void shutdown() throws IOException { 267 logger.entry(); 268 setActive(false); 269 Thread.currentThread().interrupt(); 270 serverSocket.close(); 271 logger.exit(); 272 } 273}