View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.mina.core.buffer.IoBuffer;
34  import org.apache.mina.core.file.FileRegion;
35  import org.apache.mina.core.filterchain.IoFilterChain;
36  import org.apache.mina.core.future.DefaultIoFuture;
37  import org.apache.mina.core.service.AbstractIoService;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.session.AbstractIoSession;
40  import org.apache.mina.core.session.IoSession;
41  import org.apache.mina.core.session.IoSessionConfig;
42  import org.apache.mina.core.write.WriteRequest;
43  import org.apache.mina.core.write.WriteRequestQueue;
44  import org.apache.mina.core.write.WriteToClosedSessionException;
45  import org.apache.mina.util.ExceptionMonitor;
46  import org.apache.mina.util.NamePreservingRunnable;
47  
48  /**
49   * An abstract implementation of {@link IoProcessor} which helps
50   * transport developers to write an {@link IoProcessor} easily.
51   * This class is in charge of active polling a set of {@link IoSession}
52   * and trigger events when some I/O operation is possible.
53   *
54   * @author The Apache MINA Project (dev@mina.apache.org)
55   * @version $Rev: 751744 $, $Date: 2009-03-09 17:53:13 +0100 (Mon, 09 Mar 2009) $
56   */
57  public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
58      /**
59       * The maximum loop count for a write operation until
60       * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
61       * It is similar to what a spin lock is for in concurrency programming.
62       * It improves memory utilization and write throughput significantly.
63       */
64      private static final int WRITE_SPIN_COUNT = 256;
65      
66      /** A timeout used for the select, as we need to get out to deal with idle sessions */
67      private static final long SELECT_TIMEOUT = 1000L;
68  
69      /** A map containing the last Thread ID for each class */
70      private static final Map<Class<?>, AtomicInteger> threadIds = 
71          new HashMap<Class<?>, AtomicInteger>();
72  
73      private final Object lock = new Object();
74      private final String threadName;
75      private final Executor executor;
76  
77      /** A Session queue containing the newly created sessions */
78      private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
79      private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
80      private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
81      private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
82  
83      /** The processor thread : it handles the incoming messages */
84      private Processor processor;
85      
86      private long lastIdleCheckTime;
87  
88      private final Object disposalLock = new Object();
89      private volatile boolean disposing;
90      private volatile boolean disposed;
91      private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
92  
93      /**
94       * Create an {@link AbstractPollingIoProcessor} with the given {@link Executor}
95       * for handling I/Os events.
96       * 
97       * @param executor the {@link Executor} for handling I/O events
98       */
99      protected AbstractPollingIoProcessor(Executor executor) {
100         if (executor == null) {
101             throw new NullPointerException("executor");
102         }
103 
104         this.threadName = nextThreadName();
105         this.executor = executor;
106     }
107 
108     /**
109      * Compute the thread ID for this class instance. As we may have different
110      * classes, we store the last ID number into a Map associating the class
111      * name to the last assigned ID.
112      *   
113      * @return a name for the current thread, based on the class name and
114      * an incremental value, starting at 1. 
115      */
116     private String nextThreadName() {
117         Class<?> cls = getClass();
118         int newThreadId;
119         
120         // We synchronize this block to avoid a concurrent access to 
121         // the actomicInteger (it can be modified by another thread, while
122         // being seen as null by another thread)
123         synchronized( threadIds ) {
124             // Get the current ID associated to this class' name
125             AtomicInteger threadId = threadIds.get(cls);
126             
127             if (threadId == null) {
128                 // We never have seen this class before, just create a
129                 // new ID starting at 1 for it, and associate this ID
130                 // with the class name in the map.
131                 newThreadId = 1;
132                 threadIds.put(cls, new AtomicInteger(newThreadId));
133             } else {
134                 // Just increment the lat ID, and get it.
135                 newThreadId = threadId.incrementAndGet();
136             }
137         }
138         
139         // Now we can compute the name for this thread
140         return cls.getSimpleName() + '-' + newThreadId;
141     }
142 
143     /**
144      * {@inheritDoc}
145      */
146     public final boolean isDisposing() {
147         return disposing;
148     }
149 
150     /**
151      * {@inheritDoc}
152      */
153     public final boolean isDisposed() {
154         return disposed;
155     }
156     
157     /**
158      * {@inheritDoc}
159      */
160     public final void dispose() {
161         if (disposed) {
162             return;
163         }
164 
165         synchronized (disposalLock) {
166             if (!disposing) {
167                 disposing = true;
168                 startupProcessor();
169             }
170         }
171 
172         disposalFuture.awaitUninterruptibly();
173         disposed = true;
174     }
175 
176     /**
177      * Dispose the resources used by this {@link IoProcessor} for polling 
178      * the client connections
179      * @throws Exception if some low level IO error occurs
180      */
181     protected abstract void dispose0() throws Exception;
182 
183     /**
184      * poll those sessions for the given timeout
185      * @param timeout milliseconds before the call timeout if no event appear
186      * @return The number of session ready for read or for write
187      * @throws Exception if some low level IO error occurs
188      */
189     protected abstract int select(long timeout) throws Exception;
190     
191     /**
192      * poll those sessions forever
193      * @return The number of session ready for read or for write
194      * @throws Exception if some low level IO error occurs
195      */
196     protected abstract int select() throws Exception;
197     
198     /**
199      * Say if the list of {@link IoSession} polled by this {@link IoProcessor} 
200      * is empty
201      * @return true if at least a session is managed by this {@link IoProcessor}
202      */
203     protected abstract boolean isSelectorEmpty();
204     
205     /**
206      * Interrupt the {@link AbstractPollingIoProcessor#select(int) call.
207      */
208     protected abstract void wakeup();
209     
210     /**
211      * Get an {@link Iterator} for the list of {@link IoSession} polled by this
212      * {@link IoProcessor}   
213      * @return {@link Iterator} of {@link IoSession}
214      */
215     protected abstract Iterator<T> allSessions();
216     
217     /**
218      * Get an {@link Iterator} for the list of {@link IoSession} found selected 
219      * by the last call of {@link AbstractPollingIoProcessor#select(int)
220      * @return {@link Iterator} of {@link IoSession} read for I/Os operation
221      */
222     protected abstract Iterator<T> selectedSessions();
223     
224     /**
225      * Get the state of a session (preparing, open, closed)
226      * @param session the {@link IoSession} to inspect
227      * @return the state of the session
228      */
229     protected abstract SessionState state(T session);
230 
231     /**
232      * Is the session ready for writing
233      * @param session the session queried
234      * @return true is ready, false if not ready
235      */
236     protected abstract boolean isWritable(T session);
237 
238     /**
239      * Is the session ready for reading
240      * @param session the session queried
241      * @return true is ready, false if not ready
242      */
243     protected abstract boolean isReadable(T session);
244 
245     /**
246      * register a session for writing
247      * @param session the session registered
248      * @param interested true for registering, false for removing
249      */
250     protected abstract void setInterestedInWrite(T session, boolean interested)
251             throws Exception;
252 
253     /**
254      * register a session for reading
255      * @param session the session registered
256      * @param interested true for registering, false for removing
257      */
258     protected abstract void setInterestedInRead(T session, boolean interested)
259             throws Exception;
260 
261     /**
262      * is this session registered for reading
263      * @param session the session queried
264      * @return true is registered for reading
265      */
266     protected abstract boolean isInterestedInRead(T session);
267 
268     /**
269      * is this session registered for writing
270      * @param session the session queried
271      * @return true is registered for writing
272      */
273     protected abstract boolean isInterestedInWrite(T session);
274 
275     /**
276      * Initialize the polling of a session. Add it to the polling process. 
277      * @param session the {@link IoSession} to add to the polling
278      * @throws Exception any exception thrown by the underlying system calls
279      */
280     protected abstract void init(T session) throws Exception;
281     
282     /**
283      * Destroy the underlying client socket handle
284      * @param session the {@link IoSession}
285      * @throws Exception any exception thrown by the underlying system calls
286      */
287     protected abstract void destroy(T session) throws Exception;
288     
289     /**
290      * Reads a sequence of bytes from a {@link IoSession} into the given {@link IoBuffer}. 
291      * Is called when the session was found ready for reading.
292      * @param session the session to read
293      * @param buf the buffer to fill
294      * @return the number of bytes read
295      * @throws Exception any exception thrown by the underlying system calls
296      */
297     protected abstract int read(T session, IoBuffer buf) throws Exception;
298 
299     /**
300      * Write a sequence of bytes to a {@link IoSession}, means to be called when a session
301      * was found ready for writing.
302      * @param session the session to write
303      * @param buf the buffer to write
304      * @param length the number of bytes to write can be superior to the number of bytes remaining
305      * in the buffer
306      * @return the number of byte written
307      * @throws Exception any exception thrown by the underlying system calls
308      */
309     protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
310     
311     /**
312      * Write a part of a file to a {@link IoSession}, if the underlying API isn't supporting
313      * system calls like sendfile(), you can throw a {@link UnsupportedOperationException} so 
314      * the file will be send using usual {@link #write(AbstractIoSession, IoBuffer, int)} call. 
315      * @param session the session to write
316      * @param region the file region to write
317      * @param length the length of the portion to send
318      * @return the number of written bytes
319      * @throws Exception any exception thrown by the underlying system calls
320      */
321     protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
322 
323     /**
324      * {@inheritDoc}
325      */
326     public final void add(T session) {
327         if (isDisposing()) {
328             throw new IllegalStateException("Already disposed.");
329         }
330 
331         // Adds the session to the newSession queue and starts the worker
332         newSessions.add(session);
333         startupProcessor();
334     }
335 
336     /**
337      * {@inheritDoc}
338      */
339     public final void remove(T session) {
340         scheduleRemove(session);
341         startupProcessor();
342     }
343 
344     private void scheduleRemove(T session) {
345         removingSessions.add(session);
346     }
347 
348     /**
349      * {@inheritDoc}
350      */
351     public final void flush(T session) {
352         boolean needsWakeup = flushingSessions.isEmpty();
353         if (scheduleFlush(session) && needsWakeup) {
354             wakeup();
355         }
356     }
357 
358     private boolean scheduleFlush(T session) {
359         if (session.setScheduledForFlush(true)) {
360             flushingSessions.add(session);
361             return true;
362         }
363         return false;
364     }
365 
366     /**
367      * {@inheritDoc}
368      */
369     public final void updateTrafficMask(T session) {
370         scheduleTrafficControl(session);
371         wakeup();
372     }
373 
374     private void scheduleTrafficControl(T session) {
375         trafficControllingSessions.add(session);
376     }
377 
378     /**
379      * Starts the inner Processor, asking the executor to pick a thread in its
380      * pool. The Runnable will be renamed 
381      */
382     private void startupProcessor() {
383         synchronized (lock) {
384             if (processor == null) {
385                 processor = new Processor();
386                 executor.execute(new NamePreservingRunnable(processor, threadName));
387             }
388         }
389         
390         // Just stop the select() and start it again, so that the processor
391         // can be activated immediately. 
392         wakeup();
393     }
394 
395     /**
396      * Handle newly created sessions
397      * @return The number of new sessions
398      */
399     private int handleNewSessions() {
400         int addedSessions = 0;
401         
402         // Loop on the new sessions blocking queue, to count
403         // the number of sessions who has been created
404         for (;;) {
405             T session = newSessions.poll();
406 
407             if (session == null) {
408                 // We don't have new sessions
409                 break;
410             }
411 
412 
413             if (addNow(session)) {
414                 // A new session has been created 
415                 addedSessions ++;
416             }
417         }
418 
419         return addedSessions;
420     }
421 
422     private boolean addNow(T session) {
423 
424         boolean registered = false;
425         boolean notified = false;
426         try {
427             init(session);
428             registered = true;
429 
430             // Build the filter chain of this session.
431             session.getService().getFilterChainBuilder().buildFilterChain(
432                     session.getFilterChain());
433 
434             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
435             // in AbstractIoFilterChain.fireSessionOpened().
436             ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
437             notified = true;
438         } catch (Throwable e) {
439             if (notified) {
440                 // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
441                 // and call ConnectFuture.setException().
442                 scheduleRemove(session);
443                 IoFilterChain filterChain = session.getFilterChain(); 
444                 filterChain.fireExceptionCaught(e);
445                 wakeup();
446             } else {
447                 ExceptionMonitor.getInstance().exceptionCaught(e);
448                 try {
449                     destroy(session);
450                 } catch (Exception e1) {
451                     ExceptionMonitor.getInstance().exceptionCaught(e1);
452                 } finally {
453                     registered = false;
454                 }
455             }
456         }
457         return registered;
458     }
459 
460     private int remove() {
461         int removedSessions = 0;
462         for (; ;) {
463             T session = removingSessions.poll();
464 
465             if (session == null) {
466                 break;
467             }
468 
469             SessionState state = state(session);
470             switch (state) {
471             case OPEN:
472                 if (removeNow(session)) {
473                     removedSessions ++;
474                 }
475                 break;
476             case CLOSED:
477                 // Skip if channel is already closed
478                 break;
479             case PREPARING:
480                 // Retry later if session is not yet fully initialized.
481                 // (In case that Session.close() is called before addSession() is processed)
482                 scheduleRemove(session);
483                 return removedSessions;
484             default:
485                 throw new IllegalStateException(String.valueOf(state));
486             }
487         }
488 
489         return removedSessions;
490     }
491 
492     private boolean removeNow(T session) {
493         clearWriteRequestQueue(session);
494 
495         try {
496             destroy(session);
497             return true;
498         } catch (Exception e) {
499             IoFilterChain filterChain = session.getFilterChain(); 
500             filterChain.fireExceptionCaught(e);
501         } finally {
502             clearWriteRequestQueue(session);
503             ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
504         }
505         return false;
506     }
507 
508     private void clearWriteRequestQueue(T session) {
509         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
510         WriteRequest req;
511 
512         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
513 
514         if ((req = writeRequestQueue.poll(session)) != null) {
515             Object m = req.getMessage();
516             if (m instanceof IoBuffer) {
517                 IoBuffer buf = (IoBuffer) req.getMessage();
518 
519                 // The first unwritten empty buffer must be
520                 // forwarded to the filter chain.
521                 if (buf.hasRemaining()) {
522                     buf.reset();
523                     failedRequests.add(req);
524                 } else {
525                     IoFilterChain filterChain = session.getFilterChain(); 
526                     filterChain.fireMessageSent(req);
527                 }
528             } else {
529                 failedRequests.add(req);
530             }
531 
532             // Discard others.
533             while ((req = writeRequestQueue.poll(session)) != null) {
534                 failedRequests.add(req);
535             }
536         }
537 
538         // Create an exception and notify.
539         if (!failedRequests.isEmpty()) {
540             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
541             for (WriteRequest r: failedRequests) {
542                 session.decreaseScheduledBytesAndMessages(r);
543                 r.getFuture().setException(cause);
544             }
545             IoFilterChain filterChain = session.getFilterChain(); 
546             filterChain.fireExceptionCaught(cause);
547         }
548     }
549 
550     private void process() throws Exception {
551         for (Iterator<T> i = selectedSessions(); i.hasNext();) {
552         	T session = i.next();
553             process(session);
554             i.remove();
555         }
556     }
557 
558     /**
559      * Deal with session ready for the read or write operations, or both.
560      */
561     private void process(T session) {
562         // Process Reads
563         if (isReadable(session) && !session.isReadSuspended()) {
564             read(session);
565         }
566 
567         // Process writes
568         if (isWritable(session) && !session.isWriteSuspended()) {
569             scheduleFlush(session);
570         }
571     }
572 
573     private void read(T session) {
574         IoSessionConfig config = session.getConfig();
575         IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
576 
577         final boolean hasFragmentation =
578             session.getTransportMetadata().hasFragmentation();
579 
580         try {
581             int readBytes = 0;
582             int ret;
583 
584             try {
585                 if (hasFragmentation) {
586                     while ((ret = read(session, buf)) > 0) {
587                         readBytes += ret;
588                         if (!buf.hasRemaining()) {
589                             break;
590                         }
591                     }
592                 } else {
593                     ret = read(session, buf);
594                     if (ret > 0) {
595                         readBytes = ret;
596                     }
597                 }
598             } finally {
599                 buf.flip();
600             }
601 
602             if (readBytes > 0) {
603                 IoFilterChain filterChain = session.getFilterChain(); 
604                 filterChain.fireMessageReceived(buf);
605                 buf = null;
606 
607                 if (hasFragmentation) {
608                     if (readBytes << 1 < config.getReadBufferSize()) {
609                         session.decreaseReadBufferSize();
610                     } else if (readBytes == config.getReadBufferSize()) {
611                         session.increaseReadBufferSize();
612                     }
613                 }
614             }
615             if (ret < 0) {
616                 scheduleRemove(session);
617             }
618         } catch (Throwable e) {
619             if (e instanceof IOException) {
620                 scheduleRemove(session);
621             }
622             IoFilterChain filterChain = session.getFilterChain(); 
623             filterChain.fireExceptionCaught(e);
624         }
625     }
626 
627     private void notifyIdleSessions(long currentTime) throws Exception {
628         // process idle sessions
629         if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
630             lastIdleCheckTime = currentTime;
631             AbstractIoSession.notifyIdleness(allSessions(), currentTime);
632         }
633     }
634 
635     private void flush(long currentTime) {
636         final T firstSession = flushingSessions.peek();
637         if (firstSession == null) {
638             return;
639         }
640 
641         T session = flushingSessions.poll(); // the same one with firstSession
642         for (; ;) {
643             session.setScheduledForFlush(false);
644             SessionState state = state(session);
645             
646             switch (state) {
647             case OPEN:
648                 try {
649                     boolean flushedAll = flushNow(session, currentTime);
650                     if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
651                         !session.isScheduledForFlush()) {
652                         scheduleFlush(session);
653                     }
654                 } catch (Exception e) {
655                     scheduleRemove(session);
656                     IoFilterChain filterChain = session.getFilterChain(); 
657                     filterChain.fireExceptionCaught(e);
658                 }
659                 break;
660             case CLOSED:
661                 // Skip if the channel is already closed.
662                 break;
663             case PREPARING:
664                 // Retry later if session is not yet fully initialized.
665                 // (In case that Session.write() is called before addSession() is processed)
666                 scheduleFlush(session);
667                 return;
668             default:
669                 throw new IllegalStateException(String.valueOf(state));
670             }
671 
672             session = flushingSessions.peek();
673             if (session == null || session == firstSession) {
674                 break;
675             }
676             session = flushingSessions.poll();
677         }
678     }
679 
680     private boolean flushNow(T session, long currentTime) {
681         if (!session.isConnected()) {
682             scheduleRemove(session);
683             return false;
684         }
685 
686         final boolean hasFragmentation =
687             session.getTransportMetadata().hasFragmentation();
688 
689         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
690 
691         // Set limitation for the number of written bytes for read-write
692         // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
693         // performance in my experience while not breaking fairness much.
694         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
695                               (session.getConfig().getMaxReadBufferSize() >>> 1);
696         int writtenBytes = 0;
697         try {
698             // Clear OP_WRITE
699             setInterestedInWrite(session, false);
700             do {
701                 // Check for pending writes.
702                 WriteRequest req = session.getCurrentWriteRequest();
703                 if (req == null) {
704                     req = writeRequestQueue.poll(session);
705                     if (req == null) {
706                         break;
707                     }
708                     session.setCurrentWriteRequest(req);
709                 }
710 
711                 int localWrittenBytes = 0;
712                 Object message = req.getMessage();
713                 if (message instanceof IoBuffer) {
714                     localWrittenBytes = writeBuffer(
715                             session, req, hasFragmentation,
716                             maxWrittenBytes - writtenBytes,
717                             currentTime);
718                     if (localWrittenBytes > 0 && ((IoBuffer)message).hasRemaining() ) {
719                     	// the buffer isn't empty, we re-interest it in writing 
720                     	writtenBytes += localWrittenBytes;    	
721                     	setInterestedInWrite(session, true);
722                         return false;
723                     }
724                 } else if (message instanceof FileRegion) {
725                     localWrittenBytes = writeFile(
726                             session, req, hasFragmentation,
727                             maxWrittenBytes - writtenBytes,
728                             currentTime);
729 
730                     // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
731                     // If there's still data to be written in the FileRegion, return 0 indicating that we need
732                     // to pause until writing may resume.
733                     if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
734                         writtenBytes += localWrittenBytes;
735                         setInterestedInWrite(session, true);
736                         return false;
737                     }
738                 } else {
739                     throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
740                 }
741 
742                 if (localWrittenBytes == 0) {
743                     // Kernel buffer is full.
744                     setInterestedInWrite(session, true);
745                     return false;
746                 }
747 
748                 writtenBytes += localWrittenBytes;
749 
750                 if (writtenBytes >= maxWrittenBytes) {
751                     // Wrote too much
752                     scheduleFlush(session);
753                     return false;
754                 }
755             } while (writtenBytes < maxWrittenBytes);
756         } catch (Exception e) {
757             IoFilterChain filterChain = session.getFilterChain(); 
758             filterChain.fireExceptionCaught(e);
759             return false;
760         }
761 
762         return true;
763     }
764 
765     private int writeBuffer(T session, WriteRequest req,
766             boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
767         IoBuffer buf = (IoBuffer) req.getMessage();
768         int localWrittenBytes = 0;
769         if (buf.hasRemaining()) {
770             int length;
771             if (hasFragmentation) {
772                 length = Math.min(buf.remaining(), maxLength);
773             } else {
774                 length = buf.remaining();
775             }
776             for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
777                 localWrittenBytes = write(session, buf, length);
778                 if (localWrittenBytes != 0) {
779                     break;
780                 }
781             }
782         }
783 
784         session.increaseWrittenBytes(localWrittenBytes, currentTime);
785 
786         if (!buf.hasRemaining() ||
787                 !hasFragmentation && localWrittenBytes != 0) {
788             // Buffer has been sent, clear the current request.
789             buf.reset();
790             fireMessageSent(session, req);
791         }
792         return localWrittenBytes;
793     }
794 
795     private int writeFile(T session, WriteRequest req,
796             boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
797         int localWrittenBytes;
798         FileRegion region = (FileRegion) req.getMessage();
799         if (region.getRemainingBytes() > 0) {
800             int length;
801             if (hasFragmentation) {
802                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
803             } else {
804                 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
805             }
806             localWrittenBytes = transferFile(session, region, length);
807             region.update(localWrittenBytes);
808         } else {
809             localWrittenBytes = 0;
810         }
811 
812         session.increaseWrittenBytes(localWrittenBytes, currentTime);
813 
814         if (region.getRemainingBytes() <= 0 ||
815                     !hasFragmentation && localWrittenBytes != 0) {
816             fireMessageSent(session, req);
817         }
818 
819         return localWrittenBytes;
820     }
821 
822     private void fireMessageSent(T session, WriteRequest req) {
823         session.setCurrentWriteRequest(null);
824         IoFilterChain filterChain = session.getFilterChain(); 
825         filterChain.fireMessageSent(req);
826     }
827 
828     private void updateTrafficMask() {
829         for (; ;) {
830             T session = trafficControllingSessions.poll();
831 
832             if (session == null) {
833                 break;
834             }
835 
836             SessionState state = state(session);
837             switch (state) {
838             case OPEN:
839             	updateTrafficControl(session);
840                 break;
841             case CLOSED:
842                 break;
843             case PREPARING:
844                 // Retry later if session is not yet fully initialized.
845                 // (In case that Session.suspend??() or session.resume??() is
846                 // called before addSession() is processed)
847                 scheduleTrafficControl(session);
848                 return;
849             default:
850                 throw new IllegalStateException(String.valueOf(state));
851             }
852         }
853     }
854 
855     public void updateTrafficControl(T session) {
856     	try {
857             setInterestedInRead(session, !session.isReadSuspended());
858         } catch (Exception e) {
859             IoFilterChain filterChain = session.getFilterChain(); 
860             filterChain.fireExceptionCaught(e);
861         }
862         try {
863             setInterestedInWrite(
864                     session,
865                     !session.getWriteRequestQueue().isEmpty(session) &&
866                             !session.isWriteSuspended());
867         } catch (Exception e) {
868             IoFilterChain filterChain = session.getFilterChain(); 
869             filterChain.fireExceptionCaught(e);
870         }
871     }
872         
873     private class Processor implements Runnable {
874         public void run() {
875             int nSessions = 0;
876             lastIdleCheckTime = System.currentTimeMillis();
877 
878             for (;;) {
879                 try {
880                     // This select has a timeout so that we can manage
881                     // dile session when we get out of the select every
882                     // second. (note : this is a hack to avoid creating
883                     // a dedicated thread).
884                     int selected = select(SELECT_TIMEOUT);
885 
886                     nSessions += handleNewSessions();
887                     updateTrafficMask();
888 
889                     // Now, if we have had some incoming or outgoing events,
890                     // deal with them
891                     if (selected > 0) {
892                         process();
893                     }
894 
895                     long currentTime = System.currentTimeMillis();
896                     flush(currentTime);
897                     nSessions -= remove();
898                     notifyIdleSessions(currentTime);
899 
900                     if (nSessions == 0) {
901                         synchronized (lock) {
902                             if (newSessions.isEmpty() && isSelectorEmpty()) {
903                                 processor = null;
904                                 break;
905                             }
906                         }
907                     }
908 
909                     // Disconnect all sessions immediately if disposal has been
910                     // requested so that we exit this loop eventually.
911                     if (isDisposing()) {
912                         for (Iterator<T> i = allSessions(); i.hasNext(); ) {
913                             scheduleRemove(i.next());
914                         }
915                         wakeup();
916                     }
917                 } catch (Throwable t) {
918                     ExceptionMonitor.getInstance().exceptionCaught(t);
919 
920                     try {
921                         Thread.sleep(1000);
922                     } catch (InterruptedException e1) {
923                         ExceptionMonitor.getInstance().exceptionCaught(e1);
924                     }
925                 }
926             }
927 
928             try {
929                 synchronized (disposalLock) {
930                     if (isDisposing()) {
931                         dispose0();
932                     }
933                 }
934             } catch (Throwable t) {
935                 ExceptionMonitor.getInstance().exceptionCaught(t);
936             } finally {
937                 disposalFuture.setValue(true);
938             }
939         }
940     }
941 
942     protected static enum SessionState {
943         OPEN,
944         CLOSED,
945         PREPARING,
946     }
947 }