1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.file.FileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.service.IoServiceListenerSupport;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.SessionState;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.core.write.WriteToClosedSessionException;
51 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52 import org.apache.mina.util.ExceptionMonitor;
53 import org.apache.mina.util.NamePreservingRunnable;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63
64
65
66
67 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
68
69 private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
70
71
72
73
74
75
76
77 private static final int WRITE_SPIN_COUNT = 256;
78
79
80
81
82
83 private static final long SELECT_TIMEOUT = 1000L;
84
85
86 private static final Map<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
87
88
89 private final Object lock = new Object();
90
91
92 private final String threadName;
93
94
95 private final Executor executor;
96
97
98 private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
99
100
101 private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
102
103
104 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
105
106
107
108
109
110 private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
111
112
113 private Processor processor;
114
115 private long lastIdleCheckTime;
116
117 private final Object disposalLock = new Object();
118
119 private volatile boolean disposing;
120
121 private volatile boolean disposed;
122
123 private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
124
125 protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
126
127
128
129
130
131
132
133
134 protected AbstractPollingIoProcessor(Executor executor) {
135 if (executor == null) {
136 throw new IllegalArgumentException("executor");
137 }
138
139 this.threadName = nextThreadName();
140 this.executor = executor;
141 }
142
143
144
145
146
147
148
149
150
151 private String nextThreadName() {
152 Class<?> cls = getClass();
153 int newThreadId;
154
155
156
157
158 synchronized (threadIds) {
159
160 AtomicInteger threadId = threadIds.get(cls);
161
162 if (threadId == null) {
163
164
165
166 newThreadId = 1;
167 threadIds.put(cls, new AtomicInteger(newThreadId));
168 } else {
169
170 newThreadId = threadId.incrementAndGet();
171 }
172 }
173
174
175 return cls.getSimpleName() + '-' + newThreadId;
176 }
177
178
179
180
181 public final boolean isDisposing() {
182 return disposing;
183 }
184
185
186
187
188 public final boolean isDisposed() {
189 return disposed;
190 }
191
192
193
194
195 public final void dispose() {
196 if (disposed || disposing) {
197 return;
198 }
199
200 synchronized (disposalLock) {
201 disposing = true;
202 startupProcessor();
203 }
204
205 disposalFuture.awaitUninterruptibly();
206 disposed = true;
207 }
208
209
210
211
212
213
214
215 protected abstract void doDispose() throws Exception;
216
217
218
219
220
221
222
223
224
225
226 protected abstract int select(long timeout) throws Exception;
227
228
229
230
231
232
233
234
235 protected abstract int select() throws Exception;
236
237
238
239
240
241
242
243 protected abstract boolean isSelectorEmpty();
244
245
246
247
248 protected abstract void wakeup();
249
250
251
252
253
254
255
256 protected abstract Iterator<S> allSessions();
257
258
259
260
261
262
263 protected abstract Iterator<S> selectedSessions();
264
265
266
267
268
269
270
271
272 protected abstract SessionState getState(S session);
273
274
275
276
277
278
279
280
281 protected abstract boolean isWritable(S session);
282
283
284
285
286
287
288
289
290 protected abstract boolean isReadable(S session);
291
292
293
294
295
296
297
298
299
300 protected abstract void setInterestedInWrite(S session, boolean isInterested)
301 throws Exception;
302
303
304
305
306
307
308
309
310
311 protected abstract void setInterestedInRead(S session, boolean isInterested)
312 throws Exception;
313
314
315
316
317
318
319
320
321 protected abstract boolean isInterestedInRead(S session);
322
323
324
325
326
327
328
329
330 protected abstract boolean isInterestedInWrite(S session);
331
332
333
334
335
336
337
338 protected abstract void init(S session) throws Exception;
339
340
341
342
343
344
345
346
347
348 protected abstract void destroy(S session) throws Exception;
349
350
351
352
353
354
355
356
357
358
359
360
361
362 protected abstract int read(S session, IoBuffer buf) throws Exception;
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379 protected abstract int write(S session, IoBuffer buf, int length)
380 throws Exception;
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398 protected abstract int transferFile(S session, FileRegion region, int length)
399 throws Exception;
400
401
402
403
404 public final void add(S session) {
405 if (disposed || disposing) {
406 throw new IllegalStateException("Already disposed.");
407 }
408
409
410 newSessions.add(session);
411 startupProcessor();
412 }
413
414
415
416
417 public final void remove(S session) {
418 scheduleRemove(session);
419 startupProcessor();
420 }
421
422 private void scheduleRemove(S session) {
423 removingSessions.add(session);
424 }
425
426
427
428
429 public final void flush(S session) {
430
431
432 if (session.setScheduledForFlush( true )) {
433 flushingSessions.add(session);
434 wakeup();
435 }
436 }
437
438 private void scheduleFlush(S session) {
439
440
441 if (session.setScheduledForFlush(true)) {
442 flushingSessions.add(session);
443 }
444 }
445
446
447
448
449 public final void updateTrafficMask(S session) {
450 trafficControllingSessions.add(session);
451 wakeup();
452 }
453
454
455
456
457
458 private void startupProcessor() {
459 synchronized (lock) {
460 if (processor == null) {
461 processor = new Processor();
462 executor.execute(new NamePreservingRunnable(processor, threadName));
463 }
464 }
465
466
467
468 wakeup();
469 }
470
471
472
473
474
475
476
477 private int handleNewSessions() {
478 int addedSessions = 0;
479
480 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
481 if (addNow(session)) {
482
483 addedSessions++;
484 }
485 }
486
487 return addedSessions;
488 }
489
490
491
492
493
494
495
496
497
498
499 private boolean addNow(S session) {
500 boolean registered = false;
501
502 try {
503 init(session);
504 registered = true;
505
506
507 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
508 chainBuilder.buildFilterChain(session.getFilterChain());
509
510
511
512
513 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
514 listeners.fireSessionCreated(session);
515 } catch (Throwable e) {
516 ExceptionMonitor.getInstance().exceptionCaught(e);
517
518 try {
519 destroy(session);
520 } catch (Exception e1) {
521 ExceptionMonitor.getInstance().exceptionCaught(e1);
522 } finally {
523 registered = false;
524 }
525 }
526
527 return registered;
528 }
529
530 private int removeSessions() {
531 int removedSessions = 0;
532
533 for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
534 SessionState state = getState(session);
535
536
537 switch (state) {
538 case OPENED:
539
540 if (removeNow(session)) {
541 removedSessions++;
542 }
543
544 break;
545
546 case CLOSING:
547
548 break;
549
550 case OPENING:
551
552
553 newSessions.remove(session);
554
555 if (removeNow(session)) {
556 removedSessions++;
557 }
558
559 break;
560
561 default:
562 throw new IllegalStateException(String.valueOf(state));
563 }
564 }
565
566 return removedSessions;
567 }
568
569 private boolean removeNow(S session) {
570 clearWriteRequestQueue(session);
571
572 try {
573 destroy(session);
574 return true;
575 } catch (Exception e) {
576 IoFilterChain filterChain = session.getFilterChain();
577 filterChain.fireExceptionCaught(e);
578 } finally {
579 clearWriteRequestQueue(session);
580 ((AbstractIoService) session.getService()).getListeners()
581 .fireSessionDestroyed(session);
582 }
583 return false;
584 }
585
586 private void clearWriteRequestQueue(S session) {
587 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
588 WriteRequest req;
589
590 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
591
592 if ((req = writeRequestQueue.poll(session)) != null) {
593 Object message = req.getMessage();
594
595 if (message instanceof IoBuffer) {
596 IoBuffer buf = (IoBuffer)message;
597
598
599
600 if (buf.hasRemaining()) {
601 buf.reset();
602 failedRequests.add(req);
603 } else {
604 IoFilterChain filterChain = session.getFilterChain();
605 filterChain.fireMessageSent(req);
606 }
607 } else {
608 failedRequests.add(req);
609 }
610
611
612 while ((req = writeRequestQueue.poll(session)) != null) {
613 failedRequests.add(req);
614 }
615 }
616
617
618 if (!failedRequests.isEmpty()) {
619 WriteToClosedSessionException cause = new WriteToClosedSessionException(
620 failedRequests);
621
622 for (WriteRequest r : failedRequests) {
623 session.decreaseScheduledBytesAndMessages(r);
624 r.getFuture().setException(cause);
625 }
626
627 IoFilterChain filterChain = session.getFilterChain();
628 filterChain.fireExceptionCaught(cause);
629 }
630 }
631
632 private void process() throws Exception {
633 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
634 S session = i.next();
635 process(session);
636 i.remove();
637 }
638 }
639
640
641
642
643 private void process(S session) {
644
645 if (isReadable(session) && !session.isReadSuspended()) {
646 read(session);
647 }
648
649
650 if (isWritable(session) && !session.isWriteSuspended()) {
651
652 if (session.setScheduledForFlush(true)) {
653 flushingSessions.add(session);
654 }
655 }
656 }
657
658 private void read(S session) {
659 IoSessionConfig config = session.getConfig();
660 int bufferSize = config.getReadBufferSize();
661 IoBuffer buf = IoBuffer.allocate(bufferSize);
662
663 final boolean hasFragmentation = session.getTransportMetadata()
664 .hasFragmentation();
665
666 try {
667 int readBytes = 0;
668 int ret;
669
670 try {
671 if (hasFragmentation) {
672
673 while ((ret = read(session, buf)) > 0) {
674 readBytes += ret;
675
676 if (!buf.hasRemaining()) {
677 break;
678 }
679 }
680 } else {
681 ret = read(session, buf);
682
683 if (ret > 0) {
684 readBytes = ret;
685 }
686 }
687 } finally {
688 buf.flip();
689 }
690
691 if (readBytes > 0) {
692 IoFilterChain filterChain = session.getFilterChain();
693 filterChain.fireMessageReceived(buf);
694 buf = null;
695
696 if (hasFragmentation) {
697 if (readBytes << 1 < config.getReadBufferSize()) {
698 session.decreaseReadBufferSize();
699 } else if (readBytes == config.getReadBufferSize()) {
700 session.increaseReadBufferSize();
701 }
702 }
703 }
704
705 if (ret < 0) {
706 scheduleRemove(session);
707 }
708 } catch (Throwable e) {
709 if (e instanceof IOException) {
710 if (!(e instanceof PortUnreachableException)
711 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
712 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
713 scheduleRemove(session);
714 }
715 }
716
717 IoFilterChain filterChain = session.getFilterChain();
718 filterChain.fireExceptionCaught(e);
719 }
720 }
721
722
723 private static String byteArrayToHex( byte[] barray )
724 {
725 char[] c = new char[barray.length * 2];
726 int pos = 0;
727
728 for ( byte b : barray )
729 {
730 int bb = ( b & 0x00FF ) >> 4;
731 c[pos++] = ( char ) ( bb > 9 ? bb + 0x37 : bb + 0x30 );
732 bb = b & 0x0F;
733 c[pos++] = ( char ) ( bb > 9 ? bb + 0x37 : bb + 0x30 );
734 if ( pos > 60 )
735 {
736 break;
737 }
738 }
739
740 return new String( c );
741 }
742
743
744 private void notifyIdleSessions(long currentTime) throws Exception {
745
746 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
747 lastIdleCheckTime = currentTime;
748 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
749 }
750 }
751
752
753
754
755 private void flush(long currentTime) {
756 if (flushingSessions.isEmpty()) {
757 return;
758 }
759
760 do {
761 S session = flushingSessions.poll();
762
763 if (session == null) {
764
765 break;
766 }
767
768
769
770 session.unscheduledForFlush();
771
772 SessionState state = getState(session);
773
774 switch (state) {
775 case OPENED:
776 try {
777 boolean flushedAll = flushNow(session, currentTime);
778
779 if (flushedAll
780 && !session.getWriteRequestQueue().isEmpty(session)
781 && !session.isScheduledForFlush()) {
782 scheduleFlush(session);
783 }
784 } catch (Exception e) {
785 scheduleRemove(session);
786 IoFilterChain filterChain = session.getFilterChain();
787 filterChain.fireExceptionCaught(e);
788 }
789
790 break;
791
792 case CLOSING:
793
794 break;
795
796 case OPENING:
797
798
799
800 scheduleFlush(session);
801 return;
802
803 default:
804 throw new IllegalStateException(String.valueOf(state));
805 }
806
807 } while (!flushingSessions.isEmpty());
808 }
809
810 private boolean flushNow(S session, long currentTime) {
811 if (!session.isConnected()) {
812 scheduleRemove(session);
813 return false;
814 }
815
816 final boolean hasFragmentation = session.getTransportMetadata()
817 .hasFragmentation();
818
819 final WriteRequestQueue writeRequestQueue = session
820 .getWriteRequestQueue();
821
822
823
824
825 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
826 + (session.getConfig().getMaxReadBufferSize() >>> 1);
827 int writtenBytes = 0;
828 WriteRequest req = null;
829
830 try {
831
832 setInterestedInWrite(session, false);
833
834 do {
835
836 req = session.getCurrentWriteRequest();
837
838 if (req == null) {
839 req = writeRequestQueue.poll(session);
840
841 if (req == null) {
842 break;
843 }
844
845 session.setCurrentWriteRequest(req);
846 }
847
848 int localWrittenBytes = 0;
849 Object message = req.getMessage();
850
851 if (message instanceof IoBuffer) {
852 localWrittenBytes = writeBuffer(session, req,
853 hasFragmentation, maxWrittenBytes - writtenBytes,
854 currentTime);
855
856 if (localWrittenBytes > 0
857 && ((IoBuffer) message).hasRemaining()) {
858
859 writtenBytes += localWrittenBytes;
860 setInterestedInWrite(session, true);
861 return false;
862 }
863 } else if (message instanceof FileRegion) {
864 localWrittenBytes = writeFile(session, req,
865 hasFragmentation, maxWrittenBytes - writtenBytes,
866 currentTime);
867
868
869
870
871
872
873 if (localWrittenBytes > 0
874 && ((FileRegion) message).getRemainingBytes() > 0) {
875 writtenBytes += localWrittenBytes;
876 setInterestedInWrite(session, true);
877 return false;
878 }
879 } else {
880 throw new IllegalStateException(
881 "Don't know how to handle message of type '"
882 + message.getClass().getName()
883 + "'. Are you missing a protocol encoder?");
884 }
885
886 if (localWrittenBytes == 0) {
887
888 setInterestedInWrite(session, true);
889 return false;
890 }
891
892 writtenBytes += localWrittenBytes;
893
894 if (writtenBytes >= maxWrittenBytes) {
895
896 scheduleFlush(session);
897 return false;
898 }
899 } while (writtenBytes < maxWrittenBytes);
900 } catch (Exception e) {
901 if (req != null) {
902 req.getFuture().setException(e);
903 }
904
905 IoFilterChain filterChain = session.getFilterChain();
906 filterChain.fireExceptionCaught(e);
907 return false;
908 }
909
910 return true;
911 }
912
913 private int writeBuffer(S session, WriteRequest req,
914 boolean hasFragmentation, int maxLength, long currentTime)
915 throws Exception {
916 IoBuffer buf = (IoBuffer) req.getMessage();
917 int localWrittenBytes = 0;
918
919 if (buf.hasRemaining()) {
920 int length;
921
922 if (hasFragmentation) {
923 length = Math.min(buf.remaining(), maxLength);
924 } else {
925 length = buf.remaining();
926 }
927
928 localWrittenBytes = write(session, buf, length);
929 }
930
931 session.increaseWrittenBytes(localWrittenBytes, currentTime);
932
933 if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
934
935 int pos = buf.position();
936 buf.reset();
937
938 fireMessageSent(session, req);
939
940
941 buf.position(pos);
942 }
943 return localWrittenBytes;
944 }
945
946 private int writeFile(S session, WriteRequest req,
947 boolean hasFragmentation, int maxLength, long currentTime)
948 throws Exception {
949 int localWrittenBytes;
950 FileRegion region = (FileRegion) req.getMessage();
951
952 if (region.getRemainingBytes() > 0) {
953 int length;
954
955 if (hasFragmentation) {
956 length = (int) Math.min(region.getRemainingBytes(), maxLength);
957 } else {
958 length = (int) Math.min(Integer.MAX_VALUE, region
959 .getRemainingBytes());
960 }
961
962 localWrittenBytes = transferFile(session, region, length);
963 region.update(localWrittenBytes);
964 } else {
965 localWrittenBytes = 0;
966 }
967
968 session.increaseWrittenBytes(localWrittenBytes, currentTime);
969
970 if (region.getRemainingBytes() <= 0 || !hasFragmentation
971 && localWrittenBytes != 0) {
972 fireMessageSent(session, req);
973 }
974
975 return localWrittenBytes;
976 }
977
978 private void fireMessageSent(S session, WriteRequest req) {
979 session.setCurrentWriteRequest(null);
980 IoFilterChain filterChain = session.getFilterChain();
981 filterChain.fireMessageSent(req);
982 }
983
984
985
986
987 private void updateTrafficMask() {
988 int queueSize = trafficControllingSessions.size();
989
990 while (queueSize > 0) {
991 S session = trafficControllingSessions.poll();
992
993 if (session == null) {
994
995 return;
996 }
997
998 SessionState state = getState(session);
999
1000 switch (state) {
1001 case OPENED:
1002 updateTrafficControl(session);
1003
1004 break;
1005
1006 case CLOSING:
1007 break;
1008
1009 case OPENING:
1010
1011
1012
1013
1014 trafficControllingSessions.add(session);
1015 break;
1016
1017 default:
1018 throw new IllegalStateException(String.valueOf(state));
1019 }
1020
1021
1022
1023
1024
1025 queueSize--;
1026 }
1027 }
1028
1029
1030
1031
1032 public void updateTrafficControl(S session) {
1033
1034 try {
1035 setInterestedInRead(session, !session.isReadSuspended());
1036 } catch (Exception e) {
1037 IoFilterChain filterChain = session.getFilterChain();
1038 filterChain.fireExceptionCaught(e);
1039 }
1040
1041 try {
1042 setInterestedInWrite(session, !session.getWriteRequestQueue()
1043 .isEmpty(session)
1044 && !session.isWriteSuspended());
1045 } catch (Exception e) {
1046 IoFilterChain filterChain = session.getFilterChain();
1047 filterChain.fireExceptionCaught(e);
1048 }
1049 }
1050
1051
1052
1053
1054
1055
1056
1057 private class Processor implements Runnable {
1058 public void run() {
1059 int nSessions = 0;
1060 lastIdleCheckTime = System.currentTimeMillis();
1061
1062 for (;;) {
1063 try {
1064
1065
1066
1067
1068 int selected = select(SELECT_TIMEOUT);
1069
1070
1071 nSessions += handleNewSessions();
1072
1073 updateTrafficMask();
1074
1075
1076
1077 if (selected > 0) {
1078
1079 process();
1080 }
1081
1082
1083 long currentTime = System.currentTimeMillis();
1084 flush(currentTime);
1085
1086
1087 nSessions -= removeSessions();
1088
1089
1090 notifyIdleSessions(currentTime);
1091
1092
1093
1094 if (nSessions == 0) {
1095 synchronized (lock) {
1096 if (newSessions.isEmpty() && isSelectorEmpty()) {
1097 processor = null;
1098 break;
1099 }
1100 }
1101 }
1102
1103
1104
1105 if (isDisposing()) {
1106 for (Iterator<S> i = allSessions(); i.hasNext();) {
1107 scheduleRemove(i.next());
1108 }
1109
1110 wakeup();
1111 }
1112 } catch (ClosedSelectorException cse) {
1113
1114 break;
1115 } catch (Throwable t) {
1116 ExceptionMonitor.getInstance().exceptionCaught(t);
1117
1118 try {
1119 Thread.sleep(1000);
1120 } catch (InterruptedException e1) {
1121 ExceptionMonitor.getInstance().exceptionCaught(e1);
1122 }
1123 }
1124 }
1125
1126 try {
1127 synchronized (disposalLock) {
1128 if (disposing) {
1129 doDispose();
1130 }
1131 }
1132 } catch (Throwable t) {
1133 ExceptionMonitor.getInstance().exceptionCaught(t);
1134 } finally {
1135 disposalFuture.setValue(true);
1136 }
1137 }
1138 }
1139 }