View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.DefaultFileRegion;
37  import org.apache.mina.core.file.FilenameFileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.future.CloseFuture;
40  import org.apache.mina.core.future.DefaultCloseFuture;
41  import org.apache.mina.core.future.DefaultReadFuture;
42  import org.apache.mina.core.future.DefaultWriteFuture;
43  import org.apache.mina.core.future.IoFutureListener;
44  import org.apache.mina.core.future.ReadFuture;
45  import org.apache.mina.core.future.WriteFuture;
46  import org.apache.mina.core.service.AbstractIoService;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoHandler;
49  import org.apache.mina.core.service.IoProcessor;
50  import org.apache.mina.core.service.IoService;
51  import org.apache.mina.core.service.TransportMetadata;
52  import org.apache.mina.core.write.DefaultWriteRequest;
53  import org.apache.mina.core.write.WriteException;
54  import org.apache.mina.core.write.WriteRequest;
55  import org.apache.mina.core.write.WriteRequestQueue;
56  import org.apache.mina.core.write.WriteTimeoutException;
57  import org.apache.mina.core.write.WriteToClosedSessionException;
58  import org.apache.mina.util.ExceptionMonitor;
59  
60  /**
61   * Base implementation of {@link IoSession}.
62   *
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractIoSession implements IoSession {
66      /** The associated handler */
67      private final IoHandler handler;
68  
69      /** The session config */
70      protected IoSessionConfig config;
71  
72      /** The service which will manage this session */
73      private final IoService service;
74  
75      private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
76              "readyReadFutures");
77  
78      private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
79              "waitingReadFutures");
80  
81      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
82          public void operationComplete(CloseFuture future) {
83              AbstractIoSession session = (AbstractIoSession) future.getSession();
84              session.scheduledWriteBytes.set(0);
85              session.scheduledWriteMessages.set(0);
86              session.readBytesThroughput = 0;
87              session.readMessagesThroughput = 0;
88              session.writtenBytesThroughput = 0;
89              session.writtenMessagesThroughput = 0;
90          }
91      };
92  
93      /**
94       * An internal write request object that triggers session close.
95       *
96       * @see #writeRequestQueue
97       */
98      private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
99  
100     private final Object lock = new Object();
101 
102     private IoSessionAttributeMap attributes;
103 
104     private WriteRequestQueue writeRequestQueue;
105 
106     private WriteRequest currentWriteRequest;
107 
108     /** The Session creation's time */
109     private final long creationTime;
110 
111     /** An id generator guaranteed to generate unique IDs for the session */
112     private static AtomicLong idGenerator = new AtomicLong(0);
113 
114     /** The session ID */
115     private long sessionId;
116 
117     /**
118      * A future that will be set 'closed' when the connection is closed.
119      */
120     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
121 
122     private volatile boolean closing;
123 
124     // traffic control
125     private boolean readSuspended = false;
126 
127     private boolean writeSuspended = false;
128 
129     // Status variables
130     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
131 
132     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
133 
134     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
135 
136     private long readBytes;
137 
138     private long writtenBytes;
139 
140     private long readMessages;
141 
142     private long writtenMessages;
143 
144     private long lastReadTime;
145 
146     private long lastWriteTime;
147 
148     private long lastThroughputCalculationTime;
149 
150     private long lastReadBytes;
151 
152     private long lastWrittenBytes;
153 
154     private long lastReadMessages;
155 
156     private long lastWrittenMessages;
157 
158     private double readBytesThroughput;
159 
160     private double writtenBytesThroughput;
161 
162     private double readMessagesThroughput;
163 
164     private double writtenMessagesThroughput;
165 
166     private AtomicInteger idleCountForBoth = new AtomicInteger();
167 
168     private AtomicInteger idleCountForRead = new AtomicInteger();
169 
170     private AtomicInteger idleCountForWrite = new AtomicInteger();
171 
172     private long lastIdleTimeForBoth;
173 
174     private long lastIdleTimeForRead;
175 
176     private long lastIdleTimeForWrite;
177 
178     private boolean deferDecreaseReadBuffer = true;
179 
180     /**
181      * TODO Add method documentation
182      */
183     protected AbstractIoSession(IoService service) {
184         this.service = service;
185         this.handler = service.getHandler();
186 
187         // Initialize all the Session counters to the current time
188         long currentTime = System.currentTimeMillis();
189         creationTime = currentTime;
190         lastThroughputCalculationTime = currentTime;
191         lastReadTime = currentTime;
192         lastWriteTime = currentTime;
193         lastIdleTimeForBoth = currentTime;
194         lastIdleTimeForRead = currentTime;
195         lastIdleTimeForWrite = currentTime;
196 
197         // TODO add documentation
198         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
199 
200         // Set a new ID for this session
201         sessionId = idGenerator.incrementAndGet();
202     }
203 
204     /**
205      * {@inheritDoc}
206      *
207      * We use an AtomicLong to guarantee that the session ID are unique.
208      */
209     public final long getId() {
210         return sessionId;
211     }
212 
213     /**
214      * @return The associated IoProcessor for this session
215      */
216     public abstract IoProcessor getProcessor();
217 
218     /**
219      * {@inheritDoc}
220      */
221     public final boolean isConnected() {
222         return !closeFuture.isClosed();
223     }
224 
225     /**
226      * {@inheritDoc}
227      */
228     public final boolean isClosing() {
229         return closing || closeFuture.isClosed();
230     }
231 
232     /**
233      * {@inheritDoc}
234      */
235     public boolean isSecured() {
236         // Always false...
237         return false;
238     }
239 
240     /**
241      * {@inheritDoc}
242      */
243     public final CloseFuture getCloseFuture() {
244         return closeFuture;
245     }
246 
247     /**
248      * Tells if the session is scheduled for flushed
249      * 
250      * @return true if the session is scheduled for flush
251      */
252     public final boolean isScheduledForFlush() {
253         return scheduledForFlush.get();
254     }
255 
256     /**
257      * Schedule the session for flushed
258      */
259     public final void scheduledForFlush() {
260         scheduledForFlush.set(true);
261     }
262 
263     /**
264      * Change the session's status : it's not anymore scheduled for flush
265      */
266     public final void unscheduledForFlush() {
267         scheduledForFlush.set(false);
268     }
269 
270     /**
271      * Set the scheduledForFLush flag. As we may have concurrent access to this
272      * flag, we compare and set it in one call.
273      *
274      * @param schedule
275      *            the new value to set if not already set.
276      * @return true if the session flag has been set, and if it wasn't set
277      *         already.
278      */
279     public final boolean setScheduledForFlush(boolean schedule) {
280         if (schedule) {
281             // If the current tag is set to false, switch it to true,
282             // otherwise, we do nothing but return false : the session
283             // is already scheduled for flush
284             return scheduledForFlush.compareAndSet(false, schedule);
285         }
286 
287         scheduledForFlush.set(schedule);
288         return true;
289     }
290 
291     /**
292      * {@inheritDoc}
293      */
294     public final CloseFuture close(boolean rightNow) {
295         if (!isClosing()) {
296             if (rightNow) {
297                 return close();
298             }
299 
300             return closeOnFlush();
301         } else {
302             return closeFuture;
303         }
304     }
305 
306     /**
307      * {@inheritDoc}
308      */
309     public final CloseFuture close() {
310         synchronized (lock) {
311             if (isClosing()) {
312                 return closeFuture;
313             }
314 
315             closing = true;
316         }
317 
318         getFilterChain().fireFilterClose();
319         return closeFuture;
320     }
321 
322     private final CloseFuture closeOnFlush() {
323         getWriteRequestQueue().offer(this, CLOSE_REQUEST);
324         getProcessor().flush(this);
325         return closeFuture;
326     }
327 
328     /**
329      * {@inheritDoc}
330      */
331     public IoHandler getHandler() {
332         return handler;
333     }
334 
335     /**
336      * {@inheritDoc}
337      */
338     public IoSessionConfig getConfig() {
339         return config;
340     }
341 
342     /**
343      * {@inheritDoc}
344      */
345     public final ReadFuture read() {
346         if (!getConfig().isUseReadOperation()) {
347             throw new IllegalStateException("useReadOperation is not enabled.");
348         }
349 
350         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
351         ReadFuture future;
352         synchronized (readyReadFutures) {
353             future = readyReadFutures.poll();
354             if (future != null) {
355                 if (future.isClosed()) {
356                     // Let other readers get notified.
357                     readyReadFutures.offer(future);
358                 }
359             } else {
360                 future = new DefaultReadFuture(this);
361                 getWaitingReadFutures().offer(future);
362             }
363         }
364 
365         return future;
366     }
367 
368     /**
369      * TODO Add method documentation
370      */
371     public final void offerReadFuture(Object message) {
372         newReadFuture().setRead(message);
373     }
374 
375     /**
376      * TODO Add method documentation
377      */
378     public final void offerFailedReadFuture(Throwable exception) {
379         newReadFuture().setException(exception);
380     }
381 
382     /**
383      * TODO Add method documentation
384      */
385     public final void offerClosedReadFuture() {
386         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
387         synchronized (readyReadFutures) {
388             newReadFuture().setClosed();
389         }
390     }
391 
392     /**
393      * TODO Add method documentation
394      */
395     private ReadFuture newReadFuture() {
396         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
397         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
398         ReadFuture future;
399         synchronized (readyReadFutures) {
400             future = waitingReadFutures.poll();
401             if (future == null) {
402                 future = new DefaultReadFuture(this);
403                 readyReadFutures.offer(future);
404             }
405         }
406         return future;
407     }
408 
409     /**
410      * TODO Add method documentation
411      */
412     private Queue<ReadFuture> getReadyReadFutures() {
413         Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
414         if (readyReadFutures == null) {
415             readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
416 
417             Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
418                     readyReadFutures);
419             if (oldReadyReadFutures != null) {
420                 readyReadFutures = oldReadyReadFutures;
421             }
422         }
423         return readyReadFutures;
424     }
425 
426     /**
427      * TODO Add method documentation
428      */
429     private Queue<ReadFuture> getWaitingReadFutures() {
430         Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
431         if (waitingReadyReadFutures == null) {
432             waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
433 
434             Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
435                     WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
436             if (oldWaitingReadyReadFutures != null) {
437                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
438             }
439         }
440         return waitingReadyReadFutures;
441     }
442 
443     /**
444      * {@inheritDoc}
445      */
446     public WriteFuture write(Object message) {
447         return write(message, null);
448     }
449 
450     /**
451      * {@inheritDoc}
452      */
453     public WriteFuture write(Object message, SocketAddress remoteAddress) {
454         if (message == null) {
455             throw new IllegalArgumentException("Trying to write a null message : not allowed");
456         }
457 
458         // We can't send a message to a connected session if we don't have
459         // the remote address
460         if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
461             throw new UnsupportedOperationException();
462         }
463 
464         // If the session has been closed or is closing, we can't either
465         // send a message to the remote side. We generate a future
466         // containing an exception.
467         if (isClosing() || !isConnected()) {
468             WriteFuture future = new DefaultWriteFuture(this);
469             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
470             WriteException writeException = new WriteToClosedSessionException(request);
471             future.setException(writeException);
472             return future;
473         }
474 
475         FileChannel openedFileChannel = null;
476 
477         // TODO: remove this code as soon as we use InputStream
478         // instead of Object for the message.
479         try {
480             if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
481                 // Nothing to write : probably an error in the user code
482                 throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
483             } else if (message instanceof FileChannel) {
484                 FileChannel fileChannel = (FileChannel) message;
485                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
486             } else if (message instanceof File) {
487                 File file = (File) message;
488                 openedFileChannel = new FileInputStream(file).getChannel();
489                 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
490             }
491         } catch (IOException e) {
492             ExceptionMonitor.getInstance().exceptionCaught(e);
493             return DefaultWriteFuture.newNotWrittenFuture(this, e);
494         }
495 
496         // Now, we can write the message. First, create a future
497         WriteFuture writeFuture = new DefaultWriteFuture(this);
498         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
499 
500         // Then, get the chain and inject the WriteRequest into it
501         IoFilterChain filterChain = getFilterChain();
502         filterChain.fireFilterWrite(writeRequest);
503 
504         // TODO : This is not our business ! The caller has created a
505         // FileChannel,
506         // he has to close it !
507         if (openedFileChannel != null) {
508             // If we opened a FileChannel, it needs to be closed when the write
509             // has completed
510             final FileChannel finalChannel = openedFileChannel;
511             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
512                 public void operationComplete(WriteFuture future) {
513                     try {
514                         finalChannel.close();
515                     } catch (IOException e) {
516                         ExceptionMonitor.getInstance().exceptionCaught(e);
517                     }
518                 }
519             });
520         }
521 
522         // Return the WriteFuture.
523         return writeFuture;
524     }
525 
526     /**
527      * {@inheritDoc}
528      */
529     public final Object getAttachment() {
530         return getAttribute("");
531     }
532 
533     /**
534      * {@inheritDoc}
535      */
536     public final Object setAttachment(Object attachment) {
537         return setAttribute("", attachment);
538     }
539 
540     /**
541      * {@inheritDoc}
542      */
543     public final Object getAttribute(Object key) {
544         return getAttribute(key, null);
545     }
546 
547     /**
548      * {@inheritDoc}
549      */
550     public final Object getAttribute(Object key, Object defaultValue) {
551         return attributes.getAttribute(this, key, defaultValue);
552     }
553 
554     /**
555      * {@inheritDoc}
556      */
557     public final Object setAttribute(Object key, Object value) {
558         return attributes.setAttribute(this, key, value);
559     }
560 
561     /**
562      * {@inheritDoc}
563      */
564     public final Object setAttribute(Object key) {
565         return setAttribute(key, Boolean.TRUE);
566     }
567 
568     /**
569      * {@inheritDoc}
570      */
571     public final Object setAttributeIfAbsent(Object key, Object value) {
572         return attributes.setAttributeIfAbsent(this, key, value);
573     }
574 
575     /**
576      * {@inheritDoc}
577      */
578     public final Object setAttributeIfAbsent(Object key) {
579         return setAttributeIfAbsent(key, Boolean.TRUE);
580     }
581 
582     /**
583      * {@inheritDoc}
584      */
585     public final Object removeAttribute(Object key) {
586         return attributes.removeAttribute(this, key);
587     }
588 
589     /**
590      * {@inheritDoc}
591      */
592     public final boolean removeAttribute(Object key, Object value) {
593         return attributes.removeAttribute(this, key, value);
594     }
595 
596     /**
597      * {@inheritDoc}
598      */
599     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
600         return attributes.replaceAttribute(this, key, oldValue, newValue);
601     }
602 
603     /**
604      * {@inheritDoc}
605      */
606     public final boolean containsAttribute(Object key) {
607         return attributes.containsAttribute(this, key);
608     }
609 
610     /**
611      * {@inheritDoc}
612      */
613     public final Set<Object> getAttributeKeys() {
614         return attributes.getAttributeKeys(this);
615     }
616 
617     /**
618      * TODO Add method documentation
619      */
620     public final IoSessionAttributeMap getAttributeMap() {
621         return attributes;
622     }
623 
624     /**
625      * TODO Add method documentation
626      */
627     public final void setAttributeMap(IoSessionAttributeMap attributes) {
628         this.attributes = attributes;
629     }
630 
631     /**
632      * Create a new close aware write queue, based on the given write queue.
633      *
634      * @param writeRequestQueue
635      *            The write request queue
636      */
637     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
638         this.writeRequestQueue = new CloseAwareWriteQueue(writeRequestQueue);
639     }
640 
641     /**
642      * {@inheritDoc}
643      */
644     public final void suspendRead() {
645         readSuspended = true;
646         if (isClosing() || !isConnected()) {
647             return;
648         }
649         getProcessor().updateTrafficControl(this);
650     }
651 
652     /**
653      * {@inheritDoc}
654      */
655     public final void suspendWrite() {
656         writeSuspended = true;
657         if (isClosing() || !isConnected()) {
658             return;
659         }
660         getProcessor().updateTrafficControl(this);
661     }
662 
663     /**
664      * {@inheritDoc}
665      */
666     @SuppressWarnings("unchecked")
667     public final void resumeRead() {
668         readSuspended = false;
669         if (isClosing() || !isConnected()) {
670             return;
671         }
672         getProcessor().updateTrafficControl(this);
673     }
674 
675     /**
676      * {@inheritDoc}
677      */
678     @SuppressWarnings("unchecked")
679     public final void resumeWrite() {
680         writeSuspended = false;
681         if (isClosing() || !isConnected()) {
682             return;
683         }
684         getProcessor().updateTrafficControl(this);
685     }
686 
687     /**
688      * {@inheritDoc}
689      */
690     public boolean isReadSuspended() {
691         return readSuspended;
692     }
693 
694     /**
695      * {@inheritDoc}
696      */
697     public boolean isWriteSuspended() {
698         return writeSuspended;
699     }
700 
701     /**
702      * {@inheritDoc}
703      */
704     public final long getReadBytes() {
705         return readBytes;
706     }
707 
708     /**
709      * {@inheritDoc}
710      */
711     public final long getWrittenBytes() {
712         return writtenBytes;
713     }
714 
715     /**
716      * {@inheritDoc}
717      */
718     public final long getReadMessages() {
719         return readMessages;
720     }
721 
722     /**
723      * {@inheritDoc}
724      */
725     public final long getWrittenMessages() {
726         return writtenMessages;
727     }
728 
729     /**
730      * {@inheritDoc}
731      */
732     public final double getReadBytesThroughput() {
733         return readBytesThroughput;
734     }
735 
736     /**
737      * {@inheritDoc}
738      */
739     public final double getWrittenBytesThroughput() {
740         return writtenBytesThroughput;
741     }
742 
743     /**
744      * {@inheritDoc}
745      */
746     public final double getReadMessagesThroughput() {
747         return readMessagesThroughput;
748     }
749 
750     /**
751      * {@inheritDoc}
752      */
753     public final double getWrittenMessagesThroughput() {
754         return writtenMessagesThroughput;
755     }
756 
757     /**
758      * {@inheritDoc}
759      */
760     public final void updateThroughput(long currentTime, boolean force) {
761         int interval = (int) (currentTime - lastThroughputCalculationTime);
762 
763         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
764 
765         if (((minInterval == 0) || (interval < minInterval)) && !force) {
766             return;
767         }
768 
769         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
770         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
771         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
772         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
773 
774         lastReadBytes = readBytes;
775         lastWrittenBytes = writtenBytes;
776         lastReadMessages = readMessages;
777         lastWrittenMessages = writtenMessages;
778 
779         lastThroughputCalculationTime = currentTime;
780     }
781 
782     /**
783      * {@inheritDoc}
784      */
785     public final long getScheduledWriteBytes() {
786         return scheduledWriteBytes.get();
787     }
788 
789     /**
790      * {@inheritDoc}
791      */
792     public final int getScheduledWriteMessages() {
793         return scheduledWriteMessages.get();
794     }
795 
796     /**
797      * TODO Add method documentation
798      */
799     protected void setScheduledWriteBytes(int byteCount) {
800         scheduledWriteBytes.set(byteCount);
801     }
802 
803     /**
804      * TODO Add method documentation
805      */
806     protected void setScheduledWriteMessages(int messages) {
807         scheduledWriteMessages.set(messages);
808     }
809 
810     /**
811      * TODO Add method documentation
812      */
813     public final void increaseReadBytes(long increment, long currentTime) {
814         if (increment <= 0) {
815             return;
816         }
817 
818         readBytes += increment;
819         lastReadTime = currentTime;
820         idleCountForBoth.set(0);
821         idleCountForRead.set(0);
822 
823         if (getService() instanceof AbstractIoService) {
824             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
825         }
826     }
827 
828     /**
829      * TODO Add method documentation
830      */
831     public final void increaseReadMessages(long currentTime) {
832         readMessages++;
833         lastReadTime = currentTime;
834         idleCountForBoth.set(0);
835         idleCountForRead.set(0);
836 
837         if (getService() instanceof AbstractIoService) {
838             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
839         }
840     }
841 
842     /**
843      * TODO Add method documentation
844      */
845     public final void increaseWrittenBytes(int increment, long currentTime) {
846         if (increment <= 0) {
847             return;
848         }
849 
850         writtenBytes += increment;
851         lastWriteTime = currentTime;
852         idleCountForBoth.set(0);
853         idleCountForWrite.set(0);
854 
855         if (getService() instanceof AbstractIoService) {
856             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
857         }
858 
859         increaseScheduledWriteBytes(-increment);
860     }
861 
862     /**
863      * TODO Add method documentation
864      */
865     public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
866         Object message = request.getMessage();
867 
868         if (message instanceof IoBuffer) {
869             IoBuffer b = (IoBuffer) message;
870 
871             if (b.hasRemaining()) {
872                 return;
873             }
874         }
875 
876         writtenMessages++;
877         lastWriteTime = currentTime;
878 
879         if (getService() instanceof AbstractIoService) {
880             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
881         }
882 
883         decreaseScheduledWriteMessages();
884     }
885 
886     /**
887      * Increase the number of scheduled write bytes for the session
888      * 
889      * @param increment
890      *            The number of newly added bytes to write
891      */
892     public final void increaseScheduledWriteBytes(int increment) {
893         scheduledWriteBytes.addAndGet(increment);
894         if (getService() instanceof AbstractIoService) {
895             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
896         }
897     }
898 
899     /**
900      * TODO Add method documentation
901      */
902     public final void increaseScheduledWriteMessages() {
903         scheduledWriteMessages.incrementAndGet();
904         if (getService() instanceof AbstractIoService) {
905             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
906         }
907     }
908 
909     /**
910      * TODO Add method documentation
911      */
912     private void decreaseScheduledWriteMessages() {
913         scheduledWriteMessages.decrementAndGet();
914         if (getService() instanceof AbstractIoService) {
915             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
916         }
917     }
918 
919     /**
920      * TODO Add method documentation
921      */
922     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
923         Object message = request.getMessage();
924         if (message instanceof IoBuffer) {
925             IoBuffer b = (IoBuffer) message;
926             if (b.hasRemaining()) {
927                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
928             } else {
929                 decreaseScheduledWriteMessages();
930             }
931         } else {
932             decreaseScheduledWriteMessages();
933         }
934     }
935 
936     /**
937      * {@inheritDoc}
938      */
939     public final WriteRequestQueue getWriteRequestQueue() {
940         if (writeRequestQueue == null) {
941             throw new IllegalStateException();
942         }
943         return writeRequestQueue;
944     }
945 
946     /**
947      * {@inheritDoc}
948      */
949     public final WriteRequest getCurrentWriteRequest() {
950         return currentWriteRequest;
951     }
952 
953     /**
954      * {@inheritDoc}
955      */
956     public final Object getCurrentWriteMessage() {
957         WriteRequest req = getCurrentWriteRequest();
958         if (req == null) {
959             return null;
960         }
961         return req.getMessage();
962     }
963 
964     /**
965      * {@inheritDoc}
966      */
967     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
968         this.currentWriteRequest = currentWriteRequest;
969     }
970 
971     /**
972      * TODO Add method documentation
973      */
974     public final void increaseReadBufferSize() {
975         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
976         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
977             getConfig().setReadBufferSize(newReadBufferSize);
978         } else {
979             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
980         }
981 
982         deferDecreaseReadBuffer = true;
983     }
984 
985     /**
986      * TODO Add method documentation
987      */
988     public final void decreaseReadBufferSize() {
989         if (deferDecreaseReadBuffer) {
990             deferDecreaseReadBuffer = false;
991             return;
992         }
993 
994         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
995             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
996         }
997 
998         deferDecreaseReadBuffer = true;
999     }
1000 
1001     /**
1002      * {@inheritDoc}
1003      */
1004     public final long getCreationTime() {
1005         return creationTime;
1006     }
1007 
1008     /**
1009      * {@inheritDoc}
1010      */
1011     public final long getLastIoTime() {
1012         return Math.max(lastReadTime, lastWriteTime);
1013     }
1014 
1015     /**
1016      * {@inheritDoc}
1017      */
1018     public final long getLastReadTime() {
1019         return lastReadTime;
1020     }
1021 
1022     /**
1023      * {@inheritDoc}
1024      */
1025     public final long getLastWriteTime() {
1026         return lastWriteTime;
1027     }
1028 
1029     /**
1030      * {@inheritDoc}
1031      */
1032     public final boolean isIdle(IdleStatus status) {
1033         if (status == IdleStatus.BOTH_IDLE) {
1034             return idleCountForBoth.get() > 0;
1035         }
1036 
1037         if (status == IdleStatus.READER_IDLE) {
1038             return idleCountForRead.get() > 0;
1039         }
1040 
1041         if (status == IdleStatus.WRITER_IDLE) {
1042             return idleCountForWrite.get() > 0;
1043         }
1044 
1045         throw new IllegalArgumentException("Unknown idle status: " + status);
1046     }
1047 
1048     /**
1049      * {@inheritDoc}
1050      */
1051     public final boolean isBothIdle() {
1052         return isIdle(IdleStatus.BOTH_IDLE);
1053     }
1054 
1055     /**
1056      * {@inheritDoc}
1057      */
1058     public final boolean isReaderIdle() {
1059         return isIdle(IdleStatus.READER_IDLE);
1060     }
1061 
1062     /**
1063      * {@inheritDoc}
1064      */
1065     public final boolean isWriterIdle() {
1066         return isIdle(IdleStatus.WRITER_IDLE);
1067     }
1068 
1069     /**
1070      * {@inheritDoc}
1071      */
1072     public final int getIdleCount(IdleStatus status) {
1073         if (getConfig().getIdleTime(status) == 0) {
1074             if (status == IdleStatus.BOTH_IDLE) {
1075                 idleCountForBoth.set(0);
1076             }
1077 
1078             if (status == IdleStatus.READER_IDLE) {
1079                 idleCountForRead.set(0);
1080             }
1081 
1082             if (status == IdleStatus.WRITER_IDLE) {
1083                 idleCountForWrite.set(0);
1084             }
1085         }
1086 
1087         if (status == IdleStatus.BOTH_IDLE) {
1088             return idleCountForBoth.get();
1089         }
1090 
1091         if (status == IdleStatus.READER_IDLE) {
1092             return idleCountForRead.get();
1093         }
1094 
1095         if (status == IdleStatus.WRITER_IDLE) {
1096             return idleCountForWrite.get();
1097         }
1098 
1099         throw new IllegalArgumentException("Unknown idle status: " + status);
1100     }
1101 
1102     /**
1103      * {@inheritDoc}
1104      */
1105     public final long getLastIdleTime(IdleStatus status) {
1106         if (status == IdleStatus.BOTH_IDLE) {
1107             return lastIdleTimeForBoth;
1108         }
1109 
1110         if (status == IdleStatus.READER_IDLE) {
1111             return lastIdleTimeForRead;
1112         }
1113 
1114         if (status == IdleStatus.WRITER_IDLE) {
1115             return lastIdleTimeForWrite;
1116         }
1117 
1118         throw new IllegalArgumentException("Unknown idle status: " + status);
1119     }
1120 
1121     /**
1122      * TODO Add method documentation
1123      */
1124     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1125         if (status == IdleStatus.BOTH_IDLE) {
1126             idleCountForBoth.incrementAndGet();
1127             lastIdleTimeForBoth = currentTime;
1128         } else if (status == IdleStatus.READER_IDLE) {
1129             idleCountForRead.incrementAndGet();
1130             lastIdleTimeForRead = currentTime;
1131         } else if (status == IdleStatus.WRITER_IDLE) {
1132             idleCountForWrite.incrementAndGet();
1133             lastIdleTimeForWrite = currentTime;
1134         } else {
1135             throw new IllegalArgumentException("Unknown idle status: " + status);
1136         }
1137     }
1138 
1139     /**
1140      * {@inheritDoc}
1141      */
1142     public final int getBothIdleCount() {
1143         return getIdleCount(IdleStatus.BOTH_IDLE);
1144     }
1145 
1146     /**
1147      * {@inheritDoc}
1148      */
1149     public final long getLastBothIdleTime() {
1150         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1151     }
1152 
1153     /**
1154      * {@inheritDoc}
1155      */
1156     public final long getLastReaderIdleTime() {
1157         return getLastIdleTime(IdleStatus.READER_IDLE);
1158     }
1159 
1160     /**
1161      * {@inheritDoc}
1162      */
1163     public final long getLastWriterIdleTime() {
1164         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1165     }
1166 
1167     /**
1168      * {@inheritDoc}
1169      */
1170     public final int getReaderIdleCount() {
1171         return getIdleCount(IdleStatus.READER_IDLE);
1172     }
1173 
1174     /**
1175      * {@inheritDoc}
1176      */
1177     public final int getWriterIdleCount() {
1178         return getIdleCount(IdleStatus.WRITER_IDLE);
1179     }
1180 
1181     /**
1182      * {@inheritDoc}
1183      */
1184     public SocketAddress getServiceAddress() {
1185         IoService service = getService();
1186         if (service instanceof IoAcceptor) {
1187             return ((IoAcceptor) service).getLocalAddress();
1188         }
1189 
1190         return getRemoteAddress();
1191     }
1192 
1193     /**
1194      * {@inheritDoc}
1195      */
1196     @Override
1197     public final int hashCode() {
1198         return super.hashCode();
1199     }
1200 
1201     /**
1202      * {@inheritDoc} TODO This is a ridiculous implementation. Need to be
1203      * replaced.
1204      */
1205     @Override
1206     public final boolean equals(Object o) {
1207         return super.equals(o);
1208     }
1209 
1210     /**
1211      * {@inheritDoc}
1212      */
1213     @Override
1214     public String toString() {
1215         if (isConnected() || isClosing()) {
1216             String remote = null;
1217             String local = null;
1218 
1219             try {
1220                 remote = String.valueOf(getRemoteAddress());
1221             } catch (Exception e) {
1222                 remote = "Cannot get the remote address informations: " + e.getMessage();
1223             }
1224 
1225             try {
1226                 local = String.valueOf(getLocalAddress());
1227             } catch (Exception e) {
1228             }
1229 
1230             if (getService() instanceof IoAcceptor) {
1231                 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1232             }
1233 
1234             return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1235         }
1236 
1237         return "(" + getIdAsString() + ") Session disconnected ...";
1238     }
1239 
1240     /**
1241      * TODO Add method documentation
1242      */
1243     private String getIdAsString() {
1244         String id = Long.toHexString(getId()).toUpperCase();
1245 
1246         // Somewhat inefficient, but it won't happen that often
1247         // because an ID is often a big integer.
1248         while (id.length() < 8) {
1249             id = '0' + id; // padding
1250         }
1251         id = "0x" + id;
1252 
1253         return id;
1254     }
1255 
1256     /**
1257      * TODO Add method documentation
1258      */
1259     private String getServiceName() {
1260         TransportMetadata tm = getTransportMetadata();
1261         if (tm == null) {
1262             return "null";
1263         }
1264 
1265         return tm.getProviderName() + ' ' + tm.getName();
1266     }
1267 
1268     /**
1269      * {@inheritDoc}
1270      */
1271     public IoService getService() {
1272         return service;
1273     }
1274 
1275     /**
1276      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
1277      * in the specified collection.
1278      *
1279      * @param currentTime
1280      *            the current time (i.e. {@link System#currentTimeMillis()})
1281      */
1282     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1283         IoSession s = null;
1284         while (sessions.hasNext()) {
1285             s = sessions.next();
1286             notifyIdleSession(s, currentTime);
1287         }
1288     }
1289 
1290     /**
1291      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1292      * specified {@code session}.
1293      *
1294      * @param currentTime
1295      *            the current time (i.e. {@link System#currentTimeMillis()})
1296      */
1297     public static void notifyIdleSession(IoSession session, long currentTime) {
1298         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1299                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1300 
1301         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1302                 IdleStatus.READER_IDLE,
1303                 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1304 
1305         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1306                 IdleStatus.WRITER_IDLE,
1307                 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1308 
1309         notifyWriteTimeout(session, currentTime);
1310     }
1311 
1312     private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1313             long lastIoTime) {
1314         if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1315             session.getFilterChain().fireSessionIdle(status);
1316         }
1317     }
1318 
1319     private static void notifyWriteTimeout(IoSession session, long currentTime) {
1320 
1321         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1322         if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1323                 && !session.getWriteRequestQueue().isEmpty(session)) {
1324             WriteRequest request = session.getCurrentWriteRequest();
1325             if (request != null) {
1326                 session.setCurrentWriteRequest(null);
1327                 WriteTimeoutException cause = new WriteTimeoutException(request);
1328                 request.getFuture().setException(cause);
1329                 session.getFilterChain().fireExceptionCaught(cause);
1330                 // WriteException is an IOException, so we close the session.
1331                 session.close(true);
1332             }
1333         }
1334     }
1335 
1336     /**
1337      * A queue which handles the CLOSE request.
1338      *
1339      * TODO : Check that when closing a session, all the pending requests are
1340      * correctly sent.
1341      */
1342     private class CloseAwareWriteQueue implements WriteRequestQueue {
1343 
1344         private final WriteRequestQueue queue;
1345 
1346         /**
1347          * {@inheritDoc}
1348          */
1349         public CloseAwareWriteQueue(WriteRequestQueue queue) {
1350             this.queue = queue;
1351         }
1352 
1353         /**
1354          * {@inheritDoc}
1355          */
1356         public synchronized WriteRequest poll(IoSession session) {
1357             WriteRequest answer = queue.poll(session);
1358 
1359             if (answer == CLOSE_REQUEST) {
1360                 AbstractIoSession.this.close();
1361                 dispose(session);
1362                 answer = null;
1363             }
1364 
1365             return answer;
1366         }
1367 
1368         /**
1369          * {@inheritDoc}
1370          */
1371         public void offer(IoSession session, WriteRequest e) {
1372             queue.offer(session, e);
1373         }
1374 
1375         /**
1376          * {@inheritDoc}
1377          */
1378         public boolean isEmpty(IoSession session) {
1379             return queue.isEmpty(session);
1380         }
1381 
1382         /**
1383          * {@inheritDoc}
1384          */
1385         public void clear(IoSession session) {
1386             queue.clear(session);
1387         }
1388 
1389         /**
1390          * {@inheritDoc}
1391          */
1392         public void dispose(IoSession session) {
1393             queue.dispose(session);
1394         }
1395 
1396         /**
1397          * {@inheritDoc}
1398          */
1399         public int size() {
1400             return queue.size();
1401         }
1402     }
1403 }