1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.AbstractSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.concurrent.atomic.AtomicLong;
33
34 import org.apache.mina.core.ExceptionMonitor;
35 import org.apache.mina.core.IoUtil;
36 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
37 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
38 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
39 import org.apache.mina.core.future.ConnectFuture;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.future.IoFuture;
42 import org.apache.mina.core.future.WriteFuture;
43 import org.apache.mina.core.session.AbstractIoSession;
44 import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
45 import org.apache.mina.core.session.IdleStatus;
46 import org.apache.mina.core.session.IdleStatusChecker;
47 import org.apache.mina.core.session.IoSession;
48 import org.apache.mina.core.session.IoSessionConfig;
49 import org.apache.mina.core.session.IoSessionDataStructureFactory;
50 import org.apache.mina.core.session.IoSessionInitializationException;
51 import org.apache.mina.core.session.IoSessionInitializer;
52 import org.apache.mina.util.NamePreservingRunnable;
53
54
55
56
57
58
59
60
61 public abstract class AbstractIoService implements IoService {
62 private static final AtomicInteger id = new AtomicInteger();
63
64 private final IoServiceListener serviceActivationListener =
65 new IoServiceListener() {
66 public void serviceActivated(IoService service) {
67
68 AbstractIoService s = (AbstractIoService) service;
69 s.setLastReadTime(s.getActivationTime());
70 s.setLastWriteTime(s.getActivationTime());
71 s.lastThroughputCalculationTime = s.getActivationTime();
72
73
74 idleStatusChecker.addService(s);
75 }
76
77 public void serviceDeactivated(IoService service) {
78 idleStatusChecker.removeService((AbstractIoService) service);
79 }
80
81 public void serviceIdle(IoService service, IdleStatus idleStatus) {}
82 public void sessionCreated(IoSession session) {}
83 public void sessionDestroyed(IoSession session) {}
84 };
85
86
87
88
89 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
90
91
92
93
94 private IoHandler handler;
95
96 private IoSessionDataStructureFactory sessionDataStructureFactory =
97 new DefaultIoSessionDataStructureFactory();
98
99
100
101
102 private final IoServiceListenerSupport listeners;
103
104 private final Executor executor;
105 private final String threadName;
106 private final boolean createdExecutor;
107
108
109
110
111
112 protected final Object disposalLock = new Object();
113 private volatile boolean disposing;
114 private volatile boolean disposed;
115 private IoFuture disposalFuture;
116
117 private final AtomicLong readBytes = new AtomicLong();
118 private final AtomicLong writtenBytes = new AtomicLong();
119 private final AtomicLong readMessages = new AtomicLong();
120 private final AtomicLong writtenMessages = new AtomicLong();
121 private long lastReadTime;
122 private long lastWriteTime;
123
124 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
125 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
126
127 private final Object throughputCalculationLock = new Object();
128 private int throughputCalculationInterval = 3;
129
130 private long lastThroughputCalculationTime;
131 private long lastReadBytes;
132 private long lastWrittenBytes;
133 private long lastReadMessages;
134 private long lastWrittenMessages;
135 private double readBytesThroughput;
136 private double writtenBytesThroughput;
137 private double readMessagesThroughput;
138 private double writtenMessagesThroughput;
139 private double largestReadBytesThroughput;
140 private double largestWrittenBytesThroughput;
141 private double largestReadMessagesThroughput;
142 private double largestWrittenMessagesThroughput;
143
144 private final IdleStatusChecker idleStatusChecker = new IdleStatusChecker();
145 private final Object idlenessCheckLock = new Object();
146 private int idleTimeForRead;
147 private int idleTimeForWrite;
148 private int idleTimeForBoth;
149
150 private int idleCountForBoth;
151 private int idleCountForRead;
152 private int idleCountForWrite;
153
154 private long lastIdleTimeForBoth;
155 private long lastIdleTimeForRead;
156 private long lastIdleTimeForWrite;
157
158
159
160
161 private final IoSessionConfig sessionConfig;
162
163
164
165
166
167
168
169
170
171
172
173
174
175 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
176 if (sessionConfig == null) {
177 throw new NullPointerException("sessionConfig");
178 }
179
180 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(
181 sessionConfig.getClass())) {
182 throw new IllegalArgumentException("sessionConfig type: "
183 + sessionConfig.getClass() + " (expected: "
184 + getTransportMetadata().getSessionConfigType() + ")");
185 }
186
187 listeners = new IoServiceListenerSupport(this);
188 listeners.add(serviceActivationListener);
189 this.sessionConfig = sessionConfig;
190
191
192
193 ExceptionMonitor.getInstance();
194
195 if (executor == null) {
196 this.executor = Executors.newCachedThreadPool();
197 createdExecutor = true;
198 } else {
199 this.executor = executor;
200 createdExecutor = false;
201 }
202
203 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
204
205 executeWorker(idleStatusChecker.getNotifyingTask(), "idleStatusChecker");
206 }
207
208
209
210
211 public final IoFilterChainBuilder getFilterChainBuilder() {
212 return filterChainBuilder;
213 }
214
215
216
217
218 public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
219 if (builder == null) {
220 builder = new DefaultIoFilterChainBuilder();
221 }
222 filterChainBuilder = builder;
223 }
224
225
226
227
228 public final DefaultIoFilterChainBuilder getFilterChain() {
229 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
230 return (DefaultIoFilterChainBuilder) filterChainBuilder;
231 } else {
232 throw new IllegalStateException(
233 "Current filter chain builder is not a DefaultIoFilterChainBuilder.");
234 }
235 }
236
237
238
239
240 public final void addListener(IoServiceListener listener) {
241 listeners.add(listener);
242 }
243
244
245
246
247 public final void removeListener(IoServiceListener listener) {
248 listeners.remove(listener);
249 }
250
251
252
253
254 public final boolean isActive() {
255 return listeners.isActive();
256 }
257
258
259
260
261 public final boolean isDisposing() {
262 return disposing;
263 }
264
265
266
267
268 public final boolean isDisposed() {
269 return disposed;
270 }
271
272
273
274
275 public final void dispose() {
276 if (disposed) {
277 return;
278 }
279
280 IoFuture disposalFuture;
281 synchronized (disposalLock) {
282 disposalFuture = this.disposalFuture;
283 if (!disposing) {
284 disposing = true;
285 try {
286 this.disposalFuture = disposalFuture = dispose0();
287 } catch (Exception e) {
288 ExceptionMonitor.getInstance().exceptionCaught(e);
289 } finally {
290 if (disposalFuture == null) {
291 disposed = true;
292 }
293 }
294 }
295 }
296
297 idleStatusChecker.getNotifyingTask().cancel();
298 if (disposalFuture != null) {
299 disposalFuture.awaitUninterruptibly();
300 }
301 if (createdExecutor) {
302 ExecutorService e = (ExecutorService) executor;
303 e.shutdown();
304 while (!e.isTerminated()) {
305 try {
306 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
307 } catch (InterruptedException e1) {
308
309 }
310 }
311 }
312
313 disposed = true;
314 }
315
316
317
318
319
320 protected abstract IoFuture dispose0() throws Exception;
321
322
323
324
325 public final Map<Long, IoSession> getManagedSessions() {
326 return listeners.getManagedSessions();
327 }
328
329
330
331
332 public final long getCumulativeManagedSessionCount() {
333 return listeners.getCumulativeManagedSessionCount();
334 }
335
336
337
338
339 public final int getLargestManagedSessionCount() {
340 return listeners.getLargestManagedSessionCount();
341 }
342
343
344
345
346 public final int getManagedSessionCount() {
347 return listeners.getManagedSessionCount();
348 }
349
350
351
352
353 public final IoHandler getHandler() {
354 return handler;
355 }
356
357
358
359
360 public final void setHandler(IoHandler handler) {
361 if (handler == null) {
362 throw new NullPointerException("handler");
363 }
364
365 if (isActive()) {
366 throw new IllegalStateException("handler cannot be set while the service is active.");
367 }
368
369 this.handler = handler;
370 }
371
372
373
374
375 public IoSessionConfig getSessionConfig() {
376 return sessionConfig;
377 }
378
379
380
381
382 public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
383 return sessionDataStructureFactory;
384 }
385
386
387
388
389 public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
390 if (sessionDataStructureFactory == null) {
391 throw new NullPointerException("sessionDataStructureFactory");
392 }
393
394 if (isActive()) {
395 throw new IllegalStateException(
396 "sessionDataStructureFactory cannot be set while the service is active.");
397 }
398
399 this.sessionDataStructureFactory = sessionDataStructureFactory;
400 }
401
402
403
404
405 public final long getReadBytes() {
406 return readBytes.get();
407 }
408
409 public final void increaseReadBytes(long increment, long currentTime) {
410 readBytes.addAndGet(increment);
411 lastReadTime = currentTime;
412 idleCountForBoth = 0;
413 idleCountForRead = 0;
414 }
415
416
417
418
419 public final long getReadMessages() {
420 return readMessages.get();
421 }
422
423 public final void increaseReadMessages(long currentTime) {
424 readMessages.incrementAndGet();
425 lastReadTime = currentTime;
426 idleCountForBoth = 0;
427 idleCountForRead = 0;
428 }
429
430
431
432
433 public final int getThroughputCalculationInterval() {
434 return throughputCalculationInterval;
435 }
436
437
438
439
440 public final void setThroughputCalculationInterval(int throughputCalculationInterval) {
441 if (throughputCalculationInterval < 0) {
442 throw new IllegalArgumentException(
443 "throughputCalculationInterval: " + throughputCalculationInterval);
444 }
445
446 this.throughputCalculationInterval = throughputCalculationInterval;
447 }
448
449
450
451
452 public final long getThroughputCalculationIntervalInMillis() {
453 return throughputCalculationInterval * 1000L;
454 }
455
456
457
458
459 public final double getReadBytesThroughput() {
460 resetThroughput();
461 return readBytesThroughput;
462 }
463
464
465
466
467 public final double getWrittenBytesThroughput() {
468 resetThroughput();
469 return writtenBytesThroughput;
470 }
471
472
473
474
475 public final double getReadMessagesThroughput() {
476 resetThroughput();
477 return readMessagesThroughput;
478 }
479
480
481
482
483 public final double getWrittenMessagesThroughput() {
484 resetThroughput();
485 return writtenMessagesThroughput;
486 }
487
488
489
490
491 public final double getLargestReadBytesThroughput() {
492 return largestReadBytesThroughput;
493 }
494
495
496
497
498 public final double getLargestWrittenBytesThroughput() {
499 return largestWrittenBytesThroughput;
500 }
501
502
503
504
505 public final double getLargestReadMessagesThroughput() {
506 return largestReadMessagesThroughput;
507 }
508
509
510
511
512 public final double getLargestWrittenMessagesThroughput() {
513 return largestWrittenMessagesThroughput;
514 }
515
516 private void resetThroughput() {
517 if (getManagedSessionCount() == 0) {
518 readBytesThroughput = 0;
519 writtenBytesThroughput = 0;
520 readMessagesThroughput = 0;
521 writtenMessagesThroughput = 0;
522 }
523 }
524
525 private void updateThroughput(long currentTime) {
526 synchronized (throughputCalculationLock) {
527 int interval = (int) (currentTime - lastThroughputCalculationTime);
528 long minInterval = getThroughputCalculationIntervalInMillis();
529 if (minInterval == 0 || interval < minInterval) {
530 return;
531 }
532
533 long readBytes = this.readBytes.get();
534 long writtenBytes = this.writtenBytes.get();
535 long readMessages = this.readMessages.get();
536 long writtenMessages = this.writtenMessages.get();
537
538 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
539 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
540 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
541 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
542
543 if (readBytesThroughput > largestReadBytesThroughput) {
544 largestReadBytesThroughput = readBytesThroughput;
545 }
546 if (writtenBytesThroughput > largestWrittenBytesThroughput) {
547 largestWrittenBytesThroughput = writtenBytesThroughput;
548 }
549 if (readMessagesThroughput > largestReadMessagesThroughput) {
550 largestReadMessagesThroughput = readMessagesThroughput;
551 }
552 if (writtenMessagesThroughput > largestWrittenMessagesThroughput) {
553 largestWrittenMessagesThroughput = writtenMessagesThroughput;
554 }
555
556 lastReadBytes = readBytes;
557 lastWrittenBytes = writtenBytes;
558 lastReadMessages = readMessages;
559 lastWrittenMessages = writtenMessages;
560
561 lastThroughputCalculationTime = currentTime;
562 }
563 }
564
565
566
567
568 public final int getScheduledWriteBytes() {
569 return scheduledWriteBytes.get();
570 }
571
572 public final void increaseScheduledWriteBytes(int increment) {
573 scheduledWriteBytes.addAndGet(increment);
574 }
575
576
577
578
579 public final int getScheduledWriteMessages() {
580 return scheduledWriteMessages.get();
581 }
582
583 public final void increaseScheduledWriteMessages() {
584 scheduledWriteMessages.incrementAndGet();
585 }
586
587 public final void decreaseScheduledWriteMessages() {
588 scheduledWriteMessages.decrementAndGet();
589 }
590
591
592
593
594 public final long getActivationTime() {
595 return listeners.getActivationTime();
596 }
597
598
599
600
601 public final long getLastIoTime() {
602 return Math.max(lastReadTime, lastWriteTime);
603 }
604
605
606
607
608 public final long getLastReadTime() {
609 return lastReadTime;
610 }
611
612 protected final void setLastReadTime(long lastReadTime) {
613 this.lastReadTime = lastReadTime;
614 }
615
616
617
618
619 public final long getLastWriteTime() {
620 return lastWriteTime;
621 }
622
623 protected final void setLastWriteTime(long lastWriteTime) {
624 this.lastWriteTime = lastWriteTime;
625 }
626
627
628
629
630 public final long getWrittenBytes() {
631 return writtenBytes.get();
632 }
633
634 public final void increaseWrittenBytes(long increment, long currentTime) {
635 writtenBytes.addAndGet(increment);
636 lastWriteTime = currentTime;
637 idleCountForBoth = 0;
638 idleCountForWrite = 0;
639 }
640
641
642
643
644 public final long getWrittenMessages() {
645 return writtenMessages.get();
646 }
647
648 public final void increaseWrittenMessages(long currentTime) {
649 writtenMessages.incrementAndGet();
650 lastWriteTime = currentTime;
651 idleCountForBoth = 0;
652 idleCountForWrite = 0;
653 }
654
655
656
657
658 public final int getIdleTime(IdleStatus status) {
659 if (status == IdleStatus.BOTH_IDLE) {
660 return idleTimeForBoth;
661 }
662
663 if (status == IdleStatus.READER_IDLE) {
664 return idleTimeForRead;
665 }
666
667 if (status == IdleStatus.WRITER_IDLE) {
668 return idleTimeForWrite;
669 }
670
671 throw new IllegalArgumentException("Unknown idle status: " + status);
672 }
673
674
675
676
677 public final long getIdleTimeInMillis(IdleStatus status) {
678 return getIdleTime(status) * 1000L;
679 }
680
681
682
683
684 public final void setIdleTime(IdleStatus status, int idleTime) {
685 if (idleTime < 0) {
686 throw new IllegalArgumentException("Illegal idle time: " + idleTime);
687 }
688
689 if (status == IdleStatus.BOTH_IDLE) {
690 idleTimeForBoth = idleTime;
691 } else if (status == IdleStatus.READER_IDLE) {
692 idleTimeForRead = idleTime;
693 } else if (status == IdleStatus.WRITER_IDLE) {
694 idleTimeForWrite = idleTime;
695 } else {
696 throw new IllegalArgumentException("Unknown idle status: " + status);
697 }
698
699 if (idleTime == 0) {
700 if (status == IdleStatus.BOTH_IDLE) {
701 idleCountForBoth = 0;
702 } else if (status == IdleStatus.READER_IDLE) {
703 idleCountForRead = 0;
704 } else if (status == IdleStatus.WRITER_IDLE) {
705 idleCountForWrite = 0;
706 }
707 }
708 }
709
710
711
712
713 public final boolean isIdle(IdleStatus status) {
714 if (status == IdleStatus.BOTH_IDLE) {
715 return idleCountForBoth > 0;
716 }
717
718 if (status == IdleStatus.READER_IDLE) {
719 return idleCountForRead > 0;
720 }
721
722 if (status == IdleStatus.WRITER_IDLE) {
723 return idleCountForWrite > 0;
724 }
725
726 throw new IllegalArgumentException("Unknown idle status: " + status);
727 }
728
729
730
731
732 public final int getIdleCount(IdleStatus status) {
733 if (status == IdleStatus.BOTH_IDLE) {
734 return idleCountForBoth;
735 }
736
737 if (status == IdleStatus.READER_IDLE) {
738 return idleCountForRead;
739 }
740
741 if (status == IdleStatus.WRITER_IDLE) {
742 return idleCountForWrite;
743 }
744
745 throw new IllegalArgumentException("Unknown idle status: " + status);
746 }
747
748
749
750
751 public final long getLastIdleTime(IdleStatus status) {
752 if (status == IdleStatus.BOTH_IDLE) {
753 return lastIdleTimeForBoth;
754 }
755
756 if (status == IdleStatus.READER_IDLE) {
757 return lastIdleTimeForRead;
758 }
759
760 if (status == IdleStatus.WRITER_IDLE) {
761 return lastIdleTimeForWrite;
762 }
763
764 throw new IllegalArgumentException("Unknown idle status: " + status);
765 }
766
767 private void increaseIdleCount(IdleStatus status, long currentTime) {
768 if (status == IdleStatus.BOTH_IDLE) {
769 idleCountForBoth++;
770 lastIdleTimeForBoth = currentTime;
771 } else if (status == IdleStatus.READER_IDLE) {
772 idleCountForRead++;
773 lastIdleTimeForRead = currentTime;
774 } else if (status == IdleStatus.WRITER_IDLE) {
775 idleCountForWrite++;
776 lastIdleTimeForWrite = currentTime;
777 } else {
778 throw new IllegalArgumentException("Unknown idle status: " + status);
779 }
780 }
781
782 public final void notifyIdleness(long currentTime) {
783 updateThroughput(currentTime);
784
785 synchronized (idlenessCheckLock) {
786 notifyIdleness(
787 currentTime,
788 getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
789 IdleStatus.BOTH_IDLE, Math.max(
790 getLastIoTime(),
791 getLastIdleTime(IdleStatus.BOTH_IDLE)));
792
793 notifyIdleness(
794 currentTime,
795 getIdleTimeInMillis(IdleStatus.READER_IDLE),
796 IdleStatus.READER_IDLE, Math.max(
797 getLastReadTime(),
798 getLastIdleTime(IdleStatus.READER_IDLE)));
799
800 notifyIdleness(
801 currentTime,
802 getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
803 IdleStatus.WRITER_IDLE, Math.max(
804 getLastWriteTime(),
805 getLastIdleTime(IdleStatus.WRITER_IDLE)));
806 }
807 }
808
809 private void notifyIdleness(
810 long currentTime, long idleTime, IdleStatus status, long lastIoTime) {
811 if (idleTime > 0 && lastIoTime != 0
812 && currentTime - lastIoTime >= idleTime) {
813 increaseIdleCount(status, currentTime);
814 listeners.fireServiceIdle(status);
815 }
816 }
817
818
819
820
821 public final int getBothIdleCount() {
822 return getIdleCount(IdleStatus.BOTH_IDLE);
823 }
824
825
826
827
828 public final long getLastBothIdleTime() {
829 return getLastIdleTime(IdleStatus.BOTH_IDLE);
830 }
831
832
833
834
835 public final long getLastReaderIdleTime() {
836 return getLastIdleTime(IdleStatus.READER_IDLE);
837 }
838
839
840
841
842 public final long getLastWriterIdleTime() {
843 return getLastIdleTime(IdleStatus.WRITER_IDLE);
844 }
845
846
847
848
849 public final int getReaderIdleCount() {
850 return getIdleCount(IdleStatus.READER_IDLE);
851 }
852
853
854
855
856 public final int getWriterIdleCount() {
857 return getIdleCount(IdleStatus.WRITER_IDLE);
858 }
859
860
861
862
863 public final int getBothIdleTime() {
864 return getIdleTime(IdleStatus.BOTH_IDLE);
865 }
866
867
868
869
870 public final long getBothIdleTimeInMillis() {
871 return getIdleTimeInMillis(IdleStatus.BOTH_IDLE);
872 }
873
874
875
876
877 public final int getReaderIdleTime() {
878 return getIdleTime(IdleStatus.READER_IDLE);
879 }
880
881
882
883
884 public final long getReaderIdleTimeInMillis() {
885 return getIdleTimeInMillis(IdleStatus.READER_IDLE);
886 }
887
888
889
890
891 public final int getWriterIdleTime() {
892 return getIdleTime(IdleStatus.WRITER_IDLE);
893 }
894
895
896
897
898 public final long getWriterIdleTimeInMillis() {
899 return getIdleTimeInMillis(IdleStatus.WRITER_IDLE);
900 }
901
902
903
904
905 public final boolean isBothIdle() {
906 return isIdle(IdleStatus.BOTH_IDLE);
907 }
908
909
910
911
912 public final boolean isReaderIdle() {
913 return isIdle(IdleStatus.READER_IDLE);
914 }
915
916
917
918
919 public final boolean isWriterIdle() {
920 return isIdle(IdleStatus.WRITER_IDLE);
921 }
922
923
924
925
926 public final void setBothIdleTime(int idleTime) {
927 setIdleTime(IdleStatus.BOTH_IDLE, idleTime);
928 }
929
930
931
932
933 public final void setReaderIdleTime(int idleTime) {
934 setIdleTime(IdleStatus.READER_IDLE, idleTime);
935 }
936
937
938
939
940 public final void setWriterIdleTime(int idleTime) {
941 setIdleTime(IdleStatus.WRITER_IDLE, idleTime);
942 }
943
944
945
946
947 public final Set<WriteFuture> broadcast(Object message) {
948
949
950
951 final List<WriteFuture> futures = IoUtil.broadcast(
952 message, getManagedSessions().values());
953 return new AbstractSet<WriteFuture>() {
954 @Override
955 public Iterator<WriteFuture> iterator() {
956 return futures.iterator();
957 }
958
959 @Override
960 public int size() {
961 return futures.size();
962 }
963 };
964 }
965
966 public final IoServiceListenerSupport getListeners() {
967 return listeners;
968 }
969
970 protected final IdleStatusChecker getIdleStatusChecker() {
971 return idleStatusChecker;
972 }
973
974 protected final void executeWorker(Runnable worker) {
975 executeWorker(worker, null);
976 }
977
978 protected final void executeWorker(Runnable worker, String suffix) {
979 String actualThreadName = threadName;
980 if (suffix != null) {
981 actualThreadName = actualThreadName + '-' + suffix;
982 }
983 executor.execute(new NamePreservingRunnable(worker, actualThreadName));
984 }
985
986
987 @SuppressWarnings("unchecked")
988 protected final void finishSessionInitialization(
989 IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
990
991 if (getLastReadTime() == 0) {
992 setLastReadTime(getActivationTime());
993 }
994 if (getLastWriteTime() == 0) {
995 setLastWriteTime(getActivationTime());
996 }
997
998
999
1000
1001
1002 try {
1003 ((AbstractIoSession) session).setAttributeMap(
1004 session.getService().getSessionDataStructureFactory().getAttributeMap(session));
1005 } catch (IoSessionInitializationException e) {
1006 throw e;
1007 } catch (Exception e) {
1008 throw new IoSessionInitializationException(
1009 "Failed to initialize an attributeMap.", e);
1010 }
1011
1012 try {
1013 ((AbstractIoSession) session).setWriteRequestQueue(
1014 session.getService().getSessionDataStructureFactory().getWriteRequestQueue(session));
1015 } catch (IoSessionInitializationException e) {
1016 throw e;
1017 } catch (Exception e) {
1018 throw new IoSessionInitializationException(
1019 "Failed to initialize a writeRequestQueue.", e);
1020 }
1021
1022 if (future != null && future instanceof ConnectFuture) {
1023
1024 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
1025 }
1026
1027 if (sessionInitializer != null) {
1028 sessionInitializer.initializeSession(session, future);
1029 }
1030
1031 finishSessionInitialization0(session, future);
1032 }
1033
1034
1035
1036
1037
1038
1039
1040 protected void finishSessionInitialization0(IoSession session, IoFuture future) {}
1041
1042 protected static class ServiceOperationFuture extends DefaultIoFuture {
1043 public ServiceOperationFuture() {
1044 super(null);
1045 }
1046
1047 public final boolean isDone() {
1048 return getValue() == Boolean.TRUE;
1049 }
1050
1051 public final void setDone() {
1052 setValue(Boolean.TRUE);
1053 }
1054
1055 public final Exception getException() {
1056 if (getValue() instanceof Exception) {
1057 return (Exception) getValue();
1058 } else {
1059 return null;
1060 }
1061 }
1062
1063 public final void setException(Exception exception) {
1064 if (exception == null) {
1065 throw new NullPointerException("exception");
1066 }
1067 setValue(exception);
1068 }
1069 }
1070 }