001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.net.server; 018 019import java.io.BufferedReader; 020import java.io.ByteArrayInputStream; 021import java.io.EOFException; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.io.ObjectInputStream; 026import java.io.OptionalDataException; 027import java.net.DatagramPacket; 028import java.net.DatagramSocket; 029 030import org.apache.logging.log4j.core.config.ConfigurationFactory; 031import org.apache.logging.log4j.core.util.Log4jThread; 032 033/** 034 * Listens for events over a socket connection. 035 * 036 * @param <T> 037 * The kind of input stream read 038 */ 039public class UdpSocketServer<T extends InputStream> extends AbstractSocketServer<T> { 040 041 private final DatagramSocket datagramSocket; 042 043 // max size so we only have to deal with one packet 044 private final int maxBufferSize = 1024 * 65 + 1024; 045 046 /** 047 * Constructor. 048 * 049 * @param port 050 * to listen on. 051 * @param logEventInput 052 * @throws IOException 053 * If an error occurs. 054 */ 055 public UdpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException { 056 super(port, logEventInput); 057 this.datagramSocket = new DatagramSocket(port); 058 } 059 060 /** 061 * Creates a socket server that reads JSON log events. 062 * 063 * @param port 064 * the port to listen 065 * @return a new a socket server 066 * @throws IOException 067 * if an I/O error occurs when opening the socket. 068 */ 069 public static UdpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException { 070 return new UdpSocketServer<>(port, new JsonInputStreamLogEventBridge()); 071 } 072 073 /** 074 * Creates a socket server that reads serialized log events. 075 * 076 * @param port 077 * the port to listen 078 * @return a new a socket server 079 * @throws IOException 080 * if an I/O error occurs when opening the socket. 081 */ 082 public static UdpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException { 083 return new UdpSocketServer<>(port, new ObjectInputStreamLogEventBridge()); 084 } 085 086 /** 087 * Creates a socket server that reads XML log events. 088 * 089 * @param port 090 * the port to listen 091 * @return a new a socket server 092 * @throws IOException 093 * if an I/O error occurs when opening the socket. 094 */ 095 public static UdpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException { 096 return new UdpSocketServer<>(port, new XmlInputStreamLogEventBridge()); 097 } 098 099 /** 100 * Main startup for the server. 101 * 102 * @param args 103 * The command line arguments. 104 * @throws Exception 105 * if an error occurs. 106 */ 107 public static void main(final String[] args) throws Exception { 108 if (args.length < 1 || args.length > 2) { 109 System.err.println("Incorrect number of arguments"); 110 printUsage(); 111 return; 112 } 113 final int port = Integer.parseInt(args[0]); 114 if (port <= 0 || port >= MAX_PORT) { 115 System.err.println("Invalid port number"); 116 printUsage(); 117 return; 118 } 119 if (args.length == 2 && args[1].length() > 0) { 120 ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1])); 121 } 122 final UdpSocketServer<ObjectInputStream> socketServer = UdpSocketServer.createSerializedSocketServer(port); 123 final Thread server = new Log4jThread(socketServer); 124 server.start(); 125 final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 126 while (true) { 127 final String line = reader.readLine(); 128 if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop") 129 || line.equalsIgnoreCase("Exit")) { 130 socketServer.shutdown(); 131 server.join(); 132 break; 133 } 134 } 135 } 136 137 private static void printUsage() { 138 System.out.println("Usage: ServerSocket port configFilePath"); 139 } 140 141 /** 142 * Accept incoming events and processes them. 143 */ 144 @Override 145 public void run() { 146 while (isActive()) { 147 if (datagramSocket.isClosed()) { 148 // OK we're done. 149 return; 150 } 151 try { 152 final byte[] buf = new byte[maxBufferSize]; 153 final DatagramPacket packet = new DatagramPacket(buf, buf.length); 154 datagramSocket.receive(packet); 155 final ByteArrayInputStream bais = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength()); 156 logEventInput.logEvents(logEventInput.wrapStream(bais), this); 157 } catch (final OptionalDataException e) { 158 if (datagramSocket.isClosed()) { 159 // OK we're done. 160 return; 161 } 162 logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e); 163 } catch (final EOFException e) { 164 if (datagramSocket.isClosed()) { 165 // OK we're done. 166 return; 167 } 168 logger.info("EOF encountered"); 169 } catch (final IOException e) { 170 if (datagramSocket.isClosed()) { 171 // OK we're done. 172 return; 173 } 174 logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e); 175 } 176 } 177 } 178 179 /** 180 * Shutdown the server. 181 */ 182 public void shutdown() { 183 this.setActive(false); 184 Thread.currentThread().interrupt(); 185 datagramSocket.close(); 186 } 187}