1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.apache.mina.core.ExceptionMonitor;
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.file.FileRegion;
37 import org.apache.mina.core.future.DefaultIoFuture;
38 import org.apache.mina.core.service.AbstractIoService;
39 import org.apache.mina.core.service.IoProcessor;
40 import org.apache.mina.core.session.AbstractIoSession;
41 import org.apache.mina.core.session.IdleStatusChecker;
42 import org.apache.mina.core.session.IoSessionConfig;
43 import org.apache.mina.core.write.WriteRequest;
44 import org.apache.mina.core.write.WriteRequestQueue;
45 import org.apache.mina.core.write.WriteToClosedSessionException;
46 import org.apache.mina.util.NamePreservingRunnable;
47
48
49
50
51
52
53
54
55 public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
56
57
58
59
60
61
62 private static final int WRITE_SPIN_COUNT = 256;
63
64
65 private static final Map<Class<?>, AtomicInteger> threadIds =
66 new HashMap<Class<?>, AtomicInteger>();
67
68 private final Object lock = new Object();
69 private final String threadName;
70 private final Executor executor;
71
72 private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
73 private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
74 private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
75 private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
76
77 private Worker worker;
78 private long lastIdleCheckTime;
79
80 private final Object disposalLock = new Object();
81 private volatile boolean disposing;
82 private volatile boolean disposed;
83 private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
84
85 protected AbstractPollingIoProcessor(Executor executor) {
86 if (executor == null) {
87 throw new NullPointerException("executor");
88 }
89
90 this.threadName = nextThreadName();
91 this.executor = executor;
92 }
93
94
95
96
97
98
99
100
101
102 private String nextThreadName() {
103 Class<?> cls = getClass();
104 int newThreadId;
105
106
107
108
109 synchronized( threadIds ) {
110
111 AtomicInteger threadId = threadIds.get(cls);
112
113 if (threadId == null) {
114
115
116
117 newThreadId = 1;
118 threadIds.put(cls, new AtomicInteger(newThreadId));
119 } else {
120
121 newThreadId = threadId.incrementAndGet();
122 }
123 }
124
125
126 return cls.getSimpleName() + '-' + newThreadId;
127 }
128
129
130
131
132 public final boolean isDisposing() {
133 return disposing;
134 }
135
136
137
138
139 public final boolean isDisposed() {
140 return disposed;
141 }
142
143
144
145
146 public final void dispose() {
147 if (disposed) {
148 return;
149 }
150
151 synchronized (disposalLock) {
152 if (!disposing) {
153 disposing = true;
154 startupWorker();
155 }
156 }
157
158 disposalFuture.awaitUninterruptibly();
159 disposed = true;
160 }
161
162 protected abstract void dispose0() throws Exception;
163
164
165
166
167
168
169
170 protected abstract boolean select(int timeout) throws Exception;
171 protected abstract boolean isSelectorEmpty();
172
173
174
175
176 protected abstract void wakeup();
177 protected abstract Iterator<T> allSessions();
178 protected abstract Iterator<T> selectedSessions();
179 protected abstract SessionState state(T session);
180
181
182
183
184
185
186 protected abstract boolean isWritable(T session);
187
188
189
190
191
192
193 protected abstract boolean isReadable(T session);
194
195
196
197
198
199
200 protected abstract void setInterestedInWrite(T session, boolean interested)
201 throws Exception;
202
203
204
205
206
207
208 protected abstract void setInterestedInRead(T session, boolean interested)
209 throws Exception;
210
211
212
213
214
215
216 protected abstract boolean isInterestedInRead(T session);
217
218
219
220
221
222
223 protected abstract boolean isInterestedInWrite(T session);
224
225 protected abstract void init(T session) throws Exception;
226 protected abstract void destroy(T session) throws Exception;
227 protected abstract int read(T session, IoBuffer buf) throws Exception;
228 protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
229 protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
230
231
232
233
234 public final void add(T session) {
235 if (isDisposing()) {
236 throw new IllegalStateException("Already disposed.");
237 }
238
239 newSessions.add(session);
240 startupWorker();
241 }
242
243
244
245
246 public final void remove(T session) {
247 scheduleRemove(session);
248 startupWorker();
249 }
250
251 private void scheduleRemove(T session) {
252 removingSessions.add(session);
253 }
254
255
256
257
258 public final void flush(T session) {
259
260
261
262
263
264
265
266
267 boolean needsWakeup = flushingSessions.isEmpty();
268 if (scheduleFlush(session) && needsWakeup) {
269 wakeup();
270 }
271 }
272
273 private boolean scheduleFlush(T session) {
274 if (session.setScheduledForFlush(true)) {
275 flushingSessions.add(session);
276 return true;
277 }
278 return false;
279 }
280
281
282
283
284 public final void updateTrafficMask(T session) {
285 scheduleTrafficControl(session);
286 wakeup();
287 }
288
289 private void scheduleTrafficControl(T session) {
290 trafficControllingSessions.add(session);
291 }
292
293 private void startupWorker() {
294 synchronized (lock) {
295 if (worker == null) {
296 worker = new Worker();
297 executor.execute(new NamePreservingRunnable(worker, threadName));
298 }
299 }
300 wakeup();
301 }
302
303 private int add() {
304 int addedSessions = 0;
305
306
307
308 for (;;) {
309 T session = newSessions.poll();
310
311 if (session == null) {
312
313 break;
314 }
315
316
317 if (addNow(session)) {
318
319 addedSessions ++;
320 }
321 }
322
323 return addedSessions;
324 }
325
326 private boolean addNow(T session) {
327
328 boolean registered = false;
329 boolean notified = false;
330 try {
331 init(session);
332 registered = true;
333
334
335 session.getService().getFilterChainBuilder().buildFilterChain(
336 session.getFilterChain());
337
338
339
340 ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
341 notified = true;
342 } catch (Throwable e) {
343 if (notified) {
344
345
346 scheduleRemove(session);
347 session.getFilterChain().fireExceptionCaught(e);
348 wakeup();
349 } else {
350 ExceptionMonitor.getInstance().exceptionCaught(e);
351 try {
352 destroy(session);
353 } catch (Exception e1) {
354 ExceptionMonitor.getInstance().exceptionCaught(e1);
355 } finally {
356 registered = false;
357 }
358 }
359 }
360 return registered;
361 }
362
363 private int remove() {
364 int removedSessions = 0;
365 for (; ;) {
366 T session = removingSessions.poll();
367
368 if (session == null) {
369 break;
370 }
371
372 SessionState state = state(session);
373 switch (state) {
374 case OPEN:
375 if (removeNow(session)) {
376 removedSessions ++;
377 }
378 break;
379 case CLOSED:
380
381 break;
382 case PREPARING:
383
384
385 scheduleRemove(session);
386 return removedSessions;
387 default:
388 throw new IllegalStateException(String.valueOf(state));
389 }
390 }
391
392 return removedSessions;
393 }
394
395 private boolean removeNow(T session) {
396 clearWriteRequestQueue(session);
397
398 try {
399 destroy(session);
400 return true;
401 } catch (Exception e) {
402 session.getFilterChain().fireExceptionCaught(e);
403 } finally {
404 clearWriteRequestQueue(session);
405 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
406 }
407 return false;
408 }
409
410 private void clearWriteRequestQueue(T session) {
411 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
412 WriteRequest req;
413
414 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
415
416 if ((req = writeRequestQueue.poll(session)) != null) {
417 Object m = req.getMessage();
418 if (m instanceof IoBuffer) {
419 IoBuffer buf = (IoBuffer) req.getMessage();
420
421
422
423 if (buf.hasRemaining()) {
424 buf.reset();
425 failedRequests.add(req);
426 } else {
427 session.getFilterChain().fireMessageSent(req);
428 }
429 } else {
430 failedRequests.add(req);
431 }
432
433
434 while ((req = writeRequestQueue.poll(session)) != null) {
435 failedRequests.add(req);
436 }
437 }
438
439
440 if (!failedRequests.isEmpty()) {
441 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
442 for (WriteRequest r: failedRequests) {
443 session.decreaseScheduledBytesAndMessages(r);
444 r.getFuture().setException(cause);
445 }
446 session.getFilterChain().fireExceptionCaught(cause);
447 }
448 }
449
450 private void process() throws Exception {
451 for (Iterator<T> i = selectedSessions(); i.hasNext();) {
452 process(i.next());
453 i.remove();
454 }
455 }
456
457 private void process(T session) {
458
459 if (isReadable(session) && session.getTrafficMask().isReadable()) {
460 read(session);
461 }
462
463 if (isWritable(session) && session.getTrafficMask().isWritable()) {
464 scheduleFlush(session);
465 }
466 }
467
468 private void read(T session) {
469 IoSessionConfig config = session.getConfig();
470 IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
471
472 final boolean hasFragmentation =
473 session.getTransportMetadata().hasFragmentation();
474
475 try {
476 int readBytes = 0;
477 int ret;
478
479 try {
480 if (hasFragmentation) {
481 while ((ret = read(session, buf)) > 0) {
482 readBytes += ret;
483 if (!buf.hasRemaining()) {
484 break;
485 }
486 }
487 } else {
488 ret = read(session, buf);
489 if (ret > 0) {
490 readBytes = ret;
491 }
492 }
493 } finally {
494 buf.flip();
495 }
496
497 if (readBytes > 0) {
498 session.getFilterChain().fireMessageReceived(buf);
499 buf = null;
500
501 if (hasFragmentation) {
502 if (readBytes << 1 < config.getReadBufferSize()) {
503 session.decreaseReadBufferSize();
504 } else if (readBytes == config.getReadBufferSize()) {
505 session.increaseReadBufferSize();
506 }
507 }
508 }
509 if (ret < 0) {
510 scheduleRemove(session);
511 }
512 } catch (Throwable e) {
513 if (e instanceof IOException) {
514 scheduleRemove(session);
515 }
516 session.getFilterChain().fireExceptionCaught(e);
517 }
518 }
519
520 private void notifyIdleSessions(long currentTime) throws Exception {
521
522 if (currentTime - lastIdleCheckTime >= 1000) {
523 lastIdleCheckTime = currentTime;
524 IdleStatusChecker.notifyIdleness(allSessions(), currentTime);
525 }
526 }
527
528 private void flush(long currentTime) {
529 final T firstSession = flushingSessions.peek();
530 if (firstSession == null) {
531 return;
532 }
533
534 T session = flushingSessions.poll();
535 for (; ;) {
536 session.setScheduledForFlush(false);
537 SessionState state = state(session);
538 switch (state) {
539 case OPEN:
540 try {
541 boolean flushedAll = flushNow(session, currentTime);
542 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
543 !session.isScheduledForFlush()) {
544 scheduleFlush(session);
545 }
546 } catch (Exception e) {
547 scheduleRemove(session);
548 session.getFilterChain().fireExceptionCaught(e);
549 }
550 break;
551 case CLOSED:
552
553 break;
554 case PREPARING:
555
556
557 scheduleFlush(session);
558 return;
559 default:
560 throw new IllegalStateException(String.valueOf(state));
561 }
562
563 session = flushingSessions.peek();
564 if (session == null || session == firstSession) {
565 break;
566 }
567 session = flushingSessions.poll();
568 }
569 }
570
571 private boolean flushNow(T session, long currentTime) {
572 if (!session.isConnected()) {
573 scheduleRemove(session);
574 return false;
575 }
576
577 final boolean hasFragmentation =
578 session.getTransportMetadata().hasFragmentation();
579
580 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
581
582
583
584
585 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
586 (session.getConfig().getMaxReadBufferSize() >>> 1);
587 int writtenBytes = 0;
588 try {
589
590 setInterestedInWrite(session, false);
591 do {
592
593 WriteRequest req = session.getCurrentWriteRequest();
594 if (req == null) {
595 req = writeRequestQueue.poll(session);
596 if (req == null) {
597 break;
598 }
599 session.setCurrentWriteRequest(req);
600 }
601
602 int localWrittenBytes = 0;
603 Object message = req.getMessage();
604 if (message instanceof IoBuffer) {
605 localWrittenBytes = writeBuffer(
606 session, req, hasFragmentation,
607 maxWrittenBytes - writtenBytes,
608 currentTime);
609 } else if (message instanceof FileRegion) {
610 localWrittenBytes = writeFile(
611 session, req, hasFragmentation,
612 maxWrittenBytes - writtenBytes,
613 currentTime);
614
615
616
617
618 if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
619 writtenBytes += localWrittenBytes;
620 setInterestedInWrite(session, true);
621 return false;
622 }
623 } else {
624 throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'. Are you missing a protocol encoder?");
625 }
626
627 if (localWrittenBytes == 0) {
628
629 setInterestedInWrite(session, true);
630 return false;
631 }
632
633 writtenBytes += localWrittenBytes;
634
635 if (writtenBytes >= maxWrittenBytes) {
636
637 scheduleFlush(session);
638 return false;
639 }
640 } while (writtenBytes < maxWrittenBytes);
641 } catch (Exception e) {
642 session.getFilterChain().fireExceptionCaught(e);
643 return false;
644 }
645
646 return true;
647 }
648
649 private int writeBuffer(T session, WriteRequest req,
650 boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
651 IoBuffer buf = (IoBuffer) req.getMessage();
652 int localWrittenBytes = 0;
653 if (buf.hasRemaining()) {
654 int length;
655 if (hasFragmentation) {
656 length = Math.min(buf.remaining(), maxLength);
657 } else {
658 length = buf.remaining();
659 }
660 for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
661 localWrittenBytes = write(session, buf, length);
662 if (localWrittenBytes != 0) {
663 break;
664 }
665 }
666 }
667
668 session.increaseWrittenBytes(localWrittenBytes, currentTime);
669
670 if (!buf.hasRemaining() ||
671 !hasFragmentation && localWrittenBytes != 0) {
672
673 buf.reset();
674 fireMessageSent(session, req);
675 }
676 return localWrittenBytes;
677 }
678
679 private int writeFile(T session, WriteRequest req,
680 boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
681 int localWrittenBytes;
682 FileRegion region = (FileRegion) req.getMessage();
683 if (region.getRemainingBytes() > 0) {
684 int length;
685 if (hasFragmentation) {
686 length = (int) Math.min(region.getRemainingBytes(), maxLength);
687 } else {
688 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
689 }
690 localWrittenBytes = transferFile(session, region, length);
691 region.update(localWrittenBytes);
692 } else {
693 localWrittenBytes = 0;
694 }
695
696 session.increaseWrittenBytes(localWrittenBytes, currentTime);
697
698 if (region.getRemainingBytes() <= 0 ||
699 !hasFragmentation && localWrittenBytes != 0) {
700 fireMessageSent(session, req);
701 }
702
703 return localWrittenBytes;
704 }
705
706 private void fireMessageSent(T session, WriteRequest req) {
707 session.setCurrentWriteRequest(null);
708 session.getFilterChain().fireMessageSent(req);
709 }
710
711 private void updateTrafficMask() {
712 for (; ;) {
713 T session = trafficControllingSessions.poll();
714
715 if (session == null) {
716 break;
717 }
718
719 SessionState state = state(session);
720 switch (state) {
721 case OPEN:
722 updateTrafficMaskNow(session);
723 break;
724 case CLOSED:
725 break;
726 case PREPARING:
727
728
729
730 scheduleTrafficControl(session);
731 return;
732 default:
733 throw new IllegalStateException(String.valueOf(state));
734 }
735 }
736 }
737
738 private void updateTrafficMaskNow(T session) {
739
740
741 int mask = session.getTrafficMask().getInterestOps();
742 try {
743 setInterestedInRead(session, (mask & SelectionKey.OP_READ) != 0);
744 } catch (Exception e) {
745 session.getFilterChain().fireExceptionCaught(e);
746 }
747 try {
748 setInterestedInWrite(
749 session,
750 !session.getWriteRequestQueue().isEmpty(session) &&
751 (mask & SelectionKey.OP_WRITE) != 0);
752 } catch (Exception e) {
753 session.getFilterChain().fireExceptionCaught(e);
754 }
755 }
756
757
758
759
760
761
762 private class Worker implements Runnable {
763 public void run() {
764 int nSessions = 0;
765 lastIdleCheckTime = System.currentTimeMillis();
766
767 for (;;) {
768 try {
769 boolean selected = select(1000);
770
771 nSessions += add();
772 updateTrafficMask();
773
774 if (selected) {
775 process();
776 }
777
778 long currentTime = System.currentTimeMillis();
779 flush(currentTime);
780 nSessions -= remove();
781 notifyIdleSessions(currentTime);
782
783 if (nSessions == 0) {
784 synchronized (lock) {
785 if (newSessions.isEmpty() && isSelectorEmpty()) {
786 worker = null;
787 break;
788 }
789 }
790 }
791
792
793
794 if (isDisposing()) {
795 for (Iterator<T> i = allSessions(); i.hasNext(); ) {
796 scheduleRemove(i.next());
797 }
798 wakeup();
799 }
800 } catch (Throwable t) {
801 ExceptionMonitor.getInstance().exceptionCaught(t);
802
803 try {
804 Thread.sleep(1000);
805 } catch (InterruptedException e1) {
806 ExceptionMonitor.getInstance().exceptionCaught(e1);
807 }
808 }
809 }
810
811 try {
812 synchronized (disposalLock) {
813 if (isDisposing()) {
814 dispose0();
815 }
816 }
817 } catch (Throwable t) {
818 ExceptionMonitor.getInstance().exceptionCaught(t);
819 } finally {
820 disposalFuture.setValue(true);
821 }
822 }
823 }
824
825 protected static enum SessionState {
826 OPEN,
827 CLOSED,
828 PREPARING,
829 }
830 }