View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.common;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.TimeUnit;
25  
26  
27  /**
28   * A default implementation of {@link IoFuture}.
29   *
30   * @author The Apache MINA Project (dev@mina.apache.org)
31   * @version $Rev: 600818 $, $Date: 2007-12-04 00:37:24 -0700 (Tue, 04 Dec 2007) $
32   */
33  public class DefaultIoFuture implements IoFuture {
34  
35      private static final int DEAD_LOCK_CHECK_INTERVAL = 5000;
36  
37      private final IoSession session;
38      private final Object lock;
39      private IoFutureListener<?> firstListener;
40      private List<IoFutureListener<?>> otherListeners;
41      private Object result;
42      private boolean ready;
43      private int waiters;
44  
45      /**
46       * Creates a new instance.
47       *
48       * @param session an {@link IoSession} which is associated with this future
49       */
50      public DefaultIoFuture(IoSession session) {
51          this.session = session;
52          this.lock = this;
53      }
54  
55      public IoSession getSession() {
56          return session;
57      }
58  
59      public void join() {
60          awaitUninterruptibly();
61      }
62  
63      public boolean join(long timeoutMillis) {
64          return awaitUninterruptibly(timeoutMillis);
65      }
66  
67      public IoFuture await() throws InterruptedException {
68          synchronized (lock) {
69              while (!ready) {
70                  waiters++;
71                  try {
72                      lock.wait(DEAD_LOCK_CHECK_INTERVAL);
73                      checkDeadLock();
74                  } finally {
75                      waiters--;
76                  }
77              }
78          }
79          return this;
80      }
81  
82      public boolean await(long timeout, TimeUnit unit)
83              throws InterruptedException {
84          return await(unit.toMillis(timeout));
85      }
86  
87      public boolean await(long timeoutMillis) throws InterruptedException {
88          return await0(timeoutMillis, true);
89      }
90  
91      public IoFuture awaitUninterruptibly() {
92          synchronized (lock) {
93              while (!ready) {
94                  waiters++;
95                  try {
96                      lock.wait(DEAD_LOCK_CHECK_INTERVAL);
97                  } catch (InterruptedException e) {
98                  } finally {
99                      waiters--;
100                     if (!ready) {
101                         checkDeadLock();
102                     }
103                 }
104             }
105         }
106 
107         return this;
108     }
109 
110     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
111         return awaitUninterruptibly(unit.toMillis(timeout));
112     }
113 
114     public boolean awaitUninterruptibly(long timeoutMillis) {
115         try {
116             return await0(timeoutMillis, false);
117         } catch (InterruptedException e) {
118             throw new InternalError();
119         }
120     }
121 
122     private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
123         long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
124         long waitTime = timeoutMillis;
125 
126         synchronized (lock) {
127             if (ready) {
128                 return ready;
129             } else if (waitTime <= 0) {
130                 return ready;
131             }
132 
133             waiters++;
134             try {
135                 for (;;) {
136                     try {
137                         lock.wait(Math.min(waitTime, DEAD_LOCK_CHECK_INTERVAL));
138                     } catch (InterruptedException e) {
139                         if (interruptable) {
140                             throw e;
141                         }
142                     }
143 
144                     if (ready) {
145                         return true;
146                     } else {
147                         waitTime = timeoutMillis
148                                 - (System.currentTimeMillis() - startTime);
149                         if (waitTime <= 0) {
150                             return ready;
151                         }
152                     }
153                 }
154             } finally {
155                 waiters--;
156                 if (!ready) {
157                     checkDeadLock();
158                 }
159             }
160         }
161     }
162 
163     private void checkDeadLock() {
164         // Only read / write / connect / write future can cause dead lock. 
165         if (!(this instanceof CloseFuture || this instanceof WriteFuture ||
166               this instanceof ReadFuture || this instanceof ConnectFuture)) {
167             return;
168         }
169         
170         IllegalStateException e = new IllegalStateException(
171                 "DEAD LOCK: " + IoFuture.class.getSimpleName() +
172                 ".await() was invoked from an I/O processor thread.  " +
173                 "Please use " + IoFutureListener.class.getSimpleName() +
174                 " or configure a proper thread model alternatively.");
175 
176         StackTraceElement[] stackTrace = e.getStackTrace();
177 
178         // Simple and quick check.
179         for (StackTraceElement s: stackTrace) {
180             if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
181                 throw e;
182             }
183         }
184 
185         // And then more precisely.
186         for (StackTraceElement s: stackTrace) {
187             try {
188                 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
189                 if (IoProcessor.class.isAssignableFrom(cls)) {
190                     throw e;
191                 }
192             } catch (Exception cnfe) {
193                 // Ignore
194             }
195         }
196     }
197 
198     public boolean isReady() {
199         synchronized (lock) {
200             return ready;
201         }
202     }
203 
204     /**
205      * Sets the result of the asynchronous operation, and mark it as finished.
206      */
207     protected void setValue(Object newValue) {
208         synchronized (lock) {
209             // Allow only once.
210             if (ready) {
211                 return;
212             }
213 
214             result = newValue;
215             ready = true;
216             if (waiters > 0) {
217                 lock.notifyAll();
218             }
219         }
220 
221         notifyListeners();
222     }
223 
224     /**
225      * Returns the result of the asynchronous operation.
226      */
227     protected Object getValue() {
228         synchronized (lock) {
229             return result;
230         }
231     }
232 
233     public IoFuture addListener(IoFutureListener<?> listener) {
234         if (listener == null) {
235             throw new NullPointerException("listener");
236         }
237 
238         boolean notifyNow = false;
239         synchronized (lock) {
240             if (ready) {
241                 notifyNow = true;
242             } else {
243                 if (firstListener == null) {
244                     firstListener = listener;
245                 } else {
246                     if (otherListeners == null) {
247                         otherListeners = new ArrayList<IoFutureListener<?>>(1);
248                     }
249                     otherListeners.add(listener);
250                 }
251             }
252         }
253 
254         if (notifyNow) {
255             notifyListener(listener);
256         }
257         return this;
258     }
259 
260     public IoFuture removeListener(IoFutureListener<?> listener) {
261         if (listener == null) {
262             throw new NullPointerException("listener");
263         }
264 
265         synchronized (lock) {
266             if (!ready) {
267                 if (listener == firstListener) {
268                     if (otherListeners != null && !otherListeners.isEmpty()) {
269                         firstListener = otherListeners.remove(0);
270                     } else {
271                         firstListener = null;
272                     }
273                 } else if (otherListeners != null) {
274                     otherListeners.remove(listener);
275                 }
276             }
277         }
278 
279         return this;
280     }
281 
282     private void notifyListeners() {
283         // There won't be any visibility problem or concurrent modification
284         // because 'ready' flag will be checked against both addListener and
285         // removeListener calls.
286         if (firstListener != null) {
287             notifyListener(firstListener);
288             firstListener = null;
289 
290             if (otherListeners != null) {
291                 for (IoFutureListener<?> l : otherListeners) {
292                     notifyListener(l);
293                 }
294                 otherListeners = null;
295             }
296         }
297     }
298 
299     @SuppressWarnings("unchecked")
300     private void notifyListener(IoFutureListener l) {
301         try {
302             l.operationComplete(this);
303         } catch (Throwable t) {
304             ExceptionMonitor.getInstance().exceptionCaught(t);
305         }
306     }
307 }