1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.logging;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.util.*;
26  import java.util.concurrent.CountDownLatch;
27  
28  import org.apache.log4j.AppenderSkeleton;
29  import org.apache.log4j.Level;
30  import org.apache.log4j.spi.LoggingEvent;
31  import org.apache.mina.core.buffer.IoBuffer;
32  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
33  import org.apache.mina.core.filterchain.IoFilterAdapter;
34  import org.apache.mina.core.future.ConnectFuture;
35  import org.apache.mina.core.service.IoHandlerAdapter;
36  import org.apache.mina.core.session.IdleStatus;
37  import org.apache.mina.core.session.IoSession;
38  import org.apache.mina.filter.codec.ProtocolCodecFactory;
39  import org.apache.mina.filter.codec.ProtocolCodecFilter;
40  import org.apache.mina.filter.codec.ProtocolDecoder;
41  import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
42  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
43  import org.apache.mina.filter.codec.ProtocolEncoder;
44  import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
45  import org.apache.mina.filter.codec.ProtocolEncoderOutput;
46  import org.apache.mina.filter.executor.ExecutorFilter;
47  import org.apache.mina.filter.statistic.ProfilerTimerFilter;
48  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
49  import org.apache.mina.transport.socket.nio.NioSocketConnector;
50  import org.junit.After;
51  import org.junit.Before;
52  import org.junit.Test;
53  import static org.junit.Assert.assertNotNull;
54  import static org.junit.Assert.assertNull;
55  import static org.junit.Assert.assertEquals;
56  import static org.junit.Assert.fail;
57  import org.slf4j.Logger;
58  import org.slf4j.LoggerFactory;
59  
60  /**
61   * Tests {@link MdcInjectionFilter} in variuos scenarios.
62   *
63   * @author The Apache MINA Project (dev@mina.apache.org)
64   */
65  public class MdcInjectionFilterTest {
66  
67      static Logger LOGGER = LoggerFactory.getLogger(MdcInjectionFilterTest.class);
68      private static final int TIMEOUT = 5000;
69  
70      final MyAppender appender = new MyAppender();
71      private int port;
72      private NioSocketAcceptor acceptor;
73  
74      @Before
75      public void setUp() throws Exception {
76          // comment out next line if you want to see normal logging
77          org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
78          org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
79          org.apache.log4j.Logger.getRootLogger().addAppender(appender);
80          acceptor = new NioSocketAcceptor();
81      }
82  
83  
84      @After
85      public void tearDown() throws Exception {
86          acceptor.dispose();
87      }
88  
89      @Test
90      public void testSimpleChain() throws IOException, InterruptedException {
91          DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
92          chain.addFirst("mdc-injector", new MdcInjectionFilter());
93          chain.addLast("dummy", new DummyIoFilter());
94          chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
95          test(chain);
96      }
97  
98      @Test
99      public void testExecutorFilterAtTheEnd() throws IOException, InterruptedException {
100         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
101         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
102         chain.addFirst("mdc-injector1", mdcInjectionFilter);
103         chain.addLast("dummy", new DummyIoFilter());
104         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
105         chain.addLast("executor" , new ExecutorFilter());
106         chain.addLast("mdc-injector2", mdcInjectionFilter);
107         test(chain);
108     }
109 
110     @Test
111     public void testExecutorFilterAtBeginning() throws IOException, InterruptedException {
112         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
113         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
114         chain.addLast("executor" , new ExecutorFilter());
115         chain.addLast("mdc-injector", mdcInjectionFilter);
116         chain.addLast("dummy", new DummyIoFilter());
117         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
118         test(chain);
119     }
120 
121     @Test
122     public void testExecutorFilterBeforeProtocol() throws IOException, InterruptedException {
123         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
124         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
125         chain.addLast("executor" , new ExecutorFilter());
126         chain.addLast("mdc-injector", mdcInjectionFilter);
127         chain.addLast("dummy", new DummyIoFilter());
128         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
129         test(chain);
130     }
131 
132     @Test
133     public void testMultipleFilters() throws IOException, InterruptedException {
134         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
135         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
136         chain.addLast("executor" , new ExecutorFilter());
137         chain.addLast("mdc-injector", mdcInjectionFilter);
138         chain.addLast("profiler", new ProfilerTimerFilter());
139         chain.addLast("dummy", new DummyIoFilter());
140         chain.addLast("logger", new LoggingFilter());
141         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
142         test(chain);
143     }
144 
145     @Test
146     public void testTwoExecutorFilters() throws IOException, InterruptedException {
147         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
148         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
149         chain.addLast("executor1" , new ExecutorFilter());
150         chain.addLast("mdc-injector1", mdcInjectionFilter);
151         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
152         chain.addLast("dummy", new DummyIoFilter());
153         chain.addLast("executor2" , new ExecutorFilter());
154         // add the MdcInjectionFilter instance after every ExecutorFilter
155         // it's important to use the same MdcInjectionFilter instance
156         chain.addLast("mdc-injector2",  mdcInjectionFilter);
157         test(chain);
158     }
159 
160     @Test
161     public void testOnlyRemoteAddress() throws IOException, InterruptedException {
162         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
163         chain.addFirst("mdc-injector", new MdcInjectionFilter(
164             MdcInjectionFilter.MdcKey.remoteAddress));
165         chain.addLast("dummy", new DummyIoFilter());
166         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
167         SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
168         acceptor.setHandler(simpleIoHandler);
169         acceptor.bind(new InetSocketAddress(0));
170         port = acceptor.getLocalAddress().getPort();
171         acceptor.setFilterChainBuilder(chain);
172         // create some clients
173         NioSocketConnector connector = new NioSocketConnector();
174         connector.setHandler(new IoHandlerAdapter());
175         connectAndWrite(connector,0);
176         connectAndWrite(connector,1);
177         // wait until Iohandler has received all events
178         simpleIoHandler.messageSentLatch.await();
179         simpleIoHandler.sessionIdleLatch.await();
180         simpleIoHandler.sessionClosedLatch.await();
181         connector.dispose();
182 
183         // make a copy to prevent ConcurrentModificationException
184         List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
185         // verify that all logging events have correct MDC
186         for (LoggingEvent event : events) {
187             for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
188               String key = mdcKey.name();
189               Object value = event.getMDC(key);
190               if (mdcKey == MdcInjectionFilter.MdcKey.remoteAddress) {
191                   assertNotNull(
192                       "MDC[remoteAddress] not set for [" + event.getMessage() + "]", value);
193               } else {
194                   assertNull("MDC[" + key + "] set for [" + event.getMessage() + "]", value);
195               }
196             }
197         }
198     }
199 
200     private void test(DefaultIoFilterChainBuilder chain) throws IOException, InterruptedException {
201         // configure the server
202         SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
203         acceptor.setHandler(simpleIoHandler);
204         acceptor.bind(new InetSocketAddress(0));
205         port = acceptor.getLocalAddress().getPort();
206         acceptor.setFilterChainBuilder(chain);
207         // create some clients
208         NioSocketConnector connector = new NioSocketConnector();
209         connector.setHandler(new IoHandlerAdapter());
210         SocketAddress remoteAddressClients[] = new SocketAddress[2];
211         remoteAddressClients[0] = connectAndWrite(connector,0);
212         remoteAddressClients[1] = connectAndWrite(connector,1);
213         // wait until Iohandler has received all events
214         simpleIoHandler.messageSentLatch.await();
215         simpleIoHandler.sessionIdleLatch.await();
216         simpleIoHandler.sessionClosedLatch.await();
217         connector.dispose();
218 
219         // make a copy to prevent ConcurrentModificationException
220         List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
221 
222         Set<String> loggersToCheck = new HashSet<String>();
223         loggersToCheck.add(MdcInjectionFilterTest.class.getName());
224         loggersToCheck.add(ProtocolCodecFilter.class.getName());
225         loggersToCheck.add(LoggingFilter.class.getName());
226 
227         // verify that all logging events have correct MDC
228         for (LoggingEvent event : events) {
229              
230             if (loggersToCheck.contains(event.getLoggerName())) {
231                 Object remoteAddress = event.getMDC("remoteAddress");
232                 assertNotNull("MDC[remoteAddress] not set for [" + event.getMessage() + "]", remoteAddress);
233                 assertNotNull("MDC[remotePort] not set for [" + event.getMessage() + "]", event.getMDC("remotePort"));
234                 assertEquals(
235                     "every event should have MDC[handlerClass]",
236                     SimpleIoHandler.class.getName(),
237                     event.getMDC("handlerClass") );
238             }
239         }
240         // assert we have received all expected logging events for each client
241         for (int i = 0; i < remoteAddressClients.length; i++) {
242             SocketAddress remoteAddressClient = remoteAddressClients[i];
243             assertEventExists(events, "sessionCreated", remoteAddressClient, null);
244             assertEventExists(events, "sessionOpened", remoteAddressClient, null);
245             assertEventExists(events, "decode", remoteAddressClient, null);
246             assertEventExists(events, "messageReceived-1", remoteAddressClient, null);
247             assertEventExists(events, "messageReceived-2", remoteAddressClient, "user-" + i);
248             assertEventExists(events, "encode", remoteAddressClient, null);
249             assertEventExists(events, "exceptionCaught", remoteAddressClient, "user-" + i);
250             assertEventExists(events, "messageSent-1", remoteAddressClient, "user-" + i);
251             assertEventExists(events, "messageSent-2", remoteAddressClient, null);
252             assertEventExists(events, "sessionIdle", remoteAddressClient, "user-" + i);
253             assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
254             assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
255             assertEventExists(events, "DummyIoFilter.sessionOpened", remoteAddressClient, "user-" + i);
256         }
257     }
258 
259     private SocketAddress connectAndWrite(NioSocketConnector connector, int clientNr) {
260         ConnectFuture connectFuture = connector.connect(new InetSocketAddress("localhost", port));
261         connectFuture.awaitUninterruptibly(TIMEOUT);
262         IoBuffer message = IoBuffer.allocate(4).putInt(clientNr).flip();
263         IoSession session = connectFuture.getSession();
264         session.write(message).awaitUninterruptibly(TIMEOUT);
265         return session.getLocalAddress();
266     }
267 
268     private void assertEventExists(List<LoggingEvent> events,
269                                    String message,
270                                    SocketAddress address,
271                                    String user) {
272         InetSocketAddress remoteAddress = (InetSocketAddress) address;
273         for (LoggingEvent event : events) {
274             if (event.getMessage().equals(message) &&
275                 event.getMDC("remoteAddress").equals(remoteAddress.toString()) &&
276                 event.getMDC("remoteIp").equals(remoteAddress.getAddress().getHostAddress()) &&
277                 event.getMDC("remotePort").equals(remoteAddress.getPort()+"") ) {
278                 if (user == null && event.getMDC("user") == null) {
279                     return;
280                 }
281                 if (user != null && user.equals(event.getMDC("user"))) {
282                     return;
283                 }
284                 return;
285             }
286         }
287         fail("No LoggingEvent found from [" + remoteAddress +"] with message [" + message + "]");
288     }
289 
290     private static class SimpleIoHandler extends IoHandlerAdapter {
291         CountDownLatch sessionIdleLatch = new CountDownLatch(2);
292         CountDownLatch sessionClosedLatch = new CountDownLatch(2);
293         CountDownLatch messageSentLatch = new CountDownLatch(2);
294 
295         /**
296          * Default constructor
297          */
298         public SimpleIoHandler() {
299             super();
300         }
301         
302         @Override
303         public void sessionCreated(IoSession session) throws Exception {
304             LOGGER.info("sessionCreated");
305             session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1);
306         }
307 
308         @Override
309         public void sessionOpened(IoSession session) throws Exception {
310             LOGGER.info("sessionOpened");
311         }
312 
313         @Override
314         public void sessionClosed(IoSession session) throws Exception {
315             LOGGER.info("sessionClosed");
316             sessionClosedLatch.countDown();
317         }
318 
319         @Override
320         public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
321             LOGGER.info("sessionIdle");
322             sessionIdleLatch.countDown();
323             session.close(true);
324         }
325 
326         @Override
327         public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
328             LOGGER.info("exceptionCaught", cause);
329         }
330 
331         @Override
332         public void messageReceived(IoSession session, Object message) throws Exception {
333             LOGGER.info("messageReceived-1");
334             // adding a custom property to the context
335             String user = "user-" + message;
336             MdcInjectionFilter.setProperty(session, "user", user);
337             LOGGER.info("messageReceived-2");
338             session.getService().broadcast(message);
339             throw new RuntimeException("just a test, forcing exceptionCaught");
340         }
341 
342         @Override
343         public void messageSent(IoSession session, Object message) throws Exception {
344             LOGGER.info("messageSent-1");
345             MdcInjectionFilter.removeProperty(session, "user");
346             LOGGER.info("messageSent-2");
347             messageSentLatch.countDown();
348         }
349     }
350 
351     private static class DummyProtocolCodecFactory implements ProtocolCodecFactory {
352         /**
353          * Default constructor
354          */
355         public DummyProtocolCodecFactory() {
356             super();
357         }
358         
359         public ProtocolEncoder getEncoder(IoSession session) throws Exception {
360             return new ProtocolEncoderAdapter() {
361                 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
362                     LOGGER.info("encode");
363                     IoBuffer buffer = IoBuffer.allocate(4).putInt(123).flip();
364                     out.write(buffer);
365                 }
366             };
367         }
368 
369         public ProtocolDecoder getDecoder(IoSession session) throws Exception {
370             return new ProtocolDecoderAdapter() {
371                 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
372                     if (in.remaining() >= 4) {
373                         int value = in.getInt();
374                         LOGGER.info("decode");
375                         out.write(value);
376                     }
377                 }
378             };
379         }
380     }
381 
382     private static class MyAppender extends AppenderSkeleton {
383         List<LoggingEvent> events = Collections.synchronizedList(new ArrayList<LoggingEvent>());
384 
385         /**
386          * Default constructor
387          */
388         public MyAppender() {
389             super();
390         }
391         
392         @Override
393         protected void append(final LoggingEvent loggingEvent) {
394             loggingEvent.getMDCCopy();
395             events.add(loggingEvent);
396         }
397 
398         @Override
399         public boolean requiresLayout() {
400             return false;
401         }
402 
403         @Override
404         public void close() {
405             // Do nothing
406         }
407     }
408 
409     static class DummyIoFilter extends IoFilterAdapter {
410         @Override
411         public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
412             LOGGER.info("DummyIoFilter.sessionOpened");
413             nextFilter.sessionOpened(session);
414         }
415     }
416 
417 }