View Javadoc

1   /*
2    *   @(#) $Id: SocketIoProcessor.java 332218 2005-11-10 03:52:42Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.socket;
20  
21  import java.io.IOException;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.Selector;
24  import java.nio.channels.SocketChannel;
25  import java.util.Iterator;
26  import java.util.Set;
27  
28  import org.apache.mina.common.ByteBuffer;
29  import org.apache.mina.common.IdleStatus;
30  import org.apache.mina.common.SessionConfig;
31  import org.apache.mina.io.WriteTimeoutException;
32  import org.apache.mina.util.Queue;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /***
37   * Performs all I/O operations for sockets which is connected or bound.
38   * This class is used by MINA internally.
39   * 
40   * @author The Apache Directory Project (dev@directory.apache.org)
41   * @version $Rev: 332218 $, $Date: 2005-11-10 12:52:42 +0900 $,
42   */
43  class SocketIoProcessor
44  {
45      private static final Logger log = LoggerFactory.getLogger( SocketIoProcessor.class );
46      private static final SocketIoProcessor instance;
47  
48      static
49      {
50          SocketIoProcessor tmp;
51  
52          try
53          {
54              tmp = new SocketIoProcessor();
55          }
56          catch( IOException e )
57          {
58              InternalError error = new InternalError(
59                                                       "Failed to open selector." );
60              error.initCause( e );
61              throw error;
62          }
63  
64          instance = tmp;
65      }
66  
67      private final Selector selector;
68  
69      private final Queue newSessions = new Queue();
70  
71      private final Queue removingSessions = new Queue();
72  
73      private final Queue flushingSessions = new Queue();
74  
75      private final Queue readableSessions = new Queue();
76  
77      private Worker worker;
78  
79      private long lastIdleCheckTime = System.currentTimeMillis();
80  
81      private SocketIoProcessor() throws IOException
82      {
83          selector = Selector.open();
84      }
85  
86      static SocketIoProcessor getInstance()
87      {
88          return instance;
89      }
90  
91      void addSession( SocketSession session )
92      {
93          synchronized( this )
94          {
95              synchronized( newSessions )
96              {
97                  newSessions.push( session );
98              }
99              startupWorker();
100         }
101 
102         selector.wakeup();
103     }
104 
105     void removeSession( SocketSession session )
106     {
107         scheduleRemove( session );
108         startupWorker();
109         selector.wakeup();
110     }
111 
112     private synchronized void startupWorker()
113     {
114         if( worker == null )
115         {
116             worker = new Worker();
117             worker.start();
118         }
119     }
120 
121     void flushSession( SocketSession session )
122     {
123         scheduleFlush( session );
124         selector.wakeup();
125     }
126 
127     void addReadableSession( SocketSession session )
128     {
129         synchronized( readableSessions )
130         {
131             readableSessions.push( session );
132         }
133         selector.wakeup();
134     }
135 
136     private void addSessions()
137     {
138         if( newSessions.isEmpty() )
139             return;
140 
141         SocketSession session;
142 
143         for( ;; )
144         {
145             synchronized( newSessions )
146             {
147                 session = ( SocketSession ) newSessions.pop();
148             }
149 
150             if( session == null )
151                 break;
152 
153             SocketChannel ch = session.getChannel();
154             boolean registered;
155 
156             try
157             {
158                 ch.configureBlocking( false );
159                 session.setSelectionKey( ch.register( selector,
160                                                       SelectionKey.OP_READ,
161                                                       session ) );
162                 registered = true;
163             }
164             catch( IOException e )
165             {
166                 registered = false;
167                 session.getManagerFilterChain().exceptionCaught( session, e );
168             }
169 
170             if( registered )
171             {
172                 session.getManagerFilterChain().sessionOpened( session );
173             }
174         }
175     }
176 
177     private void removeSessions()
178     {
179         if( removingSessions.isEmpty() )
180             return;
181 
182         for( ;; )
183         {
184             SocketSession session;
185 
186             synchronized( removingSessions )
187             {
188                 session = ( SocketSession ) removingSessions.pop();
189             }
190 
191             if( session == null )
192                 break;
193 
194             SocketChannel ch = session.getChannel();
195             SelectionKey key = session.getSelectionKey();
196             // Retry later if session is not yet fully initialized.
197             // (In case that Session.close() is called before addSession() is processed)
198             if( key == null )
199             {
200                 scheduleRemove( session );
201                 break;
202             }
203 
204             // skip if channel is already closed
205             if( !key.isValid() )
206             {
207                 continue;
208             }
209 
210             try
211             {
212                 key.cancel();
213                 ch.close();
214             }
215             catch( IOException e )
216             {
217                 session.getManagerFilterChain().exceptionCaught( session, e );
218             }
219             finally
220             {
221                 releaseWriteBuffers( session );
222 
223                 session.getManagerFilterChain().sessionClosed( session );
224                 session.notifyClose();
225             }
226         }
227     }
228 
229     private void processSessions( Set selectedKeys )
230     {
231         Iterator it = selectedKeys.iterator();
232 
233         while( it.hasNext() )
234         {
235             SelectionKey key = ( SelectionKey ) it.next();
236             SocketSession session = ( SocketSession ) key.attachment();
237 
238             if( key.isReadable() )
239             {
240                 read( session );
241             }
242 
243             if( key.isWritable() )
244             {
245                 scheduleFlush( session );
246             }
247         }
248 
249         selectedKeys.clear();
250     }
251 
252     private void read( SocketSession session )
253     {
254         ByteBuffer buf = ByteBuffer.allocate(
255                 (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); 
256         SocketChannel ch = session.getChannel();
257 
258         try
259         {
260             int readBytes = 0;
261             int ret;
262 
263             buf.clear();
264 
265             try
266             {
267                 while( ( ret = ch.read( buf.buf() ) ) > 0 )
268                 {
269                     readBytes += ret;
270                 }
271             }
272             finally
273             {
274                 buf.flip();
275             }
276 
277             session.increaseReadBytes( readBytes );
278             session.resetIdleCount( IdleStatus.BOTH_IDLE );
279             session.resetIdleCount( IdleStatus.READER_IDLE );
280 
281             if( readBytes > 0 )
282             {
283                 ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
284                 newBuf.put( buf );
285                 newBuf.flip();
286                 session.getManagerFilterChain().dataRead( session, newBuf );
287             }
288             if( ret < 0 )
289             {
290                 scheduleRemove( session );
291             }
292         }
293         catch( Throwable e )
294         {
295             if( e instanceof IOException )
296                 scheduleRemove( session );
297             session.getManagerFilterChain().exceptionCaught( session, e );
298         }
299         finally
300         {
301             buf.release();
302         }
303     }
304 
305     private void scheduleRemove( SocketSession session )
306     {
307         synchronized( removingSessions )
308         {
309             removingSessions.push( session );
310         }
311     }
312 
313     private void scheduleFlush( SocketSession session )
314     {
315         synchronized( flushingSessions )
316         {
317             flushingSessions.push( session );
318         }
319     }
320 
321     private void notifyIdleSessions()
322     {
323         // process idle sessions
324         long currentTime = System.currentTimeMillis();
325         if( ( currentTime - lastIdleCheckTime ) >= 1000 )
326         {
327             lastIdleCheckTime = currentTime;
328             Set keys = selector.keys();
329             if( keys != null )
330             {
331                 for( Iterator it = keys.iterator(); it.hasNext(); )
332                 {
333                     SelectionKey key = ( SelectionKey ) it.next();
334                     SocketSession session = ( SocketSession ) key.attachment();
335                     notifyIdleSession( session, currentTime );
336                 }
337             }
338         }
339     }
340 
341     private void notifyIdleSession( SocketSession session, long currentTime )
342     {
343         SessionConfig config = session.getConfig();
344 
345         notifyIdleSession0(
346                 session, currentTime,
347                 config.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
348                 IdleStatus.BOTH_IDLE,
349                 Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
350         notifyIdleSession0(
351                 session, currentTime,
352                 config.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
353                 IdleStatus.READER_IDLE,
354                 Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
355         notifyIdleSession0(
356                 session, currentTime,
357                 config.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
358                 IdleStatus.WRITER_IDLE,
359                 Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
360 
361         notifyWriteTimeoutSession( session, currentTime, config
362                 .getWriteTimeoutInMillis(), session.getLastWriteTime() );
363     }
364 
365     private void notifyIdleSession0( SocketSession session, long currentTime,
366                                     long idleTime, IdleStatus status,
367                                     long lastIoTime )
368     {
369         if( idleTime > 0 && lastIoTime != 0
370             && ( currentTime - lastIoTime ) >= idleTime )
371         {
372             session.increaseIdleCount( status );
373             session.getManagerFilterChain().sessionIdle( session, status );
374         }
375     }
376 
377     private void notifyWriteTimeoutSession( SocketSession session,
378                                            long currentTime,
379                                            long writeTimeout, long lastIoTime )
380     {
381         SelectionKey key = session.getSelectionKey();
382         if( writeTimeout > 0
383             && ( currentTime - lastIoTime ) >= writeTimeout
384             && key != null && key.isValid()
385             && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
386         {
387             session
388                     .getManagerFilterChain()
389                     .exceptionCaught( session, new WriteTimeoutException() );
390         }
391     }
392 
393     private void flushSessions()
394     {
395         if( flushingSessions.size() == 0 )
396             return;
397 
398         for( ;; )
399         {
400             SocketSession session;
401 
402             synchronized( flushingSessions )
403             {
404                 session = ( SocketSession ) flushingSessions.pop();
405             }
406 
407             if( session == null )
408                 break;
409 
410             if( !session.isConnected() )
411             {
412                 releaseWriteBuffers( session );
413                 continue;
414             }
415 
416             // If encountered write request before session is initialized, 
417             // (In case that Session.write() is called before addSession() is processed)
418             SelectionKey key = session.getSelectionKey();
419             if( key == null )
420             {
421                 // Reschedule for later write
422                 scheduleFlush( session );
423                 break;
424             }
425             if( !key.isValid() )
426             {
427                 continue;
428             }
429             
430             try
431             {
432                 flush( session );
433             }
434             catch( IOException e )
435             {
436                 scheduleRemove( session );
437                 session.getManagerFilterChain().exceptionCaught( session, e );
438             }
439         }
440     }
441     
442     private void releaseWriteBuffers( SocketSession session )
443     {
444         Queue writeBufferQueue = session.getWriteBufferQueue();
445         session.getWriteMarkerQueue().clear();
446         ByteBuffer buf;
447         
448         while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
449         {
450             try
451             {
452                 buf.release();
453             }
454             catch( IllegalStateException e )
455             {
456                 session.getManagerFilterChain().exceptionCaught( session, e );
457             }
458         }
459     }
460 
461     private void flush( SocketSession session ) throws IOException
462     {
463         // Clear OP_WRITE
464         SelectionKey key = session.getSelectionKey();
465         key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
466 
467         SocketChannel ch = session.getChannel();
468 
469         Queue writeBufferQueue = session.getWriteBufferQueue();
470         Queue writeMarkerQueue = session.getWriteMarkerQueue();
471 
472         ByteBuffer buf;
473         Object marker;
474         for( ;; )
475         {
476             synchronized( writeBufferQueue )
477             {
478                 buf = ( ByteBuffer ) writeBufferQueue.first();
479                 marker = writeMarkerQueue.first();
480             }
481 
482             if( buf == null )
483                 break;
484 
485             if( buf.remaining() == 0 )
486             {
487                 synchronized( writeBufferQueue )
488                 {
489                     writeBufferQueue.pop();
490                     writeMarkerQueue.pop();
491                 }
492                 try
493                 {
494                     buf.release();
495                 }
496                 catch( IllegalStateException e )
497                 {
498                     session.getManagerFilterChain().exceptionCaught( session, e );
499                 }
500 
501                 session.increaseWrittenWriteRequests();
502                 session.getManagerFilterChain().dataWritten( session, marker );
503                 continue;
504             }
505 
506             int writtenBytes = ch.write( buf.buf() );
507             if( writtenBytes > 0 )
508             {
509                 session.increaseWrittenBytes( writtenBytes );
510                 session.resetIdleCount( IdleStatus.BOTH_IDLE );
511                 session.resetIdleCount( IdleStatus.WRITER_IDLE );
512             }
513 
514             if( buf.hasRemaining() )
515             {
516                 // Kernel buffer is full
517                 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
518                 break;
519             }
520         }
521     }
522 
523     private class Worker extends Thread
524     {
525         public Worker()
526         {
527             super( "SocketIoProcessor" );
528         }
529 
530         public void run()
531         {
532             for( ;; )
533             {
534                 try
535                 {
536                     int nKeys = selector.select( 1000 );
537                     addSessions();
538 
539                     if( nKeys > 0 )
540                     {
541                         processSessions( selector.selectedKeys() );
542                     }
543 
544                     flushSessions();
545                     removeSessions();
546                     notifyIdleSessions();
547 
548                     if( selector.keys().isEmpty() )
549                     {
550                         synchronized( SocketIoProcessor.this )
551                         {
552                             if( selector.keys().isEmpty() &&
553                                 newSessions.isEmpty() )
554                             {
555                                 worker = null;
556                                 break;
557                             }
558                         }
559                     }
560                 }
561                 catch( Throwable t )
562                 {
563                     log.warn( "Unexpected exception.", t );
564 
565                     try
566                     {
567                         Thread.sleep( 1000 );
568                     }
569                     catch( InterruptedException e1 )
570                     {
571                     }
572                 }
573             }
574         }
575     }
576 }