001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.net.server; 018 019import java.io.BufferedReader; 020import java.io.EOFException; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.io.ObjectInputStream; 025import java.io.OptionalDataException; 026import java.net.ServerSocket; 027import java.net.Socket; 028import java.nio.charset.Charset; 029import java.util.Map; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032 033import org.apache.logging.log4j.core.config.ConfigurationFactory; 034import org.apache.logging.log4j.core.util.Log4jThread; 035import org.apache.logging.log4j.message.EntryMessage; 036 037/** 038 * Listens for events over a socket connection. 039 * 040 * @param <T> 041 * The kind of input stream read 042 */ 043public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> { 044 045 /** 046 * Thread that processes the events. 047 */ 048 private class SocketHandler extends Thread { 049 050 private final T inputStream; 051 052 private volatile boolean shutdown = false; 053 054 public SocketHandler(final Socket socket) throws IOException { 055 this.inputStream = logEventInput.wrapStream(socket.getInputStream()); 056 } 057 058 @Override 059 public void run() { 060 final EntryMessage entry = logger.traceEntry(); 061 boolean closed = false; 062 try { 063 try { 064 while (!shutdown) { 065 logEventInput.logEvents(inputStream, TcpSocketServer.this); 066 } 067 } catch (final EOFException e) { 068 closed = true; 069 } catch (final OptionalDataException e) { 070 logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e); 071 } catch (final IOException e) { 072 logger.error("IOException encountered while reading from socket", e); 073 } 074 if (!closed) { 075 try { 076 inputStream.close(); 077 } catch (final Exception ex) { 078 // Ignore the exception; 079 } 080 } 081 } finally { 082 handlers.remove(Long.valueOf(getId())); 083 } 084 logger.traceExit(entry); 085 } 086 087 public void shutdown() { 088 this.shutdown = true; 089 interrupt(); 090 } 091 } 092 093 private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<>(); 094 095 private final ServerSocket serverSocket; 096 097 /** 098 * Constructor. 099 * 100 * @param port 101 * to listen. 102 * @param logEventInput 103 * the log even input 104 * @throws IOException 105 * if an I/O error occurs when opening the socket. 106 */ 107 public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException { 108 this(port, logEventInput, new ServerSocket(port)); 109 } 110 111 /** 112 * Constructor. 113 * 114 * @param port 115 * to listen. 116 * @param logEventInput 117 * the log even input 118 * @param serverSocket 119 * the socket server 120 * @throws IOException 121 * if an I/O error occurs when opening the socket. 122 */ 123 public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket) 124 throws IOException { 125 super(port, logEventInput); 126 this.serverSocket = serverSocket; 127 } 128 129 /** 130 * Creates a socket server that reads JSON log events. 131 * 132 * @param port 133 * the port to listen 134 * @return a new a socket server 135 * @throws IOException 136 * if an I/O error occurs when opening the socket. 137 */ 138 public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException { 139 LOGGER.entry("createJsonSocketServer", port); 140 final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new JsonInputStreamLogEventBridge()); 141 return LOGGER.exit(socketServer); 142 } 143 144 /** 145 * Creates a socket server that reads serialized log events. 146 * 147 * @param port 148 * the port to listen 149 * @return a new a socket server 150 * @throws IOException 151 * if an I/O error occurs when opening the socket. 152 */ 153 public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException { 154 LOGGER.entry(port); 155 final TcpSocketServer<ObjectInputStream> socketServer = new TcpSocketServer<>(port, new ObjectInputStreamLogEventBridge()); 156 return LOGGER.exit(socketServer); 157 } 158 159 /** 160 * Creates a socket server that reads XML log events. 161 * 162 * @param port 163 * the port to listen 164 * @return a new a socket server 165 * @throws IOException 166 * if an I/O error occurs when opening the socket. 167 */ 168 public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException { 169 LOGGER.entry(port); 170 final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new XmlInputStreamLogEventBridge()); 171 return LOGGER.exit(socketServer); 172 } 173 174 /** 175 * Main startup for the server. 176 * 177 * @param args 178 * The command line arguments. 179 * @throws Exception 180 * if an error occurs. 181 */ 182 public static void main(final String[] args) throws Exception { 183 if (args.length < 1 || args.length > 2) { 184 System.err.println("Incorrect number of arguments"); 185 printUsage(); 186 return; 187 } 188 final int port = Integer.parseInt(args[0]); 189 if (port <= 0 || port >= MAX_PORT) { 190 System.err.println("Invalid port number"); 191 printUsage(); 192 return; 193 } 194 if (args.length == 2 && args[1].length() > 0) { 195 ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1])); 196 } 197 final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port); 198 final Thread serverThread = new Log4jThread(socketServer); 199 serverThread.start(); 200 final Charset enc = Charset.defaultCharset(); 201 final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc)); 202 while (true) { 203 final String line = reader.readLine(); 204 if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop") 205 || line.equalsIgnoreCase("Exit")) { 206 socketServer.shutdown(); 207 serverThread.join(); 208 break; 209 } 210 } 211 } 212 213 private static void printUsage() { 214 System.out.println("Usage: ServerSocket port configFilePath"); 215 } 216 217 /** 218 * Accept incoming events and processes them. 219 */ 220 @Override 221 public void run() { 222 final EntryMessage entry = logger.traceEntry(); 223 while (isActive()) { 224 if (serverSocket.isClosed()) { 225 return; 226 } 227 try { 228 // Accept incoming connections. 229 logger.debug("Socket accept()..."); 230 final Socket clientSocket = serverSocket.accept(); 231 logger.debug("Socket accepted: {}", clientSocket); 232 clientSocket.setSoLinger(true, 0); 233 234 // accept() will block until a client connects to the server. 235 // If execution reaches this point, then it means that a client 236 // socket has been accepted. 237 238 final SocketHandler handler = new SocketHandler(clientSocket); 239 handlers.put(Long.valueOf(handler.getId()), handler); 240 handler.start(); 241 } catch (final IOException e) { 242 if (serverSocket.isClosed()) { 243 // OK we're done. 244 logger.traceExit(entry); 245 return; 246 } 247 logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e); 248 } 249 } 250 for (final Map.Entry<Long, SocketHandler> handlerEntry : handlers.entrySet()) { 251 final SocketHandler handler = handlerEntry.getValue(); 252 handler.shutdown(); 253 try { 254 handler.join(); 255 } catch (final InterruptedException ie) { 256 // Ignore the exception 257 } 258 } 259 logger.traceExit(entry); 260 } 261 262 /** 263 * Shutdown the server. 264 * 265 * @throws IOException if the server socket could not be closed 266 */ 267 public void shutdown() throws IOException { 268 final EntryMessage entry = logger.traceEntry(); 269 setActive(false); 270 Thread.currentThread().interrupt(); 271 serverSocket.close(); 272 logger.traceExit(entry); 273 } 274}