1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.transport.vmpipe;
20
21 import java.lang.management.ManagementFactory;
22 import java.lang.management.ThreadInfo;
23 import java.lang.management.ThreadMXBean;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import junit.framework.TestCase;
29
30 import org.apache.mina.core.future.ConnectFuture;
31 import org.apache.mina.core.service.IoAcceptor;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoHandlerAdapter;
34 import org.apache.mina.core.session.IoSession;
35
36
37
38
39
40
41 public class VmPipeSessionCrossCommunicationTest extends TestCase {
42 public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
43 final VmPipeAddress address = new VmPipeAddress(1);
44 final IoConnector connector = new VmPipeConnector();
45 final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
46 final CountDownLatch latch = new CountDownLatch(1);
47 final CountDownLatch messageCount = new CountDownLatch(2);
48 IoAcceptor acceptor = new VmPipeAcceptor();
49
50 acceptor.setHandler(new IoHandlerAdapter() {
51 @Override
52 public void messageReceived(IoSession session, Object message) throws Exception {
53
54
55 if ("start".equals(message)) {
56 session.write("open new");
57 } else if ("re-use c1".equals(message)) {
58 session.write("tell me something on c1 now");
59 } else if (((String) message).startsWith("please don't deadlock")) {
60 messageCount.countDown();
61 } else {
62 fail("unexpected message received " + message);
63 }
64 }
65 });
66 acceptor.bind(address);
67
68 connector.setHandler(new IoHandlerAdapter() {
69 @Override
70 public void messageReceived(IoSession session, Object message) throws Exception {
71
72
73 if ("open new".equals(message)) {
74
75
76 IoConnector c2 = new VmPipeConnector();
77 c2.setHandler(new IoHandlerAdapter() {
78 @Override
79 public void sessionOpened(IoSession session) throws Exception {
80 session.write("re-use c1");
81 }
82
83 @Override
84 public void messageReceived(IoSession session, Object message) throws Exception {
85
86
87 if ("tell me something on c1 now".equals(message)) {
88 latch.countDown();
89 c1.get().write("please don't deadlock via c1");
90 } else {
91 fail("unexpected message received " + message);
92 }
93 }
94 });
95
96 ConnectFuture c2Future = c2.connect(address);
97
98 c2Future.await();
99
100 latch.await();
101
102 c2Future.getSession().write("please don't deadlock via c2");
103 } else {
104 fail("unexpeced message received " + message);
105 }
106 }
107 });
108
109 ConnectFuture future = connector.connect(address);
110
111 future.await();
112
113 c1.set(future.getSession());
114 c1.get().write("start");
115
116 ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
117
118 while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
119 long[] threads = threadMXBean.findMonitorDeadlockedThreads();
120
121 if (null != threads) {
122 StringBuffer sb = new StringBuffer(256);
123 ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
124
125 for (ThreadInfo info : infos) {
126 sb.append(info.getThreadName())
127 .append(" blocked on ")
128 .append(info.getLockName())
129 .append(" owned by ")
130 .append(info.getLockOwnerName())
131 .append("\n");
132 }
133
134 for (ThreadInfo info : infos) {
135 sb.append("\nStack for ").append(info.getThreadName()).append("\n");
136 for (StackTraceElement element : info.getStackTrace()) {
137 sb.append("\t").append(element).append("\n");
138 }
139 }
140
141 fail("deadlocked! \n" + sb);
142 }
143 }
144
145 acceptor.setCloseOnDeactivation(false);
146 acceptor.dispose();
147 }
148 }