1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.mina.transport.vmpipe;
20  
21  import java.lang.management.ManagementFactory;
22  import java.lang.management.ThreadInfo;
23  import java.lang.management.ThreadMXBean;
24  
25  import junit.framework.TestCase;
26  
27  import org.apache.mina.common.ConnectFuture;
28  import org.apache.mina.common.IoAcceptor;
29  import org.apache.mina.common.IoAcceptorConfig;
30  import org.apache.mina.common.IoConnector;
31  import org.apache.mina.common.IoHandlerAdapter;
32  import org.apache.mina.common.IoSession;
33  import org.apache.mina.common.ThreadModel;
34  
35  import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
36  import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
37  import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
38  
39  /**
40   * @author Apache Mina Project (dev@mina.apache.org)
41   * @version $Rev: 575603 $, $Date: 2007-09-14 19:04:45 +0900 (금, 14  9월 2007) $
42   */
43  public class VmPipeSessionCrossCommunicationTest extends TestCase {
44      public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
45          final VmPipeAddress address = new VmPipeAddress( 1 );
46          final IoConnector connector = new VmPipeConnector();
47          final AtomicReference c1 = new AtomicReference();
48          final CountDownLatch latch = new CountDownLatch( 1 );
49          final CountDownLatch messageCount = new CountDownLatch( 2 );
50          IoAcceptor acceptor = new VmPipeAcceptor();
51  
52          acceptor.bind( address, new IoHandlerAdapter() {
53              public void messageReceived( IoSession session, Object message ) throws Exception {
54                  System.out.println( Thread.currentThread().getName() + ": " + message );
55  
56                  if ( "start".equals( message ) ) {
57                      session.write( "open new" );
58                  } else if ( "re-use c1".equals( message ) ) {
59                      session.write( "tell me something on c1 now" );
60                  } else if ( ( (String) message ).startsWith( "please don't deadlock" ) ) {
61                      messageCount.countDown();
62                  } else {
63                      fail( "unexpected message received " + message );
64                  }
65              }
66          } );
67  
68          connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
69  
70          ConnectFuture future = connector.connect( address, new IoHandlerAdapter() {
71              public void messageReceived( IoSession session, Object message ) throws Exception {
72                  System.out.println( Thread.currentThread().getName() + ": " + message );
73                  
74                  if ( "open new".equals( message ) ) {
75                      System.out.println( "opening c2 from " + Thread.currentThread().getName() );
76  
77                      ConnectFuture c2Future = connector.connect( address, new IoHandlerAdapter() {
78                          public void sessionOpened( IoSession session ) throws Exception {
79                              session.write( "re-use c1" );
80                          }
81  
82                          public void messageReceived( IoSession session, Object message ) throws Exception {
83                              System.out.println( Thread.currentThread().getName() + ": " + message );
84  
85                              if ( "tell me something on c1 now".equals( message ) ) {
86                                  latch.countDown();
87                                  ((IoSession) c1.get()).write( "please don't deadlock via c1" );
88                              } else {
89                                  fail( "unexpected message received " + message );
90                              }
91                          }
92                      } );
93  
94                      c2Future.join();
95  
96                      latch.await();
97  
98                      c2Future.getSession().write( "please don't deadlock via c2" );
99                  } else {
100                     fail( "unexpeced message received " + message );
101                 }
102             }
103         } );
104 
105         future.join();
106 
107         c1.set( future.getSession() );
108         ((IoSession) c1.get()).write( "start" );
109 
110         ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
111 
112         while ( !messageCount.await( 100, TimeUnit.MILLISECONDS ) ) {
113             long[] threads = threadMXBean.findMonitorDeadlockedThreads();
114 
115             if ( null != threads ) {
116                 StringBuffer sb = new StringBuffer( 256 );
117                 ThreadInfo[] infos = threadMXBean.getThreadInfo( threads, Integer.MAX_VALUE );
118 
119                 for (int i = 0; i < infos.length; i ++) {
120                     ThreadInfo info = infos[i];
121                     sb.append( info.getThreadName() )
122                         .append( " blocked on " )
123                         .append( info.getLockName() )
124                         .append( " owned by " )
125                         .append( info.getLockOwnerName() )
126                         .append( "\n" );
127                 }
128 
129                 for (int i = 0; i < infos.length; i ++) {
130                     ThreadInfo info = infos[i];
131                     sb.append( "\nStack for " ).append( info.getThreadName() ).append( "\n" );
132                     StackTraceElement[] stackTrace = info.getStackTrace();
133                     for (int j = 0; j < stackTrace.length; j ++) {
134                         sb.append( "\t" ).append( stackTrace[j] ).append( "\n" );
135                     }
136                 }
137 
138                 fail( "deadlocked! \n" + sb );
139             }
140         }
141 
142         ( (IoAcceptorConfig) acceptor.getDefaultConfig() ).setDisconnectOnUnbind( false );
143         acceptor.unbindAll();
144     }
145 }