1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.common;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.util.CopyOnWriteMap;
34 import org.apache.mina.util.NamePreservingRunnable;
35
36
37
38
39
40
41
42
43 public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
44
45
46
47
48
49
50 private static final int WRITE_SPIN_COUNT = 256;
51
52 private static final Map<Class<?>, AtomicInteger> threadIds =
53 new CopyOnWriteMap<Class<?>, AtomicInteger>();
54
55 private final Object lock = new Object();
56 private final String threadName;
57 private final Executor executor;
58
59 private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
60 private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
61 private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
62 private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
63
64 private Worker worker;
65 private long lastIdleCheckTime;
66
67 private final Object disposalLock = new Object();
68 private volatile boolean disposing;
69 private volatile boolean disposed;
70 private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
71
72 protected AbstractPollingIoProcessor(Executor executor) {
73 if (executor == null) {
74 throw new NullPointerException("executor");
75 }
76
77 this.threadName = nextThreadName();
78 this.executor = executor;
79 }
80
81 private String nextThreadName() {
82 Class<?> cls = getClass();
83 AtomicInteger threadId = threadIds.get(cls);
84 int newThreadId;
85 if (threadId == null) {
86 newThreadId = 1;
87 threadIds.put(cls, new AtomicInteger(newThreadId));
88 } else {
89 newThreadId = threadId.incrementAndGet();
90 }
91
92 return cls.getSimpleName() + '-' + newThreadId;
93 }
94
95 public final boolean isDisposing() {
96 return disposing;
97 }
98
99 public final boolean isDisposed() {
100 return disposed;
101 }
102
103 public final void dispose() {
104 if (disposed) {
105 return;
106 }
107
108 synchronized (disposalLock) {
109 if (!disposing) {
110 disposing = true;
111 startupWorker();
112 }
113 }
114
115 disposalFuture.awaitUninterruptibly();
116 disposed = true;
117 }
118
119 protected abstract void dispose0() throws Exception;
120
121
122
123
124
125
126
127 protected abstract boolean select(int timeout) throws Exception;
128 protected abstract void wakeup();
129 protected abstract Iterator<T> allSessions();
130 protected abstract Iterator<T> selectedSessions();
131 protected abstract SessionState state(T session);
132
133
134
135
136
137
138 protected abstract boolean isWritable(T session);
139
140
141
142
143
144
145 protected abstract boolean isReadable(T session);
146
147
148
149
150
151
152 protected abstract void setInterestedInWrite(T session, boolean interested)
153 throws Exception;
154
155
156
157
158
159
160 protected abstract void setInterestedInRead(T session, boolean interested)
161 throws Exception;
162
163
164
165
166
167
168 protected abstract boolean isInterestedInRead(T session);
169
170
171
172
173
174
175 protected abstract boolean isInterestedInWrite(T session);
176
177 protected abstract void init(T session) throws Exception;
178 protected abstract void destroy(T session) throws Exception;
179 protected abstract int read(T session, IoBuffer buf) throws Exception;
180 protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
181 protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
182
183 public final void add(T session) {
184 if (isDisposing()) {
185 throw new IllegalStateException("Already disposed.");
186 }
187
188 newSessions.add(session);
189 startupWorker();
190 }
191
192 public final void remove(T session) {
193 scheduleRemove(session);
194 startupWorker();
195 }
196
197 private void scheduleRemove(T session) {
198 removingSessions.add(session);
199 }
200
201 public final void flush(T session) {
202 boolean needsWakeup = flushingSessions.isEmpty();
203 if (scheduleFlush(session) && needsWakeup) {
204 wakeup();
205 }
206 }
207
208 private boolean scheduleFlush(T session) {
209 if (session.setScheduledForFlush(true)) {
210 flushingSessions.add(session);
211 return true;
212 }
213 return false;
214 }
215
216 public final void updateTrafficMask(T session) {
217 scheduleTrafficControl(session);
218 wakeup();
219 }
220
221 private void scheduleTrafficControl(T session) {
222 trafficControllingSessions.add(session);
223 }
224
225 private void startupWorker() {
226 synchronized (lock) {
227 if (worker == null) {
228 worker = new Worker();
229 executor.execute(new NamePreservingRunnable(worker, threadName));
230 }
231 }
232 wakeup();
233 }
234
235 private int add() {
236 int addedSessions = 0;
237 for (; ;) {
238 T session = newSessions.poll();
239
240 if (session == null) {
241 break;
242 }
243
244
245 if (addNow(session)) {
246 addedSessions ++;
247 }
248 }
249
250 return addedSessions;
251 }
252
253 private boolean addNow(T session) {
254
255 boolean registered = false;
256 boolean notified = false;
257 try {
258 init(session);
259 registered = true;
260
261
262 session.getService().getFilterChainBuilder().buildFilterChain(
263 session.getFilterChain());
264
265
266
267 ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
268 notified = true;
269 } catch (Throwable e) {
270 if (notified) {
271
272
273 scheduleRemove(session);
274 session.getFilterChain().fireExceptionCaught(e);
275 wakeup();
276 } else {
277 ExceptionMonitor.getInstance().exceptionCaught(e);
278 try {
279 destroy(session);
280 } catch (Exception e1) {
281 ExceptionMonitor.getInstance().exceptionCaught(e1);
282 } finally {
283 registered = false;
284 }
285 }
286 }
287 return registered;
288 }
289
290 private int remove() {
291 int removedSessions = 0;
292 for (; ;) {
293 T session = removingSessions.poll();
294
295 if (session == null) {
296 break;
297 }
298
299 SessionState state = state(session);
300 switch (state) {
301 case OPEN:
302 if (removeNow(session)) {
303 removedSessions ++;
304 }
305 break;
306 case CLOSED:
307
308 break;
309 case PREPARING:
310
311
312 scheduleRemove(session);
313 return removedSessions;
314 default:
315 throw new IllegalStateException(String.valueOf(state));
316 }
317 }
318
319 return removedSessions;
320 }
321
322 private boolean removeNow(T session) {
323 clearWriteRequestQueue(session);
324
325 try {
326 destroy(session);
327 return true;
328 } catch (Exception e) {
329 session.getFilterChain().fireExceptionCaught(e);
330 } finally {
331 clearWriteRequestQueue(session);
332 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
333 }
334 return false;
335 }
336
337 private void clearWriteRequestQueue(T session) {
338 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
339 WriteRequest req;
340
341 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
342
343 if ((req = writeRequestQueue.poll(session)) != null) {
344 Object m = req.getMessage();
345 if (m instanceof IoBuffer) {
346 IoBuffer buf = (IoBuffer) req.getMessage();
347
348
349
350 if (buf.hasRemaining()) {
351 buf.reset();
352 failedRequests.add(req);
353 } else {
354 session.getFilterChain().fireMessageSent(req);
355 }
356 } else {
357 failedRequests.add(req);
358 }
359
360
361 while ((req = writeRequestQueue.poll(session)) != null) {
362 failedRequests.add(req);
363 }
364 }
365
366
367 if (!failedRequests.isEmpty()) {
368 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
369 for (WriteRequest r: failedRequests) {
370 session.decreaseScheduledBytesAndMessages(r);
371 r.getFuture().setException(cause);
372 }
373 session.getFilterChain().fireExceptionCaught(cause);
374 }
375 }
376
377 private void process() throws Exception {
378 for (Iterator<T> i = selectedSessions(); i.hasNext();) {
379 process(i.next());
380 i.remove();
381 }
382 }
383
384 private void process(T session) {
385
386 if (isReadable(session) && session.getTrafficMask().isReadable()) {
387 read(session);
388 }
389
390 if (isWritable(session) && session.getTrafficMask().isWritable()) {
391 scheduleFlush(session);
392 }
393 }
394
395 private void read(T session) {
396 IoSessionConfig config = session.getConfig();
397 IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
398
399 final boolean hasFragmentation =
400 session.getTransportMetadata().hasFragmentation();
401
402 try {
403 int readBytes = 0;
404 int ret;
405
406 try {
407 if (hasFragmentation) {
408 while ((ret = read(session, buf)) > 0) {
409 readBytes += ret;
410 if (!buf.hasRemaining()) {
411 break;
412 }
413 }
414 } else {
415 ret = read(session, buf);
416 if (ret > 0) {
417 readBytes = ret;
418 }
419 }
420 } finally {
421 buf.flip();
422 }
423
424 if (readBytes > 0) {
425 session.getFilterChain().fireMessageReceived(buf);
426 buf = null;
427
428 if (hasFragmentation) {
429 if (readBytes << 1 < config.getReadBufferSize()) {
430 session.decreaseReadBufferSize();
431 } else if (readBytes == config.getReadBufferSize()) {
432 session.increaseReadBufferSize();
433 }
434 }
435 }
436 if (ret < 0) {
437 scheduleRemove(session);
438 }
439 } catch (Throwable e) {
440 if (e instanceof IOException) {
441 scheduleRemove(session);
442 }
443 session.getFilterChain().fireExceptionCaught(e);
444 }
445 }
446
447 private void notifyIdleSessions() throws Exception {
448
449 long currentTime = System.currentTimeMillis();
450 if (currentTime - lastIdleCheckTime >= 1000) {
451 lastIdleCheckTime = currentTime;
452 IdleStatusChecker.notifyIdleness(allSessions(), currentTime);
453 }
454 }
455
456 private void flush() {
457 for (; ;) {
458 T session = flushingSessions.poll();
459
460 if (session == null) {
461 break;
462 }
463
464 session.setScheduledForFlush(false);
465 SessionState state = state(session);
466 switch (state) {
467 case OPEN:
468 try {
469 boolean flushedAll = flushNow(session);
470 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
471 !session.isScheduledForFlush()) {
472 scheduleFlush(session);
473 }
474 } catch (Exception e) {
475 scheduleRemove(session);
476 session.getFilterChain().fireExceptionCaught(e);
477 }
478 break;
479 case CLOSED:
480
481 break;
482 case PREPARING:
483
484
485 scheduleFlush(session);
486 return;
487 default:
488 throw new IllegalStateException(String.valueOf(state));
489 }
490 }
491 }
492
493 private boolean flushNow(T session) {
494 if (!session.isConnected()) {
495 scheduleRemove(session);
496 return false;
497 }
498
499 final boolean hasFragmentation =
500 session.getTransportMetadata().hasFragmentation();
501
502 try {
503
504 setInterestedInWrite(session, false);
505
506 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
507
508
509
510
511 int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
512 (session.getConfig().getMaxReadBufferSize() >>> 1);
513 int writtenBytes = 0;
514 do {
515
516 WriteRequest req = session.getCurrentWriteRequest();
517 if (req == null) {
518 req = writeRequestQueue.poll(session);
519 if (req == null) {
520 break;
521 }
522 session.setCurrentWriteRequest(req);
523 }
524
525 int localWrittenBytes = 0;
526 Object message = req.getMessage();
527 if (message instanceof IoBuffer) {
528 localWrittenBytes = writeBuffer(
529 session, req, hasFragmentation,
530 maxWrittenBytes - writtenBytes);
531 } else if (message instanceof FileRegion) {
532 localWrittenBytes = writeFile(
533 session, req, hasFragmentation,
534 maxWrittenBytes - writtenBytes);
535 } else {
536 throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'. Are you missing a protocol encoder?");
537 }
538
539 writtenBytes += localWrittenBytes;
540
541 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
542
543 setInterestedInWrite(session, true);
544 return false;
545 }
546 } while (writtenBytes < maxWrittenBytes);
547 } catch (Exception e) {
548 session.getFilterChain().fireExceptionCaught(e);
549 return false;
550 }
551
552 return true;
553 }
554
555 private int writeBuffer(T session, WriteRequest req,
556 boolean hasFragmentation, int maxLength) throws Exception {
557 IoBuffer buf = (IoBuffer) req.getMessage();
558 int localWrittenBytes = 0;
559 if (buf.hasRemaining()) {
560 int length;
561 if (hasFragmentation) {
562 length = Math.min(buf.remaining(), maxLength);
563 } else {
564 length = buf.remaining();
565 }
566 for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
567 localWrittenBytes = write(session, buf, length);
568 if (localWrittenBytes != 0) {
569 break;
570 }
571 }
572 }
573
574 if (!buf.hasRemaining() ||
575 (!hasFragmentation && localWrittenBytes != 0)) {
576
577 buf.reset();
578 fireMessageSent(session, req);
579 }
580 return localWrittenBytes;
581 }
582
583 private int writeFile(T session, WriteRequest req,
584 boolean hasFragmentation, int maxLength) throws Exception {
585 int localWrittenBytes;
586 FileRegion region = (FileRegion) req.getMessage();
587 if (region.getCount() > 0) {
588 int length;
589 if (hasFragmentation) {
590 length = (int) Math.min(region.getCount(), maxLength);
591 } else {
592 length = (int) Math.min(Integer.MAX_VALUE, region.getCount());
593 }
594 localWrittenBytes = transferFile(session, region, length);
595 region.setPosition(region.getPosition() + localWrittenBytes);
596
597
598
599
600 if (localWrittenBytes > 0 && region.getCount() > 0) {
601 return 0;
602 }
603 } else {
604 localWrittenBytes = 0;
605 }
606
607 if (region.getCount() <= 0 ||
608 (!hasFragmentation && localWrittenBytes != 0)) {
609 fireMessageSent(session, req);
610 }
611 return localWrittenBytes;
612 }
613
614 private void fireMessageSent(T session, WriteRequest req) {
615 session.setCurrentWriteRequest(null);
616 session.getFilterChain().fireMessageSent(req);
617 }
618
619 private void updateTrafficMask() {
620 for (; ;) {
621 T session = trafficControllingSessions.poll();
622
623 if (session == null) {
624 break;
625 }
626
627 SessionState state = state(session);
628 switch (state) {
629 case OPEN:
630 updateTrafficMaskNow(session);
631 break;
632 case CLOSED:
633 break;
634 case PREPARING:
635
636
637
638 scheduleTrafficControl(session);
639 return;
640 default:
641 throw new IllegalStateException(String.valueOf(state));
642 }
643 }
644 }
645
646 private void updateTrafficMaskNow(T session) {
647
648
649 int mask = session.getTrafficMask().getInterestOps();
650 try {
651 setInterestedInRead(session, (mask & SelectionKey.OP_READ) != 0);
652 } catch (Exception e) {
653 session.getFilterChain().fireExceptionCaught(e);
654 }
655 try {
656 setInterestedInWrite(
657 session,
658 !session.getWriteRequestQueue().isEmpty(session) &&
659 (mask & SelectionKey.OP_WRITE) != 0);
660 } catch (Exception e) {
661 session.getFilterChain().fireExceptionCaught(e);
662 }
663 }
664
665 private class Worker implements Runnable {
666 public void run() {
667 int nSessions = 0;
668 lastIdleCheckTime = System.currentTimeMillis();
669
670 for (;;) {
671 try {
672 boolean selected = select(1000);
673
674 nSessions += add();
675 updateTrafficMask();
676
677 if (selected) {
678 process();
679 }
680
681 flush();
682 nSessions -= remove();
683 notifyIdleSessions();
684
685 if (nSessions == 0) {
686 synchronized (lock) {
687 if (newSessions.isEmpty()) {
688 worker = null;
689 break;
690 }
691 }
692 }
693
694
695
696 if (isDisposing()) {
697 for (Iterator<T> i = allSessions(); i.hasNext(); ) {
698 scheduleRemove(i.next());
699 }
700 wakeup();
701 }
702 } catch (Throwable t) {
703 ExceptionMonitor.getInstance().exceptionCaught(t);
704
705 try {
706 Thread.sleep(1000);
707 } catch (InterruptedException e1) {
708 ExceptionMonitor.getInstance().exceptionCaught(e1);
709 }
710 }
711 }
712
713 if (isDisposing()) {
714 try {
715 dispose0();
716 } catch (Throwable t) {
717 ExceptionMonitor.getInstance().exceptionCaught(t);
718 } finally {
719 disposalFuture.setValue(true);
720 }
721 }
722 }
723 }
724
725 protected static enum SessionState {
726 OPEN,
727 CLOSED,
728 PREPARING,
729 }
730 }