1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.common;
21
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.net.SocketAddress;
26 import java.nio.channels.FileChannel;
27 import java.util.Queue;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.mina.util.CircularQueue;
34
35
36
37
38
39
40
41
42 public abstract class AbstractIoSession implements IoSession {
43
44 private static final AttributeKey READY_READ_FUTURES =
45 new AttributeKey(AbstractIoSession.class, "readyReadFutures");
46 private static final AttributeKey WAITING_READ_FUTURES =
47 new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
48
49 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
50 new IoFutureListener<CloseFuture>() {
51 public void operationComplete(CloseFuture future) {
52 AbstractIoSession s = (AbstractIoSession) future.getSession();
53 s.scheduledWriteBytes.set(0);
54 s.scheduledWriteMessages.set(0);
55 s.readBytesThroughput = 0;
56 s.readMessagesThroughput = 0;
57 s.writtenBytesThroughput = 0;
58 s.writtenMessagesThroughput = 0;
59 }
60 };
61
62
63
64
65
66 private static final WriteRequest CLOSE_REQUEST =
67 new DefaultWriteRequest(new Object());
68
69 private final Object lock = new Object();
70
71 private IoSessionAttributeMap attributes;
72 private WriteRequestQueue writeRequestQueue;
73 private WriteRequest currentWriteRequest;
74 private final long creationTime;
75
76
77
78
79 private final CloseFuture closeFuture = new DefaultCloseFuture(this);
80
81 private volatile boolean closing;
82 private volatile TrafficMask trafficMask = TrafficMask.ALL;
83
84
85 private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
86 private final AtomicLong scheduledWriteBytes = new AtomicLong();
87 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
88
89 private long readBytes;
90 private long writtenBytes;
91 private long readMessages;
92 private long writtenMessages;
93 private long lastReadTime;
94 private long lastWriteTime;
95
96 private long lastThroughputCalculationTime;
97 private long lastReadBytes;
98 private long lastWrittenBytes;
99 private long lastReadMessages;
100 private long lastWrittenMessages;
101 private double readBytesThroughput;
102 private double writtenBytesThroughput;
103 private double readMessagesThroughput;
104 private double writtenMessagesThroughput;
105
106 private int idleCountForBoth;
107 private int idleCountForRead;
108 private int idleCountForWrite;
109
110 private long lastIdleTimeForBoth;
111 private long lastIdleTimeForRead;
112 private long lastIdleTimeForWrite;
113
114 private boolean deferDecreaseReadBuffer = true;
115
116 protected AbstractIoSession() {
117 creationTime = lastThroughputCalculationTime =
118 lastReadTime = lastWriteTime =
119 lastIdleTimeForBoth = lastIdleTimeForRead =
120 lastIdleTimeForWrite = System.currentTimeMillis();
121 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
122 }
123
124 public final long getId() {
125 return hashCode() & 0xFFFFFFFFL;
126 }
127
128 @SuppressWarnings("unchecked")
129 protected abstract IoProcessor getProcessor();
130
131 public final boolean isConnected() {
132 return !closeFuture.isClosed();
133 }
134
135 public final boolean isClosing() {
136 return closing || closeFuture.isClosed();
137 }
138
139 public final CloseFuture getCloseFuture() {
140 return closeFuture;
141 }
142
143 protected final boolean isScheduledForFlush() {
144 return scheduledForFlush.get();
145 }
146
147 protected final boolean setScheduledForFlush(boolean flag) {
148 if (flag) {
149 return scheduledForFlush.compareAndSet(false, true);
150 } else {
151 scheduledForFlush.set(false);
152 return true;
153 }
154 }
155
156 public final CloseFuture close(boolean rightNow) {
157 if (rightNow) {
158 return close();
159 } else {
160 return closeOnFlush();
161 }
162 }
163
164 public final CloseFuture close() {
165 synchronized (lock) {
166 if (isClosing()) {
167 return closeFuture;
168 } else {
169 closing = true;
170 }
171 }
172
173 getFilterChain().fireFilterClose();
174 return closeFuture;
175 }
176
177 @SuppressWarnings("unchecked")
178 public final CloseFuture closeOnFlush() {
179 getWriteRequestQueue().offer(this, CLOSE_REQUEST);
180 getProcessor().flush(this);
181 return closeFuture;
182 }
183
184 public final ReadFuture read() {
185 if (!getConfig().isUseReadOperation()) {
186 throw new IllegalStateException("useReadOperation is not enabled.");
187 }
188
189 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
190 ReadFuture future;
191 synchronized (readyReadFutures) {
192 future = readyReadFutures.poll();
193 if (future != null) {
194 if (future.isClosed()) {
195
196 readyReadFutures.offer(future);
197 }
198 } else {
199 future = new DefaultReadFuture(this);
200 getWaitingReadFutures().offer(future);
201 }
202 }
203
204 return future;
205 }
206
207 protected final void offerReadFuture(Object message) {
208 newReadFuture().setRead(message);
209 }
210
211 protected final void offerFailedReadFuture(Throwable exception) {
212 newReadFuture().setException(exception);
213 }
214
215 protected final void offerClosedReadFuture() {
216 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
217 synchronized (readyReadFutures) {
218 newReadFuture().setClosed();
219 }
220 }
221
222 private ReadFuture newReadFuture() {
223 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
224 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
225 ReadFuture future;
226 synchronized (readyReadFutures) {
227 future = waitingReadFutures.poll();
228 if (future == null) {
229 future = new DefaultReadFuture(this);
230 readyReadFutures.offer(future);
231 }
232 }
233 return future;
234 }
235
236 @SuppressWarnings("unchecked")
237 private Queue<ReadFuture> getReadyReadFutures() {
238 Queue<ReadFuture> readyReadFutures =
239 (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES);
240 if (readyReadFutures == null) {
241 readyReadFutures = new CircularQueue<ReadFuture>();
242
243 Queue<ReadFuture> oldReadyReadFutures =
244 (Queue<ReadFuture>) setAttributeIfAbsent(
245 READY_READ_FUTURES, readyReadFutures);
246 if (oldReadyReadFutures != null) {
247 readyReadFutures = oldReadyReadFutures;
248 }
249
250
251 Queue<ReadFuture> waitingReadFutures =
252 new CircularQueue<ReadFuture>();
253 setAttributeIfAbsent(WAITING_READ_FUTURES, waitingReadFutures);
254 }
255 return readyReadFutures;
256 }
257
258 @SuppressWarnings("unchecked")
259 private Queue<ReadFuture> getWaitingReadFutures() {
260 return (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES);
261 }
262
263 public final WriteFuture write(Object message) {
264 return write(message, null);
265 }
266
267 public final WriteFuture write(Object message, SocketAddress remoteAddress) {
268 if (message == null) {
269 throw new NullPointerException("message");
270 }
271
272 if (!getTransportMetadata().isConnectionless() &&
273 remoteAddress != null) {
274 throw new UnsupportedOperationException();
275 }
276
277 if (isClosing() || !isConnected()) {
278 WriteFuture future = new DefaultWriteFuture(this);
279 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
280 future.setException(new WriteToClosedSessionException(request));
281 return future;
282 }
283
284 FileChannel openedFileChannel = null;
285 try {
286 if (message instanceof IoBuffer
287 && !((IoBuffer) message).hasRemaining()) {
288 throw new IllegalArgumentException(
289 "message is empty. Forgot to call flip()?");
290 } else if (message instanceof FileChannel) {
291 FileChannel fileChannel = (FileChannel) message;
292 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
293 } else if (message instanceof File) {
294 File file = (File) message;
295 openedFileChannel = new FileInputStream(file).getChannel();
296 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
297 }
298 } catch (IOException e) {
299 ExceptionMonitor.getInstance().exceptionCaught(e);
300 return DefaultWriteFuture.newNotWrittenFuture(this, e);
301 }
302
303 WriteFuture future = new DefaultWriteFuture(this);
304 getFilterChain().fireFilterWrite(
305 new DefaultWriteRequest(message, future, remoteAddress));
306
307 if (openedFileChannel != null) {
308
309 final FileChannel finalChannel = openedFileChannel;
310 future.addListener(new IoFutureListener<WriteFuture>() {
311 public void operationComplete(WriteFuture future) {
312 try {
313 finalChannel.close();
314 } catch (IOException e) {
315 ExceptionMonitor.getInstance().exceptionCaught(e);
316 }
317 }
318 });
319 }
320
321 return future;
322 }
323
324 public final Object getAttachment() {
325 return getAttribute("");
326 }
327
328 public final Object setAttachment(Object attachment) {
329 return setAttribute("", attachment);
330 }
331
332 public final Object getAttribute(Object key) {
333 return getAttribute(key, null);
334 }
335
336 public final Object getAttribute(Object key, Object defaultValue) {
337 return attributes.getAttribute(this, key, defaultValue);
338 }
339
340 public final Object setAttribute(Object key, Object value) {
341 return attributes.setAttribute(this, key, value);
342 }
343
344 public final Object setAttribute(Object key) {
345 return setAttribute(key, Boolean.TRUE);
346 }
347
348 public final Object setAttributeIfAbsent(Object key, Object value) {
349 return attributes.setAttributeIfAbsent(this, key, value);
350 }
351
352 public final Object setAttributeIfAbsent(Object key) {
353 return setAttributeIfAbsent(key, Boolean.TRUE);
354 }
355
356 public final Object removeAttribute(Object key) {
357 return attributes.removeAttribute(this, key);
358 }
359
360 public final boolean removeAttribute(Object key, Object value) {
361 return attributes.removeAttribute(this, key, value);
362 }
363
364 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
365 return attributes.replaceAttribute(this, key, oldValue, newValue);
366 }
367
368 public final boolean containsAttribute(Object key) {
369 return attributes.containsAttribute(this, key);
370 }
371
372 public final Set<Object> getAttributeKeys() {
373 return attributes.getAttributeKeys(this);
374 }
375
376 protected final IoSessionAttributeMap getAttributeMap() {
377 return attributes;
378 }
379
380 protected final void setAttributeMap(IoSessionAttributeMap attributes) {
381 this.attributes = attributes;
382 }
383
384 protected final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
385 this.writeRequestQueue =
386 new CloseRequestAwareWriteRequestQueue(writeRequestQueue);
387 }
388
389 public final TrafficMask getTrafficMask() {
390 return trafficMask;
391 }
392
393 public final void setTrafficMask(TrafficMask trafficMask) {
394 if (trafficMask == null) {
395 throw new NullPointerException("trafficMask");
396 }
397
398 if (isClosing() || !isConnected()) {
399 return;
400 }
401
402 getFilterChain().fireFilterSetTrafficMask(trafficMask);
403 }
404
405 protected final void setTrafficMaskNow(TrafficMask trafficMask) {
406 this.trafficMask = trafficMask;
407 }
408
409 public final void suspendRead() {
410 setTrafficMask(getTrafficMask().and(TrafficMask.READ.not()));
411 }
412
413 public final void suspendWrite() {
414 setTrafficMask(getTrafficMask().and(TrafficMask.WRITE.not()));
415 }
416
417 public final void resumeRead() {
418 setTrafficMask(getTrafficMask().or(TrafficMask.READ));
419 }
420
421 public final void resumeWrite() {
422 setTrafficMask(getTrafficMask().or(TrafficMask.WRITE));
423 }
424
425 public final long getReadBytes() {
426 return readBytes;
427 }
428
429 public final long getWrittenBytes() {
430 return writtenBytes;
431 }
432
433 public final long getReadMessages() {
434 return readMessages;
435 }
436
437 public final long getWrittenMessages() {
438 return writtenMessages;
439 }
440
441 public final double getReadBytesThroughput() {
442 return readBytesThroughput;
443 }
444
445 public final double getWrittenBytesThroughput() {
446 return writtenBytesThroughput;
447 }
448
449 public final double getReadMessagesThroughput() {
450 return readMessagesThroughput;
451 }
452
453 public final double getWrittenMessagesThroughput() {
454 return writtenMessagesThroughput;
455 }
456
457
458
459
460
461
462
463
464
465
466
467
468 protected final void updateThroughput(long currentTime, boolean force) {
469 int interval = (int) (currentTime - lastThroughputCalculationTime);
470
471 long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
472 if (minInterval == 0 || interval < minInterval) {
473 if (!force) {
474 return;
475 }
476 }
477
478 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
479 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
480 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
481 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
482
483 lastReadBytes = readBytes;
484 lastWrittenBytes = writtenBytes;
485 lastReadMessages = readMessages;
486 lastWrittenMessages = writtenMessages;
487
488 lastThroughputCalculationTime = currentTime;
489 }
490
491 public final long getScheduledWriteBytes() {
492 return scheduledWriteBytes.get();
493 }
494
495 public final int getScheduledWriteMessages() {
496 return scheduledWriteMessages.get();
497 }
498
499 protected void setScheduledWriteBytes(long byteCount){
500 scheduledWriteBytes.set(byteCount);
501 }
502
503 protected void setScheduledWriteMessages(int messages) {
504 scheduledWriteMessages.set(messages);
505 }
506
507 protected final void increaseReadBytes(long increment, long currentTime) {
508 if (increment <= 0) {
509 return;
510 }
511
512 readBytes += increment;
513 lastReadTime = currentTime;
514 idleCountForBoth = 0;
515 idleCountForRead = 0;
516
517 if (getService() instanceof AbstractIoService) {
518 ((AbstractIoService) getService()).increaseReadBytes(increment, currentTime);
519 }
520 }
521
522 protected final void increaseReadMessages(long currentTime) {
523 readMessages++;
524 lastReadTime = currentTime;
525 idleCountForBoth = 0;
526 idleCountForRead = 0;
527
528 if (getService() instanceof AbstractIoService) {
529 ((AbstractIoService) getService()).increaseReadMessages(currentTime);
530 }
531 }
532
533 protected final void increaseWrittenBytesAndMessages(
534 WriteRequest request, long currentTime) {
535
536 Object message = request.getMessage();
537 if (message instanceof IoBuffer) {
538 IoBuffer b = (IoBuffer) message;
539 if (b.hasRemaining()) {
540 increaseWrittenBytes(((IoBuffer) message).remaining(), currentTime);
541 } else {
542 increaseWrittenMessages(currentTime);
543 }
544 } else if (message instanceof FileRegion) {
545 FileRegion region = (FileRegion) message;
546 if (region.getCount() == 0) {
547 increaseWrittenBytes(region.getWrittenBytes(), currentTime);
548 increaseWrittenMessages(currentTime);
549 }
550 } else {
551 increaseWrittenMessages(currentTime);
552 }
553 }
554
555 private void increaseWrittenBytes(long increment, long currentTime) {
556 if (increment <= 0) {
557 return;
558 }
559
560 writtenBytes += increment;
561 lastWriteTime = currentTime;
562 idleCountForBoth = 0;
563 idleCountForWrite = 0;
564
565 if (getService() instanceof AbstractIoService) {
566 ((AbstractIoService) getService()).increaseWrittenBytes(increment, currentTime);
567 }
568
569 increaseScheduledWriteBytes(-increment);
570 }
571
572 private void increaseWrittenMessages(long currentTime) {
573 writtenMessages++;
574 lastWriteTime = currentTime;
575 if (getService() instanceof AbstractIoService) {
576 ((AbstractIoService) getService()).increaseWrittenMessages(currentTime);
577 }
578
579 decreaseScheduledWriteMessages();
580 }
581
582 protected final void increaseScheduledWriteBytes(long increment) {
583 scheduledWriteBytes.addAndGet(increment);
584 if (getService() instanceof AbstractIoService) {
585 ((AbstractIoService) getService()).increaseScheduledWriteBytes(increment);
586 }
587 }
588
589 protected final void increaseScheduledWriteMessages() {
590 scheduledWriteMessages.incrementAndGet();
591 if (getService() instanceof AbstractIoService) {
592 ((AbstractIoService) getService()).increaseScheduledWriteMessages();
593 }
594 }
595
596 private void decreaseScheduledWriteMessages() {
597 scheduledWriteMessages.decrementAndGet();
598 if (getService() instanceof AbstractIoService) {
599 ((AbstractIoService) getService()).decreaseScheduledWriteMessages();
600 }
601 }
602
603 protected final void decreaseScheduledBytesAndMessages(WriteRequest request) {
604 Object message = request.getMessage();
605 if (message instanceof IoBuffer) {
606 IoBuffer b = (IoBuffer) message;
607 if (b.hasRemaining()) {
608 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
609 } else {
610 decreaseScheduledWriteMessages();
611 }
612 } else {
613 decreaseScheduledWriteMessages();
614 }
615 }
616
617 protected final WriteRequestQueue getWriteRequestQueue() {
618 if (writeRequestQueue == null) {
619 throw new IllegalStateException();
620 }
621 return writeRequestQueue;
622 }
623
624 protected final WriteRequest getCurrentWriteRequest() {
625 return currentWriteRequest;
626 }
627
628 protected final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
629 this.currentWriteRequest = currentWriteRequest;
630 }
631
632 protected final void increaseReadBufferSize() {
633 int newReadBufferSize = getConfig().getReadBufferSize() << 1;
634 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
635 getConfig().setReadBufferSize(newReadBufferSize);
636 } else {
637 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
638 }
639
640 deferDecreaseReadBuffer = true;
641 }
642
643 protected final void decreaseReadBufferSize() {
644 if (deferDecreaseReadBuffer) {
645 deferDecreaseReadBuffer = false;
646 return;
647 }
648
649 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
650 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
651 }
652
653 deferDecreaseReadBuffer = true;
654 }
655
656 public final long getCreationTime() {
657 return creationTime;
658 }
659
660 public final long getLastIoTime() {
661 return Math.max(lastReadTime, lastWriteTime);
662 }
663
664 public final long getLastReadTime() {
665 return lastReadTime;
666 }
667
668 public final long getLastWriteTime() {
669 return lastWriteTime;
670 }
671
672 public final boolean isIdle(IdleStatus status) {
673 if (status == IdleStatus.BOTH_IDLE) {
674 return idleCountForBoth > 0;
675 }
676
677 if (status == IdleStatus.READER_IDLE) {
678 return idleCountForRead > 0;
679 }
680
681 if (status == IdleStatus.WRITER_IDLE) {
682 return idleCountForWrite > 0;
683 }
684
685 throw new IllegalArgumentException("Unknown idle status: " + status);
686 }
687
688 public final boolean isBothIdle() {
689 return isIdle(IdleStatus.BOTH_IDLE);
690 }
691
692 public final boolean isReaderIdle() {
693 return isIdle(IdleStatus.READER_IDLE);
694 }
695
696 public final boolean isWriterIdle() {
697 return isIdle(IdleStatus.WRITER_IDLE);
698 }
699
700 public final int getIdleCount(IdleStatus status) {
701 if (getConfig().getIdleTime(status) == 0) {
702 if (status == IdleStatus.BOTH_IDLE) {
703 idleCountForBoth = 0;
704 }
705
706 if (status == IdleStatus.READER_IDLE) {
707 idleCountForRead = 0;
708 }
709
710 if (status == IdleStatus.WRITER_IDLE) {
711 idleCountForWrite = 0;
712 }
713 }
714
715 if (status == IdleStatus.BOTH_IDLE) {
716 return idleCountForBoth;
717 }
718
719 if (status == IdleStatus.READER_IDLE) {
720 return idleCountForRead;
721 }
722
723 if (status == IdleStatus.WRITER_IDLE) {
724 return idleCountForWrite;
725 }
726
727 throw new IllegalArgumentException("Unknown idle status: " + status);
728 }
729
730 public final long getLastIdleTime(IdleStatus status) {
731 if (status == IdleStatus.BOTH_IDLE) {
732 return lastIdleTimeForBoth;
733 }
734
735 if (status == IdleStatus.READER_IDLE) {
736 return lastIdleTimeForRead;
737 }
738
739 if (status == IdleStatus.WRITER_IDLE) {
740 return lastIdleTimeForWrite;
741 }
742
743 throw new IllegalArgumentException("Unknown idle status: " + status);
744 }
745
746 protected final void increaseIdleCount(IdleStatus status, long currentTime) {
747 if (status == IdleStatus.BOTH_IDLE) {
748 idleCountForBoth++;
749 lastIdleTimeForBoth = currentTime;
750 } else if (status == IdleStatus.READER_IDLE) {
751 idleCountForRead++;
752 lastIdleTimeForRead = currentTime;
753 } else if (status == IdleStatus.WRITER_IDLE) {
754 idleCountForWrite++;
755 lastIdleTimeForWrite = currentTime;
756 } else {
757 throw new IllegalArgumentException("Unknown idle status: " + status);
758 }
759 }
760
761 public final int getBothIdleCount() {
762 return getIdleCount(IdleStatus.BOTH_IDLE);
763 }
764
765 public final long getLastBothIdleTime() {
766 return getLastIdleTime(IdleStatus.BOTH_IDLE);
767 }
768
769 public final long getLastReaderIdleTime() {
770 return getLastIdleTime(IdleStatus.READER_IDLE);
771 }
772
773 public final long getLastWriterIdleTime() {
774 return getLastIdleTime(IdleStatus.WRITER_IDLE);
775 }
776
777 public final int getReaderIdleCount() {
778 return getIdleCount(IdleStatus.READER_IDLE);
779 }
780
781 public final int getWriterIdleCount() {
782 return getIdleCount(IdleStatus.WRITER_IDLE);
783 }
784
785 public SocketAddress getServiceAddress() {
786 IoService service = getService();
787 if (service instanceof IoAcceptor) {
788 return ((IoAcceptor) service).getLocalAddress();
789 } else {
790 return getRemoteAddress();
791 }
792 }
793
794 @Override
795 public final int hashCode() {
796 return super.hashCode();
797 }
798
799 @Override
800 public final boolean equals(Object o) {
801 return super.equals(o);
802 }
803
804 @Override
805 public String toString() {
806 if (getService() instanceof IoAcceptor) {
807 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
808 getRemoteAddress() + " => " + getLocalAddress() + ')';
809 } else {
810 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
811 getLocalAddress() + " => " + getRemoteAddress() + ')';
812 }
813 }
814
815 private String getIdAsString() {
816 String id = Long.toHexString(getId()).toUpperCase();
817
818
819
820 while (id.length() < 8) {
821 id = '0' + id;
822 }
823 id = "0x" + id;
824
825 return id;
826 }
827
828 private String getServiceName() {
829 TransportMetadata tm = getTransportMetadata();
830 if (tm == null) {
831 return "null";
832 } else {
833 return tm.getProviderName() + ' ' + tm.getName();
834 }
835 }
836
837 private class CloseRequestAwareWriteRequestQueue implements WriteRequestQueue {
838
839 private final WriteRequestQueue q;
840
841 public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {
842 this.q = q;
843 }
844
845 public synchronized WriteRequest poll(IoSession session) {
846 WriteRequest answer = q.poll(session);
847 if (answer == CLOSE_REQUEST) {
848 AbstractIoSession.this.close();
849 dispose(session);
850 answer = null;
851 }
852 return answer;
853 }
854
855 public void offer(IoSession session, WriteRequest e) {
856 q.offer(session, e);
857 }
858
859 public boolean isEmpty(IoSession session) {
860 return q.isEmpty(session);
861 }
862
863 public void clear(IoSession session) {
864 q.clear(session);
865 }
866
867 public void dispose(IoSession session) {
868 q.dispose(session);
869 }
870 }
871 }