1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.logging;
21
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.statistic.ProfilerTimerFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
59
60
61
62
63
64
65 public class MdcInjectionFilterTest {
66
67 static Logger LOGGER = LoggerFactory.getLogger(MdcInjectionFilterTest.class);
68 private static final int TIMEOUT = 5000;
69
70 final MyAppender appender = new MyAppender();
71 private int port;
72 private NioSocketAcceptor acceptor;
73
74 @Before
75 public void setUp() throws Exception {
76
77 org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
78 org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
79 org.apache.log4j.Logger.getRootLogger().addAppender(appender);
80 acceptor = new NioSocketAcceptor();
81 }
82
83
84 @After
85 public void tearDown() throws Exception {
86 acceptor.dispose();
87 }
88
89 @Test
90 public void testSimpleChain() throws IOException, InterruptedException {
91 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
92 chain.addFirst("mdc-injector", new MdcInjectionFilter());
93 chain.addLast("dummy", new DummyIoFilter());
94 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
95 test(chain);
96 }
97
98 @Test
99 public void testExecutorFilterAtTheEnd() throws IOException, InterruptedException {
100 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
101 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
102 chain.addFirst("mdc-injector1", mdcInjectionFilter);
103 chain.addLast("dummy", new DummyIoFilter());
104 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
105 chain.addLast("executor" , new ExecutorFilter());
106 chain.addLast("mdc-injector2", mdcInjectionFilter);
107 test(chain);
108 }
109
110 @Test
111 public void testExecutorFilterAtBeginning() throws IOException, InterruptedException {
112 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
113 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
114 chain.addLast("executor" , new ExecutorFilter());
115 chain.addLast("mdc-injector", mdcInjectionFilter);
116 chain.addLast("dummy", new DummyIoFilter());
117 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
118 test(chain);
119 }
120
121 @Test
122 public void testExecutorFilterBeforeProtocol() throws IOException, InterruptedException {
123 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
124 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
125 chain.addLast("executor" , new ExecutorFilter());
126 chain.addLast("mdc-injector", mdcInjectionFilter);
127 chain.addLast("dummy", new DummyIoFilter());
128 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
129 test(chain);
130 }
131
132 @Test
133 public void testMultipleFilters() throws IOException, InterruptedException {
134 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
135 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
136 chain.addLast("executor" , new ExecutorFilter());
137 chain.addLast("mdc-injector", mdcInjectionFilter);
138 chain.addLast("profiler", new ProfilerTimerFilter());
139 chain.addLast("dummy", new DummyIoFilter());
140 chain.addLast("logger", new LoggingFilter());
141 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
142 test(chain);
143 }
144
145 @Test
146 public void testTwoExecutorFilters() throws IOException, InterruptedException {
147 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
148 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
149 chain.addLast("executor1" , new ExecutorFilter());
150 chain.addLast("mdc-injector1", mdcInjectionFilter);
151 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
152 chain.addLast("dummy", new DummyIoFilter());
153 chain.addLast("executor2" , new ExecutorFilter());
154
155
156 chain.addLast("mdc-injector2", mdcInjectionFilter);
157 test(chain);
158 }
159
160 @Test
161 public void testOnlyRemoteAddress() throws IOException, InterruptedException {
162 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
163 chain.addFirst("mdc-injector", new MdcInjectionFilter(
164 MdcInjectionFilter.MdcKey.remoteAddress));
165 chain.addLast("dummy", new DummyIoFilter());
166 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
167 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
168 acceptor.setHandler(simpleIoHandler);
169 acceptor.bind(new InetSocketAddress(0));
170 port = acceptor.getLocalAddress().getPort();
171 acceptor.setFilterChainBuilder(chain);
172
173 NioSocketConnector connector = new NioSocketConnector();
174 connector.setHandler(new IoHandlerAdapter());
175 connectAndWrite(connector,0);
176 connectAndWrite(connector,1);
177
178 simpleIoHandler.messageSentLatch.await();
179 simpleIoHandler.sessionIdleLatch.await();
180 simpleIoHandler.sessionClosedLatch.await();
181 connector.dispose();
182
183
184 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
185
186 for (LoggingEvent event : events) {
187 for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
188 String key = mdcKey.name();
189 Object value = event.getMDC(key);
190 if (mdcKey == MdcInjectionFilter.MdcKey.remoteAddress) {
191 assertNotNull(
192 "MDC[remoteAddress] not set for [" + event.getMessage() + "]", value);
193 } else {
194 assertNull("MDC[" + key + "] set for [" + event.getMessage() + "]", value);
195 }
196 }
197 }
198 }
199
200 private void test(DefaultIoFilterChainBuilder chain) throws IOException, InterruptedException {
201
202 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
203 acceptor.setHandler(simpleIoHandler);
204 acceptor.bind(new InetSocketAddress(0));
205 port = acceptor.getLocalAddress().getPort();
206 acceptor.setFilterChainBuilder(chain);
207
208 NioSocketConnector connector = new NioSocketConnector();
209 connector.setHandler(new IoHandlerAdapter());
210 SocketAddress remoteAddressClients[] = new SocketAddress[2];
211 remoteAddressClients[0] = connectAndWrite(connector,0);
212 remoteAddressClients[1] = connectAndWrite(connector,1);
213
214 simpleIoHandler.messageSentLatch.await();
215 simpleIoHandler.sessionIdleLatch.await();
216 simpleIoHandler.sessionClosedLatch.await();
217 connector.dispose();
218
219
220 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
221
222 Set<String> loggersToCheck = new HashSet<String>();
223 loggersToCheck.add(MdcInjectionFilterTest.class.getName());
224 loggersToCheck.add(ProtocolCodecFilter.class.getName());
225 loggersToCheck.add(LoggingFilter.class.getName());
226
227
228 for (LoggingEvent event : events) {
229
230 if (loggersToCheck.contains(event.getLoggerName())) {
231 Object remoteAddress = event.getMDC("remoteAddress");
232 assertNotNull("MDC[remoteAddress] not set for [" + event.getMessage() + "]", remoteAddress);
233 assertNotNull("MDC[remotePort] not set for [" + event.getMessage() + "]", event.getMDC("remotePort"));
234 assertEquals(
235 "every event should have MDC[handlerClass]",
236 SimpleIoHandler.class.getName(),
237 event.getMDC("handlerClass") );
238 }
239 }
240
241 for (int i = 0; i < remoteAddressClients.length; i++) {
242 SocketAddress remoteAddressClient = remoteAddressClients[i];
243 assertEventExists(events, "sessionCreated", remoteAddressClient, null);
244 assertEventExists(events, "sessionOpened", remoteAddressClient, null);
245 assertEventExists(events, "decode", remoteAddressClient, null);
246 assertEventExists(events, "messageReceived-1", remoteAddressClient, null);
247 assertEventExists(events, "messageReceived-2", remoteAddressClient, "user-" + i);
248 assertEventExists(events, "encode", remoteAddressClient, null);
249 assertEventExists(events, "exceptionCaught", remoteAddressClient, "user-" + i);
250 assertEventExists(events, "messageSent-1", remoteAddressClient, "user-" + i);
251 assertEventExists(events, "messageSent-2", remoteAddressClient, null);
252 assertEventExists(events, "sessionIdle", remoteAddressClient, "user-" + i);
253 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
254 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
255 assertEventExists(events, "DummyIoFilter.sessionOpened", remoteAddressClient, "user-" + i);
256 }
257 }
258
259 private SocketAddress connectAndWrite(NioSocketConnector connector, int clientNr) {
260 ConnectFuture connectFuture = connector.connect(new InetSocketAddress("localhost", port));
261 connectFuture.awaitUninterruptibly(TIMEOUT);
262 IoBuffer message = IoBuffer.allocate(4).putInt(clientNr).flip();
263 IoSession session = connectFuture.getSession();
264 session.write(message).awaitUninterruptibly(TIMEOUT);
265 return session.getLocalAddress();
266 }
267
268 private void assertEventExists(List<LoggingEvent> events,
269 String message,
270 SocketAddress address,
271 String user) {
272 InetSocketAddress remoteAddress = (InetSocketAddress) address;
273 for (LoggingEvent event : events) {
274 if (event.getMessage().equals(message) &&
275 event.getMDC("remoteAddress").equals(remoteAddress.toString()) &&
276 event.getMDC("remoteIp").equals(remoteAddress.getAddress().getHostAddress()) &&
277 event.getMDC("remotePort").equals(remoteAddress.getPort()+"") ) {
278 if (user == null && event.getMDC("user") == null) {
279 return;
280 }
281 if (user != null && user.equals(event.getMDC("user"))) {
282 return;
283 }
284 return;
285 }
286 }
287 fail("No LoggingEvent found from [" + remoteAddress +"] with message [" + message + "]");
288 }
289
290 private static class SimpleIoHandler extends IoHandlerAdapter {
291 CountDownLatch sessionIdleLatch = new CountDownLatch(2);
292 CountDownLatch sessionClosedLatch = new CountDownLatch(2);
293 CountDownLatch messageSentLatch = new CountDownLatch(2);
294
295
296
297
298 public SimpleIoHandler() {
299 super();
300 }
301
302 @Override
303 public void sessionCreated(IoSession session) throws Exception {
304 LOGGER.info("sessionCreated");
305 session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1);
306 }
307
308 @Override
309 public void sessionOpened(IoSession session) throws Exception {
310 LOGGER.info("sessionOpened");
311 }
312
313 @Override
314 public void sessionClosed(IoSession session) throws Exception {
315 LOGGER.info("sessionClosed");
316 sessionClosedLatch.countDown();
317 }
318
319 @Override
320 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
321 LOGGER.info("sessionIdle");
322 sessionIdleLatch.countDown();
323 session.close(true);
324 }
325
326 @Override
327 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
328 LOGGER.info("exceptionCaught", cause);
329 }
330
331 @Override
332 public void messageReceived(IoSession session, Object message) throws Exception {
333 LOGGER.info("messageReceived-1");
334
335 String user = "user-" + message;
336 MdcInjectionFilter.setProperty(session, "user", user);
337 LOGGER.info("messageReceived-2");
338 session.getService().broadcast(message);
339 throw new RuntimeException("just a test, forcing exceptionCaught");
340 }
341
342 @Override
343 public void messageSent(IoSession session, Object message) throws Exception {
344 LOGGER.info("messageSent-1");
345 MdcInjectionFilter.removeProperty(session, "user");
346 LOGGER.info("messageSent-2");
347 messageSentLatch.countDown();
348 }
349 }
350
351 private static class DummyProtocolCodecFactory implements ProtocolCodecFactory {
352
353
354
355 public DummyProtocolCodecFactory() {
356 super();
357 }
358
359 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
360 return new ProtocolEncoderAdapter() {
361 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
362 LOGGER.info("encode");
363 IoBuffer buffer = IoBuffer.allocate(4).putInt(123).flip();
364 out.write(buffer);
365 }
366 };
367 }
368
369 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
370 return new ProtocolDecoderAdapter() {
371 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
372 if (in.remaining() >= 4) {
373 int value = in.getInt();
374 LOGGER.info("decode");
375 out.write(value);
376 }
377 }
378 };
379 }
380 }
381
382 private static class MyAppender extends AppenderSkeleton {
383 List<LoggingEvent> events = Collections.synchronizedList(new ArrayList<LoggingEvent>());
384
385
386
387
388 public MyAppender() {
389 super();
390 }
391
392 @Override
393 protected void append(final LoggingEvent loggingEvent) {
394 loggingEvent.getMDCCopy();
395 events.add(loggingEvent);
396 }
397
398 @Override
399 public boolean requiresLayout() {
400 return false;
401 }
402
403 @Override
404 public void close() {
405
406 }
407 }
408
409 static class DummyIoFilter extends IoFilterAdapter {
410 @Override
411 public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
412 LOGGER.info("DummyIoFilter.sessionOpened");
413 nextFilter.sessionOpened(session);
414 }
415 }
416
417 }