View Javadoc

1   package org.apache.mina.core.service;
2   
3   import junit.framework.Assert;
4   import org.apache.mina.core.future.CloseFuture;
5   import org.apache.mina.core.future.ConnectFuture;
6   import org.apache.mina.core.future.IoFuture;
7   import org.apache.mina.core.future.IoFutureListener;
8   import org.apache.mina.core.session.IdleStatus;
9   import org.apache.mina.core.session.IoSession;
10  import org.apache.mina.filter.codec.ProtocolCodecFilter;
11  import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
12  import org.apache.mina.filter.logging.LoggingFilter;
13  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
14  import org.apache.mina.transport.socket.nio.NioSocketConnector;
15  import org.junit.Test;
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  import java.io.IOException;
20  import java.net.InetSocketAddress;
21  import java.nio.charset.Charset;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.CountDownLatch;
25  
26  public class AbstractIoServiceTest {
27  
28    private static final int PORT = 9123;
29  
30    @Test
31    public void testDispose() throws IOException, InterruptedException {
32  
33      List threadsBefore = getThreadNames();
34  
35      final IoAcceptor acceptor = new NioSocketAcceptor();
36  
37      acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
38      acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
39  
40      acceptor.setHandler(  new ServerHandler() );
41  
42      acceptor.getSessionConfig().setReadBufferSize( 2048 );
43      acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
44      acceptor.bind( new InetSocketAddress(PORT) );
45      System.out.println("Server running ...");
46  
47      final NioSocketConnector connector = new NioSocketConnector();
48  
49      // Set connect timeout.
50      connector.setConnectTimeoutMillis(30 * 1000L);
51  
52      connector.setHandler(new ClientHandler());
53      connector.getFilterChain().addLast( "logger", new LoggingFilter() );
54      connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
55  
56      // Start communication.
57      ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
58      cf.awaitUninterruptibly();
59  
60      IoSession session = cf.getSession();
61  
62      // send a message
63      session.write("Hello World!\r");
64  
65      // wait until response is received
66      CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
67      latch.await();
68  
69      // close the session
70      CloseFuture closeFuture = session.close(false);
71  
72      System.out.println("session.close called");
73      //Thread.sleep(5);
74  
75      // wait for session close and then dispose the connector
76      closeFuture.addListener(new IoFutureListener<IoFuture>() {
77  
78        public void operationComplete(IoFuture future) {
79          System.out.println("managed session count=" + connector.getManagedSessionCount());
80          System.out.println("Disposing connector ...");
81          connector.dispose(true);
82          System.out.println("Disposing connector ... *finished*");
83  
84        }
85      });
86  
87      closeFuture.awaitUninterruptibly();    
88      acceptor.dispose(true);
89  
90      List threadsAfter = getThreadNames();
91  
92      System.out.println("threadsBefore = " + threadsBefore);
93      System.out.println("threadsAfter  = " + threadsAfter);
94  
95      // Assert.assertEquals(threadsBefore, threadsAfter);
96  
97    }
98  
99  
100   public static class ClientHandler extends IoHandlerAdapter {
101 
102     private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
103 
104     @Override
105     public void sessionCreated(IoSession session) throws Exception {
106       session.setAttribute("latch", new CountDownLatch(1));
107     }
108 
109     @Override
110     public void messageReceived(IoSession session, Object message) throws Exception {
111       LOGGER.info("client: messageReceived("+session+", "+message+")");
112       CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
113       latch.countDown();
114     }
115 
116     @Override
117     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
118       LOGGER.warn("exceptionCaught:", cause);
119     }
120   }
121 
122   public static class ServerHandler extends IoHandlerAdapter {
123 
124     private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
125 
126     @Override
127     public void messageReceived(IoSession session, Object message) throws Exception {
128       LOGGER.info("server: messageReceived("+session+", "+message+")");
129       session.write(message.toString());
130     }
131 
132     @Override
133     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
134       LOGGER.warn("exceptionCaught:", cause);
135     }
136 
137   }
138 
139   public static void main(String[] args) throws IOException, InterruptedException {
140     new AbstractIoServiceTest().testDispose();
141   }
142 
143   private List<String> getThreadNames() {
144       List<String> list = new ArrayList<String>();
145       int active = Thread.activeCount();
146       Thread[] threads = new Thread[active];
147       Thread.enumerate(threads);
148       for (Thread thread : threads) {
149           try {
150               String name = thread.getName();
151               list.add(name);
152           } catch (NullPointerException ignore) {
153           }
154       }
155       return list;
156   }
157 
158 }