1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.file.FileRegion;
37 import org.apache.mina.core.filterchain.IoFilterChain;
38 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
39 import org.apache.mina.core.future.DefaultIoFuture;
40 import org.apache.mina.core.service.AbstractIoService;
41 import org.apache.mina.core.service.IoProcessor;
42 import org.apache.mina.core.service.IoServiceListenerSupport;
43 import org.apache.mina.core.session.AbstractIoSession;
44 import org.apache.mina.core.session.IoSession;
45 import org.apache.mina.core.session.IoSessionConfig;
46 import org.apache.mina.core.session.SessionState;
47 import org.apache.mina.core.write.WriteRequest;
48 import org.apache.mina.core.write.WriteRequestQueue;
49 import org.apache.mina.core.write.WriteToClosedSessionException;
50 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
51 import org.apache.mina.util.ExceptionMonitor;
52 import org.apache.mina.util.NamePreservingRunnable;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56
57
58
59
60
61
62
63
64
65
66 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
67
/**
 * Logger for this class.
 * NOTE(review): it is registered under {@code IoProcessor.class} rather than
 * this class, and it is not used in the visible part of this file - confirm
 * that both are intended.
 */
private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);

/**
 * NOTE(review): not referenced anywhere in the visible part of this file.
 * Judging by the name it presumably bounds write retries - confirm before
 * relying on it.
 */
private static final int WRITE_SPIN_COUNT = 256;

/**
 * Timeout handed to {@link #select(long)} on every iteration of the worker
 * loop. It is also the minimum gap, in milliseconds, between two idle checks
 * (see {@code notifyIdleSessions}).
 */
private static final long SELECT_TIMEOUT = 1000L;

/** Per-class counters used by {@code nextThreadName()} to number processor threads. */
private static final Map<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();

/** Guards creation and clearing of {@link #processor}. */
private final Object lock = new Object();

/** Thread name applied to the worker through {@code NamePreservingRunnable}. */
private final String threadName;

/** Executor that runs the {@code Processor} worker. */
private final Executor executor;

/** Sessions queued by {@code add()} and picked up by the worker loop. */
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();

/** Sessions waiting to be removed by the worker loop. */
private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();

/** Sessions that have been scheduled for a flush of their write queue. */
private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();

/**
 * Sessions whose read/write interest has to be re-evaluated by the worker
 * loop (filled by {@code updateTrafficMask(S)}).
 */
private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();

/** The currently running worker, or {@code null} when none is running; guarded by {@link #lock}. */
private Processor processor;

/** Time, from {@code System.currentTimeMillis()}, of the last idle check. */
private long lastIdleCheckTime;

/** Guards setting {@link #disposing} and the call to {@code doDispose()}. */
private final Object disposalLock = new Object();

/** Set to {@code true} once {@code dispose()} has been requested. */
private volatile boolean disposing;

/** Set to {@code true} once {@code dispose()} has finished waiting for the worker. */
private volatile boolean disposed;

/** Completed by the worker when it exits; {@code dispose()} waits on it. */
private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);

/**
 * NOTE(review): not used in the visible part of this file; presumably
 * maintained by subclasses in their {@code wakeup()} / {@code select()}
 * implementations - confirm.
 */
protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
125
126
127
128
129
130
131
132
/**
 * Creates a processor whose worker loop will be run by the given executor.
 * The thread name used for the worker is computed here as well.
 *
 * @param executor the executor that will run the internal worker
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractPollingIoProcessor(Executor executor) {
    if (null == executor) {
        throw new IllegalArgumentException("executor");
    }

    this.executor = executor;
    this.threadName = nextThreadName();
}
141
142
143
144
145
146
147
148
149
150 private String nextThreadName() {
151 Class<?> cls = getClass();
152 int newThreadId;
153
154
155
156
157 synchronized (threadIds) {
158
159 AtomicInteger threadId = threadIds.get(cls);
160
161 if (threadId == null) {
162
163
164
165 newThreadId = 1;
166 threadIds.put(cls, new AtomicInteger(newThreadId));
167 } else {
168
169 newThreadId = threadId.incrementAndGet();
170 }
171 }
172
173
174 return cls.getSimpleName() + '-' + newThreadId;
175 }
176
177
178
179
180 public final boolean isDisposing() {
181 return disposing;
182 }
183
184
185
186
187 public final boolean isDisposed() {
188 return disposed;
189 }
190
191
192
193
194 public final void dispose() {
195 if (disposed || disposing) {
196 return;
197 }
198
199 synchronized (disposalLock) {
200 disposing = true;
201 startupProcessor();
202 }
203
204 disposalFuture.awaitUninterruptibly();
205 disposed = true;
206 }
207
208
209
210
211
212
213
/**
 * Releases the resources held by the concrete processor. The worker calls
 * this after leaving its loop, while holding the disposal lock and only when
 * disposal has been requested; anything thrown is reported to the
 * {@code ExceptionMonitor}.
 *
 * @throws Exception if the resources cannot be released
 */
protected abstract void doDispose() throws Exception;
215
216
217
218
219
220
221
222
223
224
/**
 * Polls for sessions that are ready for I/O. The worker loop calls this with
 * {@code SELECT_TIMEOUT} at the top of every iteration and only processes
 * {@link #selectedSessions()} when the result is greater than zero.
 *
 * @param timeout the maximum time to wait (presumably milliseconds - confirm
 *                against the implementations)
 * @return a value greater than zero when at least one session is ready
 * @throws Exception if polling fails
 */
protected abstract int select(long timeout) throws Exception;
226
227
228
229
230
231
232
233
/**
 * Polls for sessions that are ready for I/O without a timeout argument.
 * NOTE(review): not called in the visible part of this file; presumably it
 * blocks until a session is ready or {@link #wakeup()} is called - confirm.
 *
 * @return presumably the number of ready sessions - confirm
 * @throws Exception if polling fails
 */
protected abstract int select() throws Exception;
235
236
237
238
239
240
241
/**
 * Tells whether the underlying selector currently manages no session. The
 * worker consults this when its session count drops to zero and no new
 * session is queued; a {@code true} result lets the worker stop.
 *
 * @return {@code true} if nothing is registered with the selector
 */
protected abstract boolean isSelectorEmpty();
243
244
245
246
/**
 * Called whenever new work has been queued for the worker (session added or
 * removed, flush requested, traffic mask changed) and while disposing.
 * Presumably interrupts a pending {@link #select(long)} so the worker notices
 * the work promptly - confirm against the implementations.
 */
protected abstract void wakeup();
248
249
250
251
252
253
254
/**
 * Returns an iterator over the sessions managed by this processor. It is
 * used for idle notification and, while disposing, to schedule every session
 * for removal.
 *
 * @return an iterator over the managed sessions
 */
protected abstract Iterator<S> allSessions();
256
257
258
259
260
261
/**
 * Returns an iterator over the sessions found ready by the last select call.
 * The worker calls {@code Iterator.remove()} after processing each element,
 * so the returned iterator must support removal.
 *
 * @return an iterator over the ready sessions
 */
protected abstract Iterator<S> selectedSessions();
263
264
265
266
267
268
269
270
/**
 * Returns the lifecycle state of the given session. Callers in this class
 * handle {@code OPENING}, {@code OPENED} and {@code CLOSING}; any other value
 * makes them throw an {@code IllegalStateException}.
 *
 * @param session the session to inspect
 * @return the state of the session
 */
protected abstract SessionState getState(S session);
272
273
274
275
276
277
278
279
/**
 * Tells whether the given selected session is ready for writing. When it is,
 * and writes are not suspended, the worker schedules the session for a flush.
 *
 * @param session the session to test
 * @return {@code true} if the session can be written to
 */
protected abstract boolean isWritable(S session);
281
282
283
284
285
286
287
288
/**
 * Tells whether the given selected session is ready for reading. When it is,
 * and reads are not suspended, the worker reads from the session.
 *
 * @param session the session to test
 * @return {@code true} if the session has data to read
 */
protected abstract boolean isReadable(S session);
290
291
292
293
294
295
296
297
298
/**
 * Turns interest in write readiness on or off for the given session. The
 * flush logic switches it off before writing and back on when a write could
 * not be completed.
 *
 * @param session      the session to update
 * @param isInterested {@code true} to be notified of write readiness
 * @throws Exception if the interest cannot be changed
 */
protected abstract void setInterestedInWrite(S session, boolean isInterested)
        throws Exception;
301
302
303
304
305
306
307
308
309
/**
 * Turns interest in read readiness on or off for the given session. Traffic
 * control sets it to the negation of the session's read-suspended flag.
 *
 * @param session      the session to update
 * @param isInterested {@code true} to be notified of read readiness
 * @throws Exception if the interest cannot be changed
 */
protected abstract void setInterestedInRead(S session, boolean isInterested)
        throws Exception;
312
313
314
315
316
317
318
319
/**
 * Tells whether read interest is currently enabled for the given session.
 * NOTE(review): not called in the visible part of this file.
 *
 * @param session the session to test
 * @return {@code true} if read interest is enabled
 */
protected abstract boolean isInterestedInRead(S session);
321
322
323
324
325
326
327
328
/**
 * Tells whether write interest is currently enabled for the given session.
 * NOTE(review): not called in the visible part of this file.
 *
 * @param session the session to test
 * @return {@code true} if write interest is enabled
 */
protected abstract boolean isInterestedInWrite(S session);
330
331
332
333
334
335
336
/**
 * Initializes a newly added session. The worker calls this when it picks the
 * session from the new-session queue; if it throws, the session is destroyed
 * and not counted as added.
 *
 * @param session the session to initialize
 * @throws Exception if the session cannot be initialized
 */
protected abstract void init(S session) throws Exception;
338
339
340
341
342
343
344
345
346
/**
 * Destroys the underlying resources of the given session. It is called when
 * a session is removed and when adding a session failed.
 *
 * @param session the session to destroy
 * @throws Exception if the session cannot be destroyed
 */
protected abstract void destroy(S session) throws Exception;
348
349
350
351
352
353
354
355
356
357
358
359
360
/**
 * Reads data from the session into the given buffer. The caller keeps
 * reading while the result is positive and the buffer has room (fragmenting
 * transports only), and schedules the session for removal when the result is
 * negative (presumably end of stream - confirm).
 *
 * @param session the session to read from
 * @param buf     the buffer that receives the data
 * @return the number of bytes read; zero or negative when nothing was read
 * @throws Exception if the read fails
 */
protected abstract int read(S session, IoBuffer buf) throws Exception;
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
/**
 * Writes bytes from the given buffer to the session. A result of zero makes
 * the flush logic re-enable write interest and stop flushing the session for
 * now.
 *
 * @param session the session to write to
 * @param buf     the buffer holding the data
 * @param length  the maximum number of bytes to write
 * @return the number of bytes written
 * @throws Exception if the write fails
 */
protected abstract int write(S session, IoBuffer buf, int length)
        throws Exception;
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
/**
 * Writes part of a file region to the session. The caller passes the result
 * to {@code region.update(...)}, so implementations presumably must not
 * advance the region themselves - confirm.
 *
 * @param session the session to write to
 * @param region  the file region to transfer
 * @param length  the maximum number of bytes to transfer
 * @return the number of bytes transferred
 * @throws Exception if the transfer fails
 */
protected abstract int transferFile(S session, FileRegion region, int length)
        throws Exception;
399
400
401
402
403 public final void add(S session) {
404 if (disposed || disposing) {
405 throw new IllegalStateException("Already disposed.");
406 }
407
408
409 newSessions.add(session);
410 startupProcessor();
411 }
412
413
414
415
416 public final void remove(S session) {
417 scheduleRemove(session);
418 startupProcessor();
419 }
420
421 private void scheduleRemove(S session) {
422 removingSessions.add(session);
423 }
424
425
426
427
428 public final void flush(S session) {
429
430
431 if (session.setScheduledForFlush( true )) {
432 flushingSessions.add(session);
433 wakeup();
434 }
435 }
436
437 private void scheduleFlush(S session) {
438
439
440 if (session.setScheduledForFlush(true)) {
441 flushingSessions.add(session);
442 }
443 }
444
445
446
447
448 public final void updateTrafficMask(S session) {
449 trafficControllingSessions.add(session);
450 wakeup();
451 }
452
453
454
455
456
457 private void startupProcessor() {
458 synchronized (lock) {
459 if (processor == null) {
460 processor = new Processor();
461 executor.execute(new NamePreservingRunnable(processor, threadName));
462 }
463 }
464
465
466
467 wakeup();
468 }
469
470
471
472
473
474
475
476 private int handleNewSessions() {
477 int addedSessions = 0;
478
479 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
480 if (addNow(session)) {
481
482 addedSessions++;
483 }
484 }
485
486 return addedSessions;
487 }
488
489
490
491
492
493
494
495
496
497
498 private boolean addNow(S session) {
499 boolean registered = false;
500
501 try {
502 init(session);
503 registered = true;
504
505
506 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
507 chainBuilder.buildFilterChain(session.getFilterChain());
508
509
510
511
512 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
513 listeners.fireSessionCreated(session);
514 } catch (Throwable e) {
515 ExceptionMonitor.getInstance().exceptionCaught(e);
516
517 try {
518 destroy(session);
519 } catch (Exception e1) {
520 ExceptionMonitor.getInstance().exceptionCaught(e1);
521 } finally {
522 registered = false;
523 }
524 }
525
526 return registered;
527 }
528
529 private int removeSessions() {
530 int removedSessions = 0;
531
532 for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
533 SessionState state = getState(session);
534
535
536 switch (state) {
537 case OPENED:
538
539 if (removeNow(session)) {
540 removedSessions++;
541 }
542
543 break;
544
545 case CLOSING:
546
547 break;
548
549 case OPENING:
550
551
552 newSessions.remove(session);
553
554 if (removeNow(session)) {
555 removedSessions++;
556 }
557
558 break;
559
560 default:
561 throw new IllegalStateException(String.valueOf(state));
562 }
563 }
564
565 return removedSessions;
566 }
567
568 private boolean removeNow(S session) {
569 clearWriteRequestQueue(session);
570
571 try {
572 destroy(session);
573 return true;
574 } catch (Exception e) {
575 IoFilterChain filterChain = session.getFilterChain();
576 filterChain.fireExceptionCaught(e);
577 } finally {
578 clearWriteRequestQueue(session);
579 ((AbstractIoService) session.getService()).getListeners()
580 .fireSessionDestroyed(session);
581 }
582 return false;
583 }
584
585 private void clearWriteRequestQueue(S session) {
586 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
587 WriteRequest req;
588
589 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
590
591 if ((req = writeRequestQueue.poll(session)) != null) {
592 Object message = req.getMessage();
593
594 if (message instanceof IoBuffer) {
595 IoBuffer buf = (IoBuffer)message;
596
597
598
599 if (buf.hasRemaining()) {
600 buf.reset();
601 failedRequests.add(req);
602 } else {
603 IoFilterChain filterChain = session.getFilterChain();
604 filterChain.fireMessageSent(req);
605 }
606 } else {
607 failedRequests.add(req);
608 }
609
610
611 while ((req = writeRequestQueue.poll(session)) != null) {
612 failedRequests.add(req);
613 }
614 }
615
616
617 if (!failedRequests.isEmpty()) {
618 WriteToClosedSessionException cause = new WriteToClosedSessionException(
619 failedRequests);
620
621 for (WriteRequest r : failedRequests) {
622 session.decreaseScheduledBytesAndMessages(r);
623 r.getFuture().setException(cause);
624 }
625
626 IoFilterChain filterChain = session.getFilterChain();
627 filterChain.fireExceptionCaught(cause);
628 }
629 }
630
631 private void process() throws Exception {
632 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
633 S session = i.next();
634 process(session);
635 i.remove();
636 }
637 }
638
639
640
641
642 private void process(S session) {
643
644 if (isReadable(session) && !session.isReadSuspended()) {
645 read(session);
646 }
647
648
649 if (isWritable(session) && !session.isWriteSuspended()) {
650
651 if (session.setScheduledForFlush(true)) {
652 flushingSessions.add(session);
653 }
654 }
655 }
656
657 private void read(S session) {
658 IoSessionConfig config = session.getConfig();
659 int bufferSize = config.getReadBufferSize();
660 IoBuffer buf = IoBuffer.allocate(bufferSize);
661
662 final boolean hasFragmentation = session.getTransportMetadata()
663 .hasFragmentation();
664
665 try {
666 int readBytes = 0;
667 int ret;
668
669 try {
670 if (hasFragmentation) {
671
672 while ((ret = read(session, buf)) > 0) {
673 readBytes += ret;
674
675 if (!buf.hasRemaining()) {
676 break;
677 }
678 }
679 } else {
680 ret = read(session, buf);
681
682 if (ret > 0) {
683 readBytes = ret;
684 }
685 }
686 } finally {
687 buf.flip();
688 }
689
690 if (readBytes > 0) {
691 IoFilterChain filterChain = session.getFilterChain();
692 filterChain.fireMessageReceived(buf);
693 buf = null;
694
695 if (hasFragmentation) {
696 if (readBytes << 1 < config.getReadBufferSize()) {
697 session.decreaseReadBufferSize();
698 } else if (readBytes == config.getReadBufferSize()) {
699 session.increaseReadBufferSize();
700 }
701 }
702 }
703
704 if (ret < 0) {
705 scheduleRemove(session);
706 }
707 } catch (Throwable e) {
708 if (e instanceof IOException) {
709 if (!(e instanceof PortUnreachableException)
710 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
711 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
712 scheduleRemove(session);
713 }
714 }
715
716 IoFilterChain filterChain = session.getFilterChain();
717 filterChain.fireExceptionCaught(e);
718 }
719 }
720
721
722 private static String byteArrayToHex( byte[] barray )
723 {
724 char[] c = new char[barray.length * 2];
725 int pos = 0;
726
727 for ( byte b : barray )
728 {
729 int bb = ( b & 0x00FF ) >> 4;
730 c[pos++] = ( char ) ( bb > 9 ? bb + 0x37 : bb + 0x30 );
731 bb = b & 0x0F;
732 c[pos++] = ( char ) ( bb > 9 ? bb + 0x37 : bb + 0x30 );
733 if ( pos > 60 )
734 {
735 break;
736 }
737 }
738
739 return new String( c );
740 }
741
742
743 private void notifyIdleSessions(long currentTime) throws Exception {
744
745 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
746 lastIdleCheckTime = currentTime;
747 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
748 }
749 }
750
751
752
753
754 private void flush(long currentTime) {
755 if (flushingSessions.isEmpty()) {
756 return;
757 }
758
759 do {
760 S session = flushingSessions.poll();
761
762 if (session == null) {
763
764 break;
765 }
766
767
768
769 session.unscheduledForFlush();
770
771 SessionState state = getState(session);
772
773 switch (state) {
774 case OPENED:
775 try {
776 boolean flushedAll = flushNow(session, currentTime);
777
778 if (flushedAll
779 && !session.getWriteRequestQueue().isEmpty(session)
780 && !session.isScheduledForFlush()) {
781 scheduleFlush(session);
782 }
783 } catch (Exception e) {
784 scheduleRemove(session);
785 IoFilterChain filterChain = session.getFilterChain();
786 filterChain.fireExceptionCaught(e);
787 }
788
789 break;
790
791 case CLOSING:
792
793 break;
794
795 case OPENING:
796
797
798
799 scheduleFlush(session);
800 return;
801
802 default:
803 throw new IllegalStateException(String.valueOf(state));
804 }
805
806 } while (!flushingSessions.isEmpty());
807 }
808
/**
 * Writes as many pending write requests of the session as possible, up to a
 * byte budget for this pass.
 *
 * @param session     the session to flush
 * @param currentTime the current time, passed on to the write accounting
 * @return {@code true} when the loop ended because no write request was
 *         left; {@code false} when the session is disconnected, a write was
 *         incomplete, the byte budget was used up, or an exception occurred
 */
private boolean flushNow(S session, long currentTime) {
    // A disconnected session cannot be written to: remove it instead.
    if (!session.isConnected()) {
        scheduleRemove(session);
        return false;
    }

    final boolean hasFragmentation = session.getTransportMetadata()
            .hasFragmentation();

    final WriteRequestQueue writeRequestQueue = session
            .getWriteRequestQueue();

    // Byte budget for this pass: 1.5 times the maximum read buffer size.
    final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
            + (session.getConfig().getMaxReadBufferSize() >>> 1);
    int writtenBytes = 0;
    WriteRequest req = null;

    try {
        // Write interest is switched off while flushing; it is switched back
        // on below whenever a write could not be completed.
        setInterestedInWrite(session, false);

        do {
            // Resume the request left over from a previous pass, if any;
            // otherwise take the next one from the queue.
            req = session.getCurrentWriteRequest();

            if (req == null) {
                req = writeRequestQueue.poll(session);

                if (req == null) {
                    // Nothing left to write.
                    break;
                }

                session.setCurrentWriteRequest(req);
            }

            int localWrittenBytes = 0;
            Object message = req.getMessage();

            if (message instanceof IoBuffer) {
                localWrittenBytes = writeBuffer(session, req,
                        hasFragmentation, maxWrittenBytes - writtenBytes,
                        currentTime);

                if (localWrittenBytes > 0
                        && ((IoBuffer) message).hasRemaining()) {
                    // Partial write: wait for the next write-ready event.
                    writtenBytes += localWrittenBytes;
                    setInterestedInWrite(session, true);
                    return false;
                }
            } else if (message instanceof FileRegion) {
                localWrittenBytes = writeFile(session, req,
                        hasFragmentation, maxWrittenBytes - writtenBytes,
                        currentTime);

                if (localWrittenBytes > 0
                        && ((FileRegion) message).getRemainingBytes() > 0) {
                    // Partial transfer: wait for the next write-ready event.
                    writtenBytes += localWrittenBytes;
                    setInterestedInWrite(session, true);
                    return false;
                }
            } else {
                throw new IllegalStateException(
                        "Don't know how to handle message of type '"
                                + message.getClass().getName()
                                + "'. Are you missing a protocol encoder?");
            }

            if (localWrittenBytes == 0) {
                // Nothing could be written: wait for the next write-ready event.
                setInterestedInWrite(session, true);
                return false;
            }

            writtenBytes += localWrittenBytes;

            if (writtenBytes >= maxWrittenBytes) {
                // Budget used up: continue on a later pass.
                scheduleFlush(session);
                return false;
            }
        } while (writtenBytes < maxWrittenBytes);
    } catch (Exception e) {
        // Fail the request being written, if any, and report the error.
        if (req != null) {
            req.getFuture().setException(e);
        }

        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
        return false;
    }

    return true;
}
911
912 private int writeBuffer(S session, WriteRequest req,
913 boolean hasFragmentation, int maxLength, long currentTime)
914 throws Exception {
915 IoBuffer buf = (IoBuffer) req.getMessage();
916 int localWrittenBytes = 0;
917
918 if (buf.hasRemaining()) {
919 int length;
920
921 if (hasFragmentation) {
922 length = Math.min(buf.remaining(), maxLength);
923 } else {
924 length = buf.remaining();
925 }
926
927 localWrittenBytes = write(session, buf, length);
928 }
929
930 session.increaseWrittenBytes(localWrittenBytes, currentTime);
931
932 if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
933
934 int pos = buf.position();
935 buf.reset();
936
937 fireMessageSent(session, req);
938
939
940 buf.position(pos);
941 }
942 return localWrittenBytes;
943 }
944
945 private int writeFile(S session, WriteRequest req,
946 boolean hasFragmentation, int maxLength, long currentTime)
947 throws Exception {
948 int localWrittenBytes;
949 FileRegion region = (FileRegion) req.getMessage();
950
951 if (region.getRemainingBytes() > 0) {
952 int length;
953
954 if (hasFragmentation) {
955 length = (int) Math.min(region.getRemainingBytes(), maxLength);
956 } else {
957 length = (int) Math.min(Integer.MAX_VALUE, region
958 .getRemainingBytes());
959 }
960
961 localWrittenBytes = transferFile(session, region, length);
962 region.update(localWrittenBytes);
963 } else {
964 localWrittenBytes = 0;
965 }
966
967 session.increaseWrittenBytes(localWrittenBytes, currentTime);
968
969 if (region.getRemainingBytes() <= 0 || !hasFragmentation
970 && localWrittenBytes != 0) {
971 fireMessageSent(session, req);
972 }
973
974 return localWrittenBytes;
975 }
976
977 private void fireMessageSent(S session, WriteRequest req) {
978 session.setCurrentWriteRequest(null);
979 IoFilterChain filterChain = session.getFilterChain();
980 filterChain.fireMessageSent(req);
981 }
982
983
984
985
986 private void updateTrafficMask() {
987 int queueSize = trafficControllingSessions.size();
988
989 while (queueSize > 0) {
990 S session = trafficControllingSessions.poll();
991
992 if (session == null) {
993
994 return;
995 }
996
997 SessionState state = getState(session);
998
999 switch (state) {
1000 case OPENED:
1001 updateTrafficControl(session);
1002
1003 break;
1004
1005 case CLOSING:
1006 break;
1007
1008 case OPENING:
1009
1010
1011
1012
1013 trafficControllingSessions.add(session);
1014 break;
1015
1016 default:
1017 throw new IllegalStateException(String.valueOf(state));
1018 }
1019
1020
1021
1022
1023
1024 queueSize--;
1025 }
1026 }
1027
1028
1029
1030
1031 public void updateTrafficControl(S session) {
1032
1033 try {
1034 setInterestedInRead(session, !session.isReadSuspended());
1035 } catch (Exception e) {
1036 IoFilterChain filterChain = session.getFilterChain();
1037 filterChain.fireExceptionCaught(e);
1038 }
1039
1040 try {
1041 setInterestedInWrite(session, !session.getWriteRequestQueue()
1042 .isEmpty(session)
1043 && !session.isWriteSuspended());
1044 } catch (Exception e) {
1045 IoFilterChain filterChain = session.getFilterChain();
1046 filterChain.fireExceptionCaught(e);
1047 }
1048 }
1049
1050
1051
1052
1053
1054
1055
/**
 * The worker that drives this processor. It loops over select, session
 * setup, traffic control, I/O processing, flushing, removal and idle checks
 * until no session is left, then performs the disposal if one was requested.
 */
private class Processor implements Runnable {
    /**
     * Runs the processing loop. The loop ends when the session count is zero,
     * no new session is queued and the selector is empty. On exit the
     * disposal future is always completed.
     */
    public void run() {
        // Number of sessions currently handled by this worker.
        int nSessions = 0;
        lastIdleCheckTime = System.currentTimeMillis();

        for (;;) {
            try {
                // Wait for I/O readiness, for at most SELECT_TIMEOUT.
                int selected = select(SELECT_TIMEOUT);

                // Set up the sessions that were added since the last pass.
                nSessions += handleNewSessions();

                updateTrafficMask();

                // Only look at the selected sessions when select reported some.
                if (selected > 0) {
                    process();
                }

                // Write the pending requests of the scheduled sessions.
                long currentTime = System.currentTimeMillis();
                flush(currentTime);

                // Take out the sessions that were scheduled for removal.
                nSessions -= removeSessions();

                notifyIdleSessions(currentTime);

                // Stop when there is nothing left to manage. The check and the
                // clearing of the processor field are done under the same lock
                // that startupProcessor() takes before creating a worker.
                if (nSessions == 0) {
                    synchronized (lock) {
                        if (newSessions.isEmpty() && isSelectorEmpty()) {
                            processor = null;
                            break;
                        }
                    }
                }

                // While disposing, schedule every managed session for removal
                // and wake the selector.
                if (isDisposing()) {
                    for (Iterator<S> i = allSessions(); i.hasNext();) {
                        scheduleRemove(i.next());
                    }

                    wakeup();
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);

                // Pause for a second before the next iteration after a failure.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                }
            }
        }

        try {
            synchronized (disposalLock) {
                if (disposing) {
                    doDispose();
                }
            }
        } catch (Throwable t) {
            ExceptionMonitor.getInstance().exceptionCaught(t);
        } finally {
            // Completed on every exit of the worker; dispose() waits on it.
            disposalFuture.setValue(true);
        }
    }
}
1135 }