1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.file.FileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.service.IoServiceListenerSupport;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.SessionState;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.core.write.WriteToClosedSessionException;
51 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52 import org.apache.mina.util.ExceptionMonitor;
53 import org.apache.mina.util.NamePreservingRunnable;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63
64
65
66
67 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
68
69 private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
70
71
72
73
74
75
76
77 private static final int WRITE_SPIN_COUNT = 256;
78
79
80
81
82
83 private static final long SELECT_TIMEOUT = 1000L;
84
85
86 private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
87
88
89 private final String threadName;
90
91
92 private final Executor executor;
93
94
95 private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
96
97
98 private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
99
100
101 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
102
103
104
105
106
107 private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
108
109
110 private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
111
112 private long lastIdleCheckTime;
113
114 private final Object disposalLock = new Object();
115
116 private volatile boolean disposing;
117
118 private volatile boolean disposed;
119
120 private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
121
122 protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
123
124
125
126
127
128
129
130
131 protected AbstractPollingIoProcessor(Executor executor) {
132 if (executor == null) {
133 throw new IllegalArgumentException("executor");
134 }
135
136 this.threadName = nextThreadName();
137 this.executor = executor;
138 }
139
140
141
142
143
144
145
146
147
148 private String nextThreadName() {
149 Class<?> cls = getClass();
150 int newThreadId;
151
152 AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
153
154 if (threadId == null) {
155 newThreadId = 1;
156 } else {
157
158 newThreadId = threadId.incrementAndGet();
159 }
160
161
162 return cls.getSimpleName() + '-' + newThreadId;
163 }
164
165
166
167
168 public final boolean isDisposing() {
169 return disposing;
170 }
171
172
173
174
175 public final boolean isDisposed() {
176 return disposed;
177 }
178
179
180
181
182 public final void dispose() {
183 if (disposed || disposing) {
184 return;
185 }
186
187 synchronized (disposalLock) {
188 disposing = true;
189 startupProcessor();
190 }
191
192 disposalFuture.awaitUninterruptibly();
193 disposed = true;
194 }
195
196
197
198
199
200
201
202 protected abstract void doDispose() throws Exception;
203
204
205
206
207
208
209
210
211
212
213 protected abstract int select(long timeout) throws Exception;
214
215
216
217
218
219
220
221
222 protected abstract int select() throws Exception;
223
224
225
226
227
228
229
230 protected abstract boolean isSelectorEmpty();
231
232
233
234
235 protected abstract void wakeup();
236
237
238
239
240
241
242
243 protected abstract Iterator<S> allSessions();
244
245
246
247
248
249
250
251 protected abstract Iterator<S> selectedSessions();
252
253
254
255
256
257
258
259
260 protected abstract SessionState getState(S session);
261
262
263
264
265
266
267
268
269 protected abstract boolean isWritable(S session);
270
271
272
273
274
275
276
277
278 protected abstract boolean isReadable(S session);
279
280
281
282
283
284
285
286
287
288 protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
289
290
291
292
293
294
295
296
297
298 protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
299
300
301
302
303
304
305
306
307 protected abstract boolean isInterestedInRead(S session);
308
309
310
311
312
313
314
315
316 protected abstract boolean isInterestedInWrite(S session);
317
318
319
320
321
322
323
324 protected abstract void init(S session) throws Exception;
325
326
327
328
329
330
331
332
333
334 protected abstract void destroy(S session) throws Exception;
335
336
337
338
339
340
341
342
343
344
345
346
347
348 protected abstract int read(S session, IoBuffer buf) throws Exception;
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365 protected abstract int write(S session, IoBuffer buf, int length) throws Exception;
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383 protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
384
385
386
387
388 public final void add(S session) {
389 if (disposed || disposing) {
390 throw new IllegalStateException("Already disposed.");
391 }
392
393
394 newSessions.add(session);
395 startupProcessor();
396 }
397
398
399
400
401 public final void remove(S session) {
402 scheduleRemove(session);
403 startupProcessor();
404 }
405
406 private void scheduleRemove(S session) {
407 removingSessions.add(session);
408 }
409
410
411
412
413 public void write(S session, WriteRequest writeRequest) {
414 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
415
416 writeRequestQueue.offer(session, writeRequest);
417
418 if (!session.isWriteSuspended()) {
419 this.flush(session);
420 }
421 }
422
423
424
425
426 public final void flush(S session) {
427
428
429 if (session.setScheduledForFlush(true)) {
430 flushingSessions.add(session);
431 wakeup();
432 }
433 }
434
435 private void scheduleFlush(S session) {
436
437
438 if (session.setScheduledForFlush(true)) {
439 flushingSessions.add(session);
440 }
441 }
442
443
444
445
446
447
448
449 public final void updateTrafficMask(S session) {
450 trafficControllingSessions.add(session);
451 wakeup();
452 }
453
454
455
456
457
458 private void startupProcessor() {
459 Processor processor = processorRef.get();
460
461 if (processor == null) {
462 processor = new Processor();
463
464 if (processorRef.compareAndSet(null, processor)) {
465 executor.execute(new NamePreservingRunnable(processor, threadName));
466 }
467 }
468
469
470
471 wakeup();
472 }
473
474
475
476
477
478
479
480
481
482 abstract protected void registerNewSelector() throws IOException;
483
484
485
486
487
488
489
490
491
492
493 abstract protected boolean isBrokenConnection() throws IOException;
494
495
496
497
498
499
500
501 private int handleNewSessions() {
502 int addedSessions = 0;
503
504 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
505 if (addNow(session)) {
506
507 addedSessions++;
508 }
509 }
510
511 return addedSessions;
512 }
513
514
515
516
517
518
519
520
521
522
523 private boolean addNow(S session) {
524 boolean registered = false;
525
526 try {
527 init(session);
528 registered = true;
529
530
531 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
532 chainBuilder.buildFilterChain(session.getFilterChain());
533
534
535
536
537 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
538 listeners.fireSessionCreated(session);
539 } catch (Exception e) {
540 ExceptionMonitor.getInstance().exceptionCaught(e);
541
542 try {
543 destroy(session);
544 } catch (Exception e1) {
545 ExceptionMonitor.getInstance().exceptionCaught(e1);
546 } finally {
547 registered = false;
548 }
549 }
550
551 return registered;
552 }
553
554 private int removeSessions() {
555 int removedSessions = 0;
556
557 for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
558 SessionState state = getState(session);
559
560
561 switch (state) {
562 case OPENED:
563
564 if (removeNow(session)) {
565 removedSessions++;
566 }
567
568 break;
569
570 case CLOSING:
571
572 break;
573
574 case OPENING:
575
576
577 newSessions.remove(session);
578
579 if (removeNow(session)) {
580 removedSessions++;
581 }
582
583 break;
584
585 default:
586 throw new IllegalStateException(String.valueOf(state));
587 }
588 }
589
590 return removedSessions;
591 }
592
593 private boolean removeNow(S session) {
594 clearWriteRequestQueue(session);
595
596 try {
597 destroy(session);
598 return true;
599 } catch (Exception e) {
600 IoFilterChain filterChain = session.getFilterChain();
601 filterChain.fireExceptionCaught(e);
602 } finally {
603 clearWriteRequestQueue(session);
604 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
605 }
606 return false;
607 }
608
609 private void clearWriteRequestQueue(S session) {
610 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
611 WriteRequest req;
612
613 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
614
615 if ((req = writeRequestQueue.poll(session)) != null) {
616 Object message = req.getMessage();
617
618 if (message instanceof IoBuffer) {
619 IoBuffer buf = (IoBuffer) message;
620
621
622
623 if (buf.hasRemaining()) {
624 buf.reset();
625 failedRequests.add(req);
626 } else {
627 IoFilterChain filterChain = session.getFilterChain();
628 filterChain.fireMessageSent(req);
629 }
630 } else {
631 failedRequests.add(req);
632 }
633
634
635 while ((req = writeRequestQueue.poll(session)) != null) {
636 failedRequests.add(req);
637 }
638 }
639
640
641 if (!failedRequests.isEmpty()) {
642 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
643
644 for (WriteRequest r : failedRequests) {
645 session.decreaseScheduledBytesAndMessages(r);
646 r.getFuture().setException(cause);
647 }
648
649 IoFilterChain filterChain = session.getFilterChain();
650 filterChain.fireExceptionCaught(cause);
651 }
652 }
653
654 private void process() throws Exception {
655 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
656 S session = i.next();
657 process(session);
658 i.remove();
659 }
660 }
661
662
663
664
665 private void process(S session) {
666
667 if (isReadable(session) && !session.isReadSuspended()) {
668 read(session);
669 }
670
671
672 if (isWritable(session) && !session.isWriteSuspended()) {
673
674 if (session.setScheduledForFlush(true)) {
675 flushingSessions.add(session);
676 }
677 }
678 }
679
680 private void read(S session) {
681 IoSessionConfig config = session.getConfig();
682 int bufferSize = config.getReadBufferSize();
683 IoBuffer buf = IoBuffer.allocate(bufferSize);
684
685 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
686
687 try {
688 int readBytes = 0;
689 int ret;
690
691 try {
692 if (hasFragmentation) {
693
694 while ((ret = read(session, buf)) > 0) {
695 readBytes += ret;
696
697 if (!buf.hasRemaining()) {
698 break;
699 }
700 }
701 } else {
702 ret = read(session, buf);
703
704 if (ret > 0) {
705 readBytes = ret;
706 }
707 }
708 } finally {
709 buf.flip();
710 }
711
712 if (readBytes > 0) {
713 IoFilterChain filterChain = session.getFilterChain();
714 filterChain.fireMessageReceived(buf);
715 buf = null;
716
717 if (hasFragmentation) {
718 if (readBytes << 1 < config.getReadBufferSize()) {
719 session.decreaseReadBufferSize();
720 } else if (readBytes == config.getReadBufferSize()) {
721 session.increaseReadBufferSize();
722 }
723 }
724 }
725
726 if (ret < 0) {
727
728 IoFilterChain filterChain = session.getFilterChain();
729 filterChain.fireInputClosed();
730 }
731 } catch (Exception e) {
732 if (e instanceof IOException) {
733 if (!(e instanceof PortUnreachableException)
734 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
735 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
736 scheduleRemove(session);
737 }
738 }
739
740 IoFilterChain filterChain = session.getFilterChain();
741 filterChain.fireExceptionCaught(e);
742 }
743 }
744
745 private void notifyIdleSessions(long currentTime) throws Exception {
746
747 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
748 lastIdleCheckTime = currentTime;
749 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
750 }
751 }
752
753
754
755
756 private void flush(long currentTime) {
757 if (flushingSessions.isEmpty()) {
758 return;
759 }
760
761 do {
762 S session = flushingSessions.poll();
763
764 if (session == null) {
765
766 break;
767 }
768
769
770
771 session.unscheduledForFlush();
772
773 SessionState state = getState(session);
774
775 switch (state) {
776 case OPENED:
777 try {
778 boolean flushedAll = flushNow(session, currentTime);
779
780 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
781 && !session.isScheduledForFlush()) {
782 scheduleFlush(session);
783 }
784 } catch (Exception e) {
785 scheduleRemove(session);
786 IoFilterChain filterChain = session.getFilterChain();
787 filterChain.fireExceptionCaught(e);
788 }
789
790 break;
791
792 case CLOSING:
793
794 break;
795
796 case OPENING:
797
798
799
800 scheduleFlush(session);
801 return;
802
803 default:
804 throw new IllegalStateException(String.valueOf(state));
805 }
806
807 } while (!flushingSessions.isEmpty());
808 }
809
810 private boolean flushNow(S session, long currentTime) {
811 if (!session.isConnected()) {
812 scheduleRemove(session);
813 return false;
814 }
815
816 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
817
818 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
819
820
821
822
823 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
824 + (session.getConfig().getMaxReadBufferSize() >>> 1);
825 int writtenBytes = 0;
826 WriteRequest req = null;
827
828 try {
829
830 setInterestedInWrite(session, false);
831
832 do {
833
834 req = session.getCurrentWriteRequest();
835
836 if (req == null) {
837 req = writeRequestQueue.poll(session);
838
839 if (req == null) {
840 break;
841 }
842
843 session.setCurrentWriteRequest(req);
844 }
845
846 int localWrittenBytes = 0;
847 Object message = req.getMessage();
848
849 if (message instanceof IoBuffer) {
850 localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
851 currentTime);
852
853 if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
854
855 writtenBytes += localWrittenBytes;
856 setInterestedInWrite(session, true);
857 return false;
858 }
859 } else if (message instanceof FileRegion) {
860 localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
861 currentTime);
862
863
864
865
866
867
868 if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
869 writtenBytes += localWrittenBytes;
870 setInterestedInWrite(session, true);
871 return false;
872 }
873 } else {
874 throw new IllegalStateException("Don't know how to handle message of type '"
875 + message.getClass().getName() + "'. Are you missing a protocol encoder?");
876 }
877
878 if (localWrittenBytes == 0) {
879
880 setInterestedInWrite(session, true);
881 return false;
882 }
883
884 writtenBytes += localWrittenBytes;
885
886 if (writtenBytes >= maxWrittenBytes) {
887
888 scheduleFlush(session);
889 return false;
890 }
891
892 if (message instanceof IoBuffer) {
893 ((IoBuffer) message).free();
894 }
895 } while (writtenBytes < maxWrittenBytes);
896 } catch (Exception e) {
897 if (req != null) {
898 req.getFuture().setException(e);
899 }
900
901 IoFilterChain filterChain = session.getFilterChain();
902 filterChain.fireExceptionCaught(e);
903 return false;
904 }
905
906 return true;
907 }
908
909 private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
910 throws Exception {
911 IoBuffer buf = (IoBuffer) req.getMessage();
912 int localWrittenBytes = 0;
913
914 if (buf.hasRemaining()) {
915 int length;
916
917 if (hasFragmentation) {
918 length = Math.min(buf.remaining(), maxLength);
919 } else {
920 length = buf.remaining();
921 }
922
923 try {
924 localWrittenBytes = write(session, buf, length);
925 } catch (IOException ioe) {
926
927
928 buf.free();
929 session.close(true);
930 destroy(session);
931
932 return 0;
933 }
934
935 }
936
937 session.increaseWrittenBytes(localWrittenBytes, currentTime);
938
939 if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
940
941 int pos = buf.position();
942 buf.reset();
943
944 fireMessageSent(session, req);
945
946
947 buf.position(pos);
948 }
949
950 return localWrittenBytes;
951 }
952
953 private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
954 throws Exception {
955 int localWrittenBytes;
956 FileRegion region = (FileRegion) req.getMessage();
957
958 if (region.getRemainingBytes() > 0) {
959 int length;
960
961 if (hasFragmentation) {
962 length = (int) Math.min(region.getRemainingBytes(), maxLength);
963 } else {
964 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
965 }
966
967 localWrittenBytes = transferFile(session, region, length);
968 region.update(localWrittenBytes);
969 } else {
970 localWrittenBytes = 0;
971 }
972
973 session.increaseWrittenBytes(localWrittenBytes, currentTime);
974
975 if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
976 fireMessageSent(session, req);
977 }
978
979 return localWrittenBytes;
980 }
981
982 private void fireMessageSent(S session, WriteRequest req) {
983 session.setCurrentWriteRequest(null);
984 IoFilterChain filterChain = session.getFilterChain();
985 filterChain.fireMessageSent(req);
986 }
987
988
989
990
991 private void updateTrafficMask() {
992 int queueSize = trafficControllingSessions.size();
993
994 while (queueSize > 0) {
995 S session = trafficControllingSessions.poll();
996
997 if (session == null) {
998
999 return;
1000 }
1001
1002 SessionState state = getState(session);
1003
1004 switch (state) {
1005 case OPENED:
1006 updateTrafficControl(session);
1007
1008 break;
1009
1010 case CLOSING:
1011 break;
1012
1013 case OPENING:
1014
1015
1016
1017
1018 trafficControllingSessions.add(session);
1019 break;
1020
1021 default:
1022 throw new IllegalStateException(String.valueOf(state));
1023 }
1024
1025
1026
1027
1028
1029 queueSize--;
1030 }
1031 }
1032
1033
1034
1035
1036 public void updateTrafficControl(S session) {
1037
1038 try {
1039 setInterestedInRead(session, !session.isReadSuspended());
1040 } catch (Exception e) {
1041 IoFilterChain filterChain = session.getFilterChain();
1042 filterChain.fireExceptionCaught(e);
1043 }
1044
1045 try {
1046 setInterestedInWrite(session,
1047 !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1048 } catch (Exception e) {
1049 IoFilterChain filterChain = session.getFilterChain();
1050 filterChain.fireExceptionCaught(e);
1051 }
1052 }
1053
1054
1055
1056
1057
1058
1059
1060 private class Processor implements Runnable {
1061 public void run() {
1062 assert (processorRef.get() == this);
1063
1064 int nSessions = 0;
1065 lastIdleCheckTime = System.currentTimeMillis();
1066
1067 for (;;) {
1068 try {
1069
1070
1071
1072
1073 long t0 = System.currentTimeMillis();
1074 int selected = select(SELECT_TIMEOUT);
1075 long t1 = System.currentTimeMillis();
1076 long delta = (t1 - t0);
1077
1078 if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
1079
1080
1081 if (isBrokenConnection()) {
1082 LOG.warn("Broken connection");
1083
1084
1085
1086 wakeupCalled.getAndSet(false);
1087
1088 continue;
1089 } else {
1090 LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102 registerNewSelector();
1103 }
1104
1105
1106 wakeupCalled.getAndSet(false);
1107
1108
1109 continue;
1110 }
1111
1112
1113 nSessions += handleNewSessions();
1114
1115 updateTrafficMask();
1116
1117
1118
1119 if (selected > 0) {
1120
1121 process();
1122 }
1123
1124
1125 long currentTime = System.currentTimeMillis();
1126 flush(currentTime);
1127
1128
1129 nSessions -= removeSessions();
1130
1131
1132 notifyIdleSessions(currentTime);
1133
1134
1135
1136 if (nSessions == 0) {
1137 processorRef.set(null);
1138
1139 if (newSessions.isEmpty() && isSelectorEmpty()) {
1140
1141 assert (processorRef.get() != this);
1142 break;
1143 }
1144
1145 assert (processorRef.get() != this);
1146
1147 if (!processorRef.compareAndSet(null, this)) {
1148
1149 assert (processorRef.get() != this);
1150 break;
1151 }
1152
1153 assert (processorRef.get() == this);
1154 }
1155
1156
1157
1158 if (isDisposing()) {
1159 for (Iterator<S> i = allSessions(); i.hasNext();) {
1160 scheduleRemove(i.next());
1161 }
1162
1163 wakeup();
1164 }
1165 } catch (ClosedSelectorException cse) {
1166
1167
1168 ExceptionMonitor.getInstance().exceptionCaught(cse);
1169 break;
1170 } catch (Exception e) {
1171 ExceptionMonitor.getInstance().exceptionCaught(e);
1172
1173 try {
1174 Thread.sleep(1000);
1175 } catch (InterruptedException e1) {
1176 ExceptionMonitor.getInstance().exceptionCaught(e1);
1177 }
1178 }
1179 }
1180
1181 try {
1182 synchronized (disposalLock) {
1183 if (disposing) {
1184 doDispose();
1185 }
1186 }
1187 } catch (Exception e) {
1188 ExceptionMonitor.getInstance().exceptionCaught(e);
1189 } finally {
1190 disposalFuture.setValue(true);
1191 }
1192 }
1193 }
1194 }