1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.common;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.TimeUnit;
25
26
27
28
29
30
31
32
33 public class DefaultIoFuture implements IoFuture {
34
35 private static final int DEAD_LOCK_CHECK_INTERVAL = 5000;
36
37 private final IoSession session;
38 private final Object lock;
39 private IoFutureListener<?> firstListener;
40 private List<IoFutureListener<?>> otherListeners;
41 private Object result;
42 private boolean ready;
43 private int waiters;
44
45
46
47
48
49
50 public DefaultIoFuture(IoSession session) {
51 this.session = session;
52 this.lock = this;
53 }
54
55 public IoSession getSession() {
56 return session;
57 }
58
59 public void join() {
60 awaitUninterruptibly();
61 }
62
63 public boolean join(long timeoutMillis) {
64 return awaitUninterruptibly(timeoutMillis);
65 }
66
67 public IoFuture await() throws InterruptedException {
68 synchronized (lock) {
69 while (!ready) {
70 waiters++;
71 try {
72 lock.wait(DEAD_LOCK_CHECK_INTERVAL);
73 checkDeadLock();
74 } finally {
75 waiters--;
76 }
77 }
78 }
79 return this;
80 }
81
82 public boolean await(long timeout, TimeUnit unit)
83 throws InterruptedException {
84 return await(unit.toMillis(timeout));
85 }
86
87 public boolean await(long timeoutMillis) throws InterruptedException {
88 return await0(timeoutMillis, true);
89 }
90
91 public IoFuture awaitUninterruptibly() {
92 synchronized (lock) {
93 while (!ready) {
94 waiters++;
95 try {
96 lock.wait(DEAD_LOCK_CHECK_INTERVAL);
97 } catch (InterruptedException e) {
98 } finally {
99 waiters--;
100 if (!ready) {
101 checkDeadLock();
102 }
103 }
104 }
105 }
106
107 return this;
108 }
109
110 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
111 return awaitUninterruptibly(unit.toMillis(timeout));
112 }
113
114 public boolean awaitUninterruptibly(long timeoutMillis) {
115 try {
116 return await0(timeoutMillis, false);
117 } catch (InterruptedException e) {
118 throw new InternalError();
119 }
120 }
121
122 private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
123 long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
124 long waitTime = timeoutMillis;
125
126 synchronized (lock) {
127 if (ready) {
128 return ready;
129 } else if (waitTime <= 0) {
130 return ready;
131 }
132
133 waiters++;
134 try {
135 for (;;) {
136 try {
137 lock.wait(Math.min(waitTime, DEAD_LOCK_CHECK_INTERVAL));
138 } catch (InterruptedException e) {
139 if (interruptable) {
140 throw e;
141 }
142 }
143
144 if (ready) {
145 return true;
146 } else {
147 waitTime = timeoutMillis
148 - (System.currentTimeMillis() - startTime);
149 if (waitTime <= 0) {
150 return ready;
151 }
152 }
153 }
154 } finally {
155 waiters--;
156 if (!ready) {
157 checkDeadLock();
158 }
159 }
160 }
161 }
162
163 private void checkDeadLock() {
164
165 if (!(this instanceof CloseFuture || this instanceof WriteFuture ||
166 this instanceof ReadFuture || this instanceof ConnectFuture)) {
167 return;
168 }
169
170 IllegalStateException e = new IllegalStateException(
171 "DEAD LOCK: " + IoFuture.class.getSimpleName() +
172 ".await() was invoked from an I/O processor thread. " +
173 "Please use " + IoFutureListener.class.getSimpleName() +
174 " or configure a proper thread model alternatively.");
175
176 StackTraceElement[] stackTrace = e.getStackTrace();
177
178
179 for (StackTraceElement s: stackTrace) {
180 if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
181 throw e;
182 }
183 }
184
185
186 for (StackTraceElement s: stackTrace) {
187 try {
188 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
189 if (IoProcessor.class.isAssignableFrom(cls)) {
190 throw e;
191 }
192 } catch (Exception cnfe) {
193
194 }
195 }
196 }
197
198 public boolean isReady() {
199 synchronized (lock) {
200 return ready;
201 }
202 }
203
204
205
206
207 protected void setValue(Object newValue) {
208 synchronized (lock) {
209
210 if (ready) {
211 return;
212 }
213
214 result = newValue;
215 ready = true;
216 if (waiters > 0) {
217 lock.notifyAll();
218 }
219 }
220
221 notifyListeners();
222 }
223
224
225
226
227 protected Object getValue() {
228 synchronized (lock) {
229 return result;
230 }
231 }
232
233 public IoFuture addListener(IoFutureListener<?> listener) {
234 if (listener == null) {
235 throw new NullPointerException("listener");
236 }
237
238 boolean notifyNow = false;
239 synchronized (lock) {
240 if (ready) {
241 notifyNow = true;
242 } else {
243 if (firstListener == null) {
244 firstListener = listener;
245 } else {
246 if (otherListeners == null) {
247 otherListeners = new ArrayList<IoFutureListener<?>>(1);
248 }
249 otherListeners.add(listener);
250 }
251 }
252 }
253
254 if (notifyNow) {
255 notifyListener(listener);
256 }
257 return this;
258 }
259
260 public IoFuture removeListener(IoFutureListener<?> listener) {
261 if (listener == null) {
262 throw new NullPointerException("listener");
263 }
264
265 synchronized (lock) {
266 if (!ready) {
267 if (listener == firstListener) {
268 if (otherListeners != null && !otherListeners.isEmpty()) {
269 firstListener = otherListeners.remove(0);
270 } else {
271 firstListener = null;
272 }
273 } else if (otherListeners != null) {
274 otherListeners.remove(listener);
275 }
276 }
277 }
278
279 return this;
280 }
281
282 private void notifyListeners() {
283
284
285
286 if (firstListener != null) {
287 notifyListener(firstListener);
288 firstListener = null;
289
290 if (otherListeners != null) {
291 for (IoFutureListener<?> l : otherListeners) {
292 notifyListener(l);
293 }
294 otherListeners = null;
295 }
296 }
297 }
298
299 @SuppressWarnings("unchecked")
300 private void notifyListener(IoFutureListener l) {
301 try {
302 l.operationComplete(this);
303 } catch (Throwable t) {
304 ExceptionMonitor.getInstance().exceptionCaught(t);
305 }
306 }
307 }