View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.nio.channels.SelectionKey;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.mina.core.ExceptionMonitor;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.FileRegion;
37  import org.apache.mina.core.future.DefaultIoFuture;
38  import org.apache.mina.core.service.AbstractIoService;
39  import org.apache.mina.core.service.IoProcessor;
40  import org.apache.mina.core.session.AbstractIoSession;
41  import org.apache.mina.core.session.IdleStatusChecker;
42  import org.apache.mina.core.session.IoSessionConfig;
43  import org.apache.mina.core.write.WriteRequest;
44  import org.apache.mina.core.write.WriteRequestQueue;
45  import org.apache.mina.core.write.WriteToClosedSessionException;
46  import org.apache.mina.util.NamePreservingRunnable;
47  
48  /**
49   * An abstract implementation of {@link IoProcessor} which helps
50   * transport developers to write an {@link IoProcessor} easily.
51   *
52   * @author The Apache MINA Project (dev@mina.apache.org)
53   * @version $Rev: 672609 $, $Date: 2008-06-29 09:40:06 +0200 (dim, 29 jun 2008) $
54   */
55  public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
56      /**
57       * The maximum loop count for a write operation until
58       * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
59       * It is similar to what a spin lock is for in concurrency programming.
60       * It improves memory utilization and write throughput significantly.
61       */
62      private static final int WRITE_SPIN_COUNT = 256;
63  
64      /** A map containing the last Thread ID for each class */
65      private static final Map<Class<?>, AtomicInteger> threadIds = 
66          new HashMap<Class<?>, AtomicInteger>();
67  
68      private final Object lock = new Object();
69      private final String threadName;
70      private final Executor executor;
71  
72      private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
73      private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
74      private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
75      private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
76  
77      private Worker worker;
78      private long lastIdleCheckTime;
79  
80      private final Object disposalLock = new Object();
81      private volatile boolean disposing;
82      private volatile boolean disposed;
83      private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
84  
85      protected AbstractPollingIoProcessor(Executor executor) {
86          if (executor == null) {
87              throw new NullPointerException("executor");
88          }
89  
90          this.threadName = nextThreadName();
91          this.executor = executor;
92      }
93  
94      /**
95       * Compute the thread ID for this class instance. As we may have different
96       * classes, we store the last ID number into a Map associating the class
97       * name to the last assigned ID.
98       *   
99       * @return a name for the current thread, based on the class name and
100      * an incremental value, starting at 1. 
101      */
102     private String nextThreadName() {
103         Class<?> cls = getClass();
104         int newThreadId;
105         
106         // We synchronize this block to avoid a concurrent access to 
107         // the actomicInteger (it can be modified by another thread, while
108         // being seen as null by another thread)
109         synchronized( threadIds ) {
110             // Get the current ID associated to this class' name
111             AtomicInteger threadId = threadIds.get(cls);
112             
113             if (threadId == null) {
114                 // We never have seen this class before, just create a
115                 // new ID starting at 1 for it, and associate this ID
116                 // with the class name in the map.
117                 newThreadId = 1;
118                 threadIds.put(cls, new AtomicInteger(newThreadId));
119             } else {
120                 // Just increment the lat ID, and get it.
121                 newThreadId = threadId.incrementAndGet();
122             }
123         }
124         
125         // Now we can compute the name for this thread
126         return cls.getSimpleName() + '-' + newThreadId;
127     }
128 
129     /**
130      * {@inheritDoc}
131      */
132     public final boolean isDisposing() {
133         return disposing;
134     }
135 
136     /**
137      * {@inheritDoc}
138      */
139     public final boolean isDisposed() {
140         return disposed;
141     }
142     
143     /**
144      * {@inheritDoc}
145      */
146     public final void dispose() {
147         if (disposed) {
148             return;
149         }
150 
151         synchronized (disposalLock) {
152             if (!disposing) {
153                 disposing = true;
154                 startupWorker();
155             }
156         }
157 
158         disposalFuture.awaitUninterruptibly();
159         disposed = true;
160     }
161 
162     protected abstract void dispose0() throws Exception;
163 
164     /**
165      * poll those sessions for the given timeout
166      * @param timeout milliseconds before the call timeout if no event appear
167      * @return true if at least a session is ready for read or for write
168      * @throws Exception if some low level IO error occurs
169      */
170     protected abstract boolean select(int timeout) throws Exception;
171     protected abstract boolean isSelectorEmpty();
172     
173     /**
174      * Interrupt the {@link AbstractPollingIoProcessor#select(int) call.
175      */
176     protected abstract void wakeup();
177     protected abstract Iterator<T> allSessions();
178     protected abstract Iterator<T> selectedSessions();
179     protected abstract SessionState state(T session);
180 
181     /**
182      * Is the session ready for writing
183      * @param session the session queried
184      * @return true is ready, false if not ready
185      */
186     protected abstract boolean isWritable(T session);
187 
188     /**
189      * Is the session ready for reading
190      * @param session the session queried
191      * @return true is ready, false if not ready
192      */
193     protected abstract boolean isReadable(T session);
194 
195     /**
196      * register a session for writing
197      * @param session the session registered
198      * @param interested true for registering, false for removing
199      */
200     protected abstract void setInterestedInWrite(T session, boolean interested)
201             throws Exception;
202 
203     /**
204      * register a session for reading
205      * @param session the session registered
206      * @param interested true for registering, false for removing
207      */
208     protected abstract void setInterestedInRead(T session, boolean interested)
209             throws Exception;
210 
211     /**
212      * is this session registered for reading
213      * @param session the session queried
214      * @return true is registered for reading
215      */
216     protected abstract boolean isInterestedInRead(T session);
217 
218     /**
219      * is this session registered for writing
220      * @param session the session queried
221      * @return true is registered for writing
222      */
223     protected abstract boolean isInterestedInWrite(T session);
224 
225     protected abstract void init(T session) throws Exception;
226     protected abstract void destroy(T session) throws Exception;
227     protected abstract int read(T session, IoBuffer buf) throws Exception;
228     protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
229     protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
230 
231     /**
232      * {@inheritDoc}
233      */
234     public final void add(T session) {
235         if (isDisposing()) {
236             throw new IllegalStateException("Already disposed.");
237         }
238 
239         newSessions.add(session);
240         startupWorker();
241     }
242 
243     /**
244      * {@inheritDoc}
245      */
246     public final void remove(T session) {
247         scheduleRemove(session);
248         startupWorker();
249     }
250 
251     private void scheduleRemove(T session) {
252         removingSessions.add(session);
253     }
254 
255     /**
256      * {@inheritDoc}
257      */
258     public final void flush(T session) {
259         // The following optimization has been disabled because it can cause StackOverflowError.
260         //if (Thread.currentThread() == workerThread) {
261         //    // Bypass the queue if called from the worker thread itself
262         //    // (i.e. single thread model).
263         //    flushNow(session, System.currentTimeMillis());
264         //    return;
265         //}
266 
267         boolean needsWakeup = flushingSessions.isEmpty();
268         if (scheduleFlush(session) && needsWakeup) {
269             wakeup();
270         }
271     }
272 
273     private boolean scheduleFlush(T session) {
274         if (session.setScheduledForFlush(true)) {
275             flushingSessions.add(session);
276             return true;
277         }
278         return false;
279     }
280 
281     /**
282      * {@inheritDoc}
283      */
284     public final void updateTrafficMask(T session) {
285         scheduleTrafficControl(session);
286         wakeup();
287     }
288 
289     private void scheduleTrafficControl(T session) {
290         trafficControllingSessions.add(session);
291     }
292 
293     private void startupWorker() {
294         synchronized (lock) {
295             if (worker == null) {
296                 worker = new Worker();
297                 executor.execute(new NamePreservingRunnable(worker, threadName));
298             }
299         }
300         wakeup();
301     }
302 
303     private int add() {
304         int addedSessions = 0;
305         
306         // Loop on the new sessions blocking queue, to count
307         // the number of sessions who has been created
308         for (;;) {
309             T session = newSessions.poll();
310 
311             if (session == null) {
312                 // We don't have anymore new sessions
313                 break;
314             }
315 
316 
317             if (addNow(session)) {
318                 // The new session has been added to the 
319                 addedSessions ++;
320             }
321         }
322 
323         return addedSessions;
324     }
325 
326     private boolean addNow(T session) {
327 
328         boolean registered = false;
329         boolean notified = false;
330         try {
331             init(session);
332             registered = true;
333 
334             // Build the filter chain of this session.
335             session.getService().getFilterChainBuilder().buildFilterChain(
336                     session.getFilterChain());
337 
338             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
339             // in AbstractIoFilterChain.fireSessionOpened().
340             ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
341             notified = true;
342         } catch (Throwable e) {
343             if (notified) {
344                 // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
345                 // and call ConnectFuture.setException().
346                 scheduleRemove(session);
347                 session.getFilterChain().fireExceptionCaught(e);
348                 wakeup();
349             } else {
350                 ExceptionMonitor.getInstance().exceptionCaught(e);
351                 try {
352                     destroy(session);
353                 } catch (Exception e1) {
354                     ExceptionMonitor.getInstance().exceptionCaught(e1);
355                 } finally {
356                     registered = false;
357                 }
358             }
359         }
360         return registered;
361     }
362 
363     private int remove() {
364         int removedSessions = 0;
365         for (; ;) {
366             T session = removingSessions.poll();
367 
368             if (session == null) {
369                 break;
370             }
371 
372             SessionState state = state(session);
373             switch (state) {
374             case OPEN:
375                 if (removeNow(session)) {
376                     removedSessions ++;
377                 }
378                 break;
379             case CLOSED:
380                 // Skip if channel is already closed
381                 break;
382             case PREPARING:
383                 // Retry later if session is not yet fully initialized.
384                 // (In case that Session.close() is called before addSession() is processed)
385                 scheduleRemove(session);
386                 return removedSessions;
387             default:
388                 throw new IllegalStateException(String.valueOf(state));
389             }
390         }
391 
392         return removedSessions;
393     }
394 
395     private boolean removeNow(T session) {
396         clearWriteRequestQueue(session);
397 
398         try {
399             destroy(session);
400             return true;
401         } catch (Exception e) {
402             session.getFilterChain().fireExceptionCaught(e);
403         } finally {
404             clearWriteRequestQueue(session);
405             ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
406         }
407         return false;
408     }
409 
410     private void clearWriteRequestQueue(T session) {
411         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
412         WriteRequest req;
413 
414         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
415 
416         if ((req = writeRequestQueue.poll(session)) != null) {
417             Object m = req.getMessage();
418             if (m instanceof IoBuffer) {
419                 IoBuffer buf = (IoBuffer) req.getMessage();
420 
421                 // The first unwritten empty buffer must be
422                 // forwarded to the filter chain.
423                 if (buf.hasRemaining()) {
424                     buf.reset();
425                     failedRequests.add(req);
426                 } else {
427                     session.getFilterChain().fireMessageSent(req);
428                 }
429             } else {
430                 failedRequests.add(req);
431             }
432 
433             // Discard others.
434             while ((req = writeRequestQueue.poll(session)) != null) {
435                 failedRequests.add(req);
436             }
437         }
438 
439         // Create an exception and notify.
440         if (!failedRequests.isEmpty()) {
441             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
442             for (WriteRequest r: failedRequests) {
443                 session.decreaseScheduledBytesAndMessages(r);
444                 r.getFuture().setException(cause);
445             }
446             session.getFilterChain().fireExceptionCaught(cause);
447         }
448     }
449 
450     private void process() throws Exception {
451         for (Iterator<T> i = selectedSessions(); i.hasNext();) {
452             process(i.next());
453             i.remove();
454         }
455     }
456 
457     private void process(T session) {
458 
459         if (isReadable(session) && session.getTrafficMask().isReadable()) {
460             read(session);
461         }
462 
463         if (isWritable(session) && session.getTrafficMask().isWritable()) {
464             scheduleFlush(session);
465         }
466     }
467 
468     private void read(T session) {
469         IoSessionConfig config = session.getConfig();
470         IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
471 
472         final boolean hasFragmentation =
473             session.getTransportMetadata().hasFragmentation();
474 
475         try {
476             int readBytes = 0;
477             int ret;
478 
479             try {
480                 if (hasFragmentation) {
481                     while ((ret = read(session, buf)) > 0) {
482                         readBytes += ret;
483                         if (!buf.hasRemaining()) {
484                             break;
485                         }
486                     }
487                 } else {
488                     ret = read(session, buf);
489                     if (ret > 0) {
490                         readBytes = ret;
491                     }
492                 }
493             } finally {
494                 buf.flip();
495             }
496 
497             if (readBytes > 0) {
498                 session.getFilterChain().fireMessageReceived(buf);
499                 buf = null;
500 
501                 if (hasFragmentation) {
502                     if (readBytes << 1 < config.getReadBufferSize()) {
503                         session.decreaseReadBufferSize();
504                     } else if (readBytes == config.getReadBufferSize()) {
505                         session.increaseReadBufferSize();
506                     }
507                 }
508             }
509             if (ret < 0) {
510                 scheduleRemove(session);
511             }
512         } catch (Throwable e) {
513             if (e instanceof IOException) {
514                 scheduleRemove(session);
515             }
516             session.getFilterChain().fireExceptionCaught(e);
517         }
518     }
519 
520     private void notifyIdleSessions(long currentTime) throws Exception {
521         // process idle sessions
522         if (currentTime - lastIdleCheckTime >= 1000) {
523             lastIdleCheckTime = currentTime;
524             IdleStatusChecker.notifyIdleness(allSessions(), currentTime);
525         }
526     }
527 
528     private void flush(long currentTime) {
529         final T firstSession = flushingSessions.peek();
530         if (firstSession == null) {
531             return;
532         }
533 
534         T session = flushingSessions.poll(); // the same one with firstSession
535         for (; ;) {
536             session.setScheduledForFlush(false);
537             SessionState state = state(session);
538             switch (state) {
539             case OPEN:
540                 try {
541                     boolean flushedAll = flushNow(session, currentTime);
542                     if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
543                         !session.isScheduledForFlush()) {
544                         scheduleFlush(session);
545                     }
546                 } catch (Exception e) {
547                     scheduleRemove(session);
548                     session.getFilterChain().fireExceptionCaught(e);
549                 }
550                 break;
551             case CLOSED:
552                 // Skip if the channel is already closed.
553                 break;
554             case PREPARING:
555                 // Retry later if session is not yet fully initialized.
556                 // (In case that Session.write() is called before addSession() is processed)
557                 scheduleFlush(session);
558                 return;
559             default:
560                 throw new IllegalStateException(String.valueOf(state));
561             }
562 
563             session = flushingSessions.peek();
564             if (session == null || session == firstSession) {
565                 break;
566             }
567             session = flushingSessions.poll();
568         }
569     }
570 
571     private boolean flushNow(T session, long currentTime) {
572         if (!session.isConnected()) {
573             scheduleRemove(session);
574             return false;
575         }
576 
577         final boolean hasFragmentation =
578             session.getTransportMetadata().hasFragmentation();
579 
580         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
581 
582         // Set limitation for the number of written bytes for read-write
583         // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
584         // performance in my experience while not breaking fairness much.
585         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
586                               (session.getConfig().getMaxReadBufferSize() >>> 1);
587         int writtenBytes = 0;
588         try {
589             // Clear OP_WRITE
590             setInterestedInWrite(session, false);
591             do {
592                 // Check for pending writes.
593                 WriteRequest req = session.getCurrentWriteRequest();
594                 if (req == null) {
595                     req = writeRequestQueue.poll(session);
596                     if (req == null) {
597                         break;
598                     }
599                     session.setCurrentWriteRequest(req);
600                 }
601 
602                 int localWrittenBytes = 0;
603                 Object message = req.getMessage();
604                 if (message instanceof IoBuffer) {
605                     localWrittenBytes = writeBuffer(
606                             session, req, hasFragmentation,
607                             maxWrittenBytes - writtenBytes,
608                             currentTime);
609                 } else if (message instanceof FileRegion) {
610                     localWrittenBytes = writeFile(
611                             session, req, hasFragmentation,
612                             maxWrittenBytes - writtenBytes,
613                             currentTime);
614 
615                     // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
616                     // If there's still data to be written in the FileRegion, return 0 indicating that we need
617                     // to pause until writing may resume.
618                     if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
619                         writtenBytes += localWrittenBytes;
620                         setInterestedInWrite(session, true);
621                         return false;
622                     }
623                 } else {
624                     throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
625                 }
626 
627                 if (localWrittenBytes == 0) {
628                     // Kernel buffer is full.
629                     setInterestedInWrite(session, true);
630                     return false;
631                 }
632 
633                 writtenBytes += localWrittenBytes;
634 
635                 if (writtenBytes >= maxWrittenBytes) {
636                     // Wrote too much
637                     scheduleFlush(session);
638                     return false;
639                 }
640             } while (writtenBytes < maxWrittenBytes);
641         } catch (Exception e) {
642             session.getFilterChain().fireExceptionCaught(e);
643             return false;
644         }
645 
646         return true;
647     }
648 
649     private int writeBuffer(T session, WriteRequest req,
650             boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
651         IoBuffer buf = (IoBuffer) req.getMessage();
652         int localWrittenBytes = 0;
653         if (buf.hasRemaining()) {
654             int length;
655             if (hasFragmentation) {
656                 length = Math.min(buf.remaining(), maxLength);
657             } else {
658                 length = buf.remaining();
659             }
660             for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
661                 localWrittenBytes = write(session, buf, length);
662                 if (localWrittenBytes != 0) {
663                     break;
664                 }
665             }
666         }
667 
668         session.increaseWrittenBytes(localWrittenBytes, currentTime);
669 
670         if (!buf.hasRemaining() ||
671                 !hasFragmentation && localWrittenBytes != 0) {
672             // Buffer has been sent, clear the current request.
673             buf.reset();
674             fireMessageSent(session, req);
675         }
676         return localWrittenBytes;
677     }
678 
679     private int writeFile(T session, WriteRequest req,
680             boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
681         int localWrittenBytes;
682         FileRegion region = (FileRegion) req.getMessage();
683         if (region.getRemainingBytes() > 0) {
684             int length;
685             if (hasFragmentation) {
686                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
687             } else {
688                 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
689             }
690             localWrittenBytes = transferFile(session, region, length);
691             region.update(localWrittenBytes);
692         } else {
693             localWrittenBytes = 0;
694         }
695 
696         session.increaseWrittenBytes(localWrittenBytes, currentTime);
697 
698         if (region.getRemainingBytes() <= 0 ||
699                     !hasFragmentation && localWrittenBytes != 0) {
700             fireMessageSent(session, req);
701         }
702 
703         return localWrittenBytes;
704     }
705 
706     private void fireMessageSent(T session, WriteRequest req) {
707         session.setCurrentWriteRequest(null);
708         session.getFilterChain().fireMessageSent(req);
709     }
710 
711     private void updateTrafficMask() {
712         for (; ;) {
713             T session = trafficControllingSessions.poll();
714 
715             if (session == null) {
716                 break;
717             }
718 
719             SessionState state = state(session);
720             switch (state) {
721             case OPEN:
722                 updateTrafficMaskNow(session);
723                 break;
724             case CLOSED:
725                 break;
726             case PREPARING:
727                 // Retry later if session is not yet fully initialized.
728                 // (In case that Session.suspend??() or session.resume??() is
729                 // called before addSession() is processed)
730                 scheduleTrafficControl(session);
731                 return;
732             default:
733                 throw new IllegalStateException(String.valueOf(state));
734             }
735         }
736     }
737 
738     private void updateTrafficMaskNow(T session) {
739         // The normal is OP_READ and, if there are write requests in the
740         // session's write queue, set OP_WRITE to trigger flushing.
741         int mask = session.getTrafficMask().getInterestOps();
742         try {
743             setInterestedInRead(session, (mask & SelectionKey.OP_READ) != 0);
744         } catch (Exception e) {
745             session.getFilterChain().fireExceptionCaught(e);
746         }
747         try {
748             setInterestedInWrite(
749                     session,
750                     !session.getWriteRequestQueue().isEmpty(session) &&
751                             (mask & SelectionKey.OP_WRITE) != 0);
752         } catch (Exception e) {
753             session.getFilterChain().fireExceptionCaught(e);
754         }
755     }
756 
757     
758     /**
759      *  
760      *
761      */
762     private class Worker implements Runnable {
763         public void run() {
764             int nSessions = 0;
765             lastIdleCheckTime = System.currentTimeMillis();
766 
767             for (;;) {
768                 try {
769                     boolean selected = select(1000);
770 
771                     nSessions += add();
772                     updateTrafficMask();
773 
774                     if (selected) {
775                         process();
776                     }
777 
778                     long currentTime = System.currentTimeMillis();
779                     flush(currentTime);
780                     nSessions -= remove();
781                     notifyIdleSessions(currentTime);
782 
783                     if (nSessions == 0) {
784                         synchronized (lock) {
785                             if (newSessions.isEmpty() && isSelectorEmpty()) {
786                                 worker = null;
787                                 break;
788                             }
789                         }
790                     }
791 
792                     // Disconnect all sessions immediately if disposal has been
793                     // requested so that we exit this loop eventually.
794                     if (isDisposing()) {
795                         for (Iterator<T> i = allSessions(); i.hasNext(); ) {
796                             scheduleRemove(i.next());
797                         }
798                         wakeup();
799                     }
800                 } catch (Throwable t) {
801                     ExceptionMonitor.getInstance().exceptionCaught(t);
802 
803                     try {
804                         Thread.sleep(1000);
805                     } catch (InterruptedException e1) {
806                         ExceptionMonitor.getInstance().exceptionCaught(e1);
807                     }
808                 }
809             }
810 
811             try {
812                 synchronized (disposalLock) {
813                     if (isDisposing()) {
814                         dispose0();
815                     }
816                 }
817             } catch (Throwable t) {
818                 ExceptionMonitor.getInstance().exceptionCaught(t);
819             } finally {
820                 disposalFuture.setValue(true);
821             }
822         }
823     }
824 
825     protected static enum SessionState {
826         OPEN,
827         CLOSED,
828         PREPARING,
829     }
830 }