1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.mina.transport.vmpipe;
20  
21  import java.lang.management.ManagementFactory;
22  import java.lang.management.ThreadInfo;
23  import java.lang.management.ThreadMXBean;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import junit.framework.TestCase;
29  
30  import org.apache.mina.core.future.ConnectFuture;
31  import org.apache.mina.core.service.IoAcceptor;
32  import org.apache.mina.core.service.IoConnector;
33  import org.apache.mina.core.service.IoHandlerAdapter;
34  import org.apache.mina.core.session.IoSession;
35  
36  /**
37   * TODO Add documentation
38   * 
39   * @author The Apache MINA Project (dev@mina.apache.org)
40   */
41  public class VmPipeSessionCrossCommunicationTest extends TestCase {
42      public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
43          final VmPipeAddress address = new VmPipeAddress(1);
44          final IoConnector connector = new VmPipeConnector();
45          final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
46          final CountDownLatch latch = new CountDownLatch(1);
47          final CountDownLatch messageCount = new CountDownLatch(2);
48          IoAcceptor acceptor = new VmPipeAcceptor();
49  
50          acceptor.setHandler(new IoHandlerAdapter() {
51              @Override
52              public void messageReceived(IoSession session, Object message) throws Exception {
53                  //System.out.println(Thread.currentThread().getName() + ": " + message);
54  
55                  if ("start".equals(message)) {
56                      session.write("open new");
57                  } else if ("re-use c1".equals(message)) {
58                      session.write("tell me something on c1 now");
59                  } else if (((String) message).startsWith("please don't deadlock")) {
60                      messageCount.countDown();
61                  } else {
62                      fail("unexpected message received " + message);
63                  }
64              }
65          });
66          acceptor.bind(address);
67  
68          connector.setHandler(new IoHandlerAdapter() {
69              @Override
70              public void messageReceived(IoSession session, Object message) throws Exception {
71                  //System.out.println(Thread.currentThread().getName() + ": " + message);
72  
73                  if ("open new".equals(message)) {
74                      //System.out.println("opening c2 from " + Thread.currentThread().getName());
75  
76                      IoConnector c2 = new VmPipeConnector();
77                      c2.setHandler(new IoHandlerAdapter() {
78                          @Override
79                          public void sessionOpened(IoSession session) throws Exception {
80                              session.write("re-use c1");
81                          }
82  
83                          @Override
84                          public void messageReceived(IoSession session, Object message) throws Exception {
85                              //System.out.println(Thread.currentThread().getName() + ": " + message);
86  
87                              if ("tell me something on c1 now".equals(message)) {
88                                  latch.countDown();
89                                  c1.get().write("please don't deadlock via c1");
90                              } else {
91                                  fail("unexpected message received " + message);
92                              }
93                          }
94                      });
95  
96                      ConnectFuture c2Future = c2.connect(address);
97  
98                      c2Future.await();
99  
100                     latch.await();
101 
102                     c2Future.getSession().write("please don't deadlock via c2");
103                 } else {
104                     fail("unexpeced message received " + message);
105                 }
106             }
107         });
108 
109         ConnectFuture future = connector.connect(address);
110 
111         future.await();
112 
113         c1.set(future.getSession());
114         c1.get().write("start");
115 
116         ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
117 
118         while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
119             long[] threads = threadMXBean.findMonitorDeadlockedThreads();
120 
121             if (null != threads) {
122                 StringBuffer sb = new StringBuffer(256);
123                 ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
124 
125                 for (ThreadInfo info : infos) {
126                     sb.append(info.getThreadName())
127                             .append(" blocked on ")
128                             .append(info.getLockName())
129                             .append(" owned by ")
130                             .append(info.getLockOwnerName())
131                             .append("\n");
132                 }
133 
134                 for (ThreadInfo info : infos) {
135                     sb.append("\nStack for ").append(info.getThreadName()).append("\n");
136                     for (StackTraceElement element : info.getStackTrace()) {
137                         sb.append("\t").append(element).append("\n");
138                     }
139                 }
140 
141                 fail("deadlocked! \n" + sb);
142             }
143         }
144 
145         acceptor.setCloseOnDeactivation(false);
146         acceptor.dispose();
147     }
148 }