View Javadoc

1   /*
2    *   @(#) $Id: SocketIoProcessor.java 264677 2005-08-30 02:44:35Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.socket;
20  
21  import java.io.IOException;
22  import java.nio.channels.CancelledKeyException;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.Selector;
25  import java.nio.channels.SocketChannel;
26  import java.util.Iterator;
27  import java.util.Set;
28  
29  import org.apache.mina.common.ByteBuffer;
30  import org.apache.mina.common.IdleStatus;
31  import org.apache.mina.common.SessionConfig;
32  import org.apache.mina.io.WriteTimeoutException;
33  import org.apache.mina.util.Queue;
34  
35  /***
36   * Performs all I/O operations for sockets which is connected or bound.
37   * This class is used by MINA internally.
38   * 
39   * @author Trustin Lee (trustin@apache.org)
40   * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $,
41   */
42  class SocketIoProcessor
43  {
44      private static final SocketIoProcessor instance;
45  
46      static
47      {
48          SocketIoProcessor tmp;
49  
50          try
51          {
52              tmp = new SocketIoProcessor();
53          }
54          catch( IOException e )
55          {
56              InternalError error = new InternalError(
57                                                       "Failed to open selector." );
58              error.initCause( e );
59              throw error;
60          }
61  
62          instance = tmp;
63      }
64  
65      private final Selector selector;
66  
67      private final Queue newSessions = new Queue();
68  
69      private final Queue removingSessions = new Queue();
70  
71      private final Queue flushingSessions = new Queue();
72  
73      private final Queue readableSessions = new Queue();
74  
75      private Worker worker;
76  
77      private long lastIdleCheckTime = System.currentTimeMillis();
78  
79      private SocketIoProcessor() throws IOException
80      {
81          selector = Selector.open();
82      }
83  
84      static SocketIoProcessor getInstance()
85      {
86          return instance;
87      }
88  
89      void addSession( SocketSession session )
90      {
91          synchronized( this )
92          {
93              synchronized( newSessions )
94              {
95                  newSessions.push( session );
96              }
97              startupWorker();
98          }
99  
100         selector.wakeup();
101     }
102 
103     void removeSession( SocketSession session )
104     {
105         scheduleRemove( session );
106         startupWorker();
107         selector.wakeup();
108     }
109 
110     private synchronized void startupWorker()
111     {
112         if( worker == null )
113         {
114             worker = new Worker();
115             worker.start();
116         }
117     }
118 
119     void flushSession( SocketSession session )
120     {
121         scheduleFlush( session );
122         selector.wakeup();
123     }
124 
125     void addReadableSession( SocketSession session )
126     {
127         synchronized( readableSessions )
128         {
129             readableSessions.push( session );
130         }
131         selector.wakeup();
132     }
133 
134     private void addSessions()
135     {
136         if( newSessions.isEmpty() )
137             return;
138 
139         SocketSession session;
140 
141         for( ;; )
142         {
143             synchronized( newSessions )
144             {
145                 session = ( SocketSession ) newSessions.pop();
146             }
147 
148             if( session == null )
149                 break;
150 
151             SocketChannel ch = session.getChannel();
152             boolean registered;
153 
154             try
155             {
156                 ch.configureBlocking( false );
157                 session.setSelectionKey( ch.register( selector,
158                                                       SelectionKey.OP_READ,
159                                                       session ) );
160                 registered = true;
161             }
162             catch( IOException e )
163             {
164                 registered = false;
165                 session.getManagerFilterChain().exceptionCaught( session, e );
166             }
167 
168             if( registered )
169             {
170                 session.getManagerFilterChain().sessionOpened( session );
171             }
172         }
173     }
174 
175     private void removeSessions()
176     {
177         if( removingSessions.isEmpty() )
178             return;
179 
180         for( ;; )
181         {
182             SocketSession session;
183 
184             synchronized( removingSessions )
185             {
186                 session = ( SocketSession ) removingSessions.pop();
187             }
188 
189             if( session == null )
190                 break;
191 
192             SocketChannel ch = session.getChannel();
193             SelectionKey key = session.getSelectionKey();
194             // Retry later if session is not yet fully initialized.
195             // (In case that Session.close() is called before addSession() is processed)
196             if( key == null )
197             {
198                 scheduleRemove( session );
199                 break;
200             }
201 
202             // skip if channel is already closed
203             if( !key.isValid() )
204             {
205                 continue;
206             }
207 
208             try
209             {
210                 key.cancel();
211                 ch.close();
212             }
213             catch( IOException e )
214             {
215                 session.getManagerFilterChain().exceptionCaught( session, e );
216             }
217             finally
218             {
219                 releaseWriteBuffers( session );
220 
221                 session.getManagerFilterChain().sessionClosed( session );
222                 session.notifyClose();
223             }
224         }
225     }
226 
227     private void processSessions( Set selectedKeys )
228     {
229         Iterator it = selectedKeys.iterator();
230 
231         while( it.hasNext() )
232         {
233             SelectionKey key = ( SelectionKey ) it.next();
234             SocketSession session = ( SocketSession ) key.attachment();
235 
236             if( key.isReadable() )
237             {
238                 read( session );
239             }
240 
241             if( key.isWritable() )
242             {
243                 scheduleFlush( session );
244             }
245         }
246 
247         selectedKeys.clear();
248     }
249 
250     private void read( SocketSession session )
251     {
252         ByteBuffer buf = ByteBuffer.allocate(
253                 (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); 
254         SocketChannel ch = session.getChannel();
255 
256         try
257         {
258             int readBytes = 0;
259             int ret;
260 
261             buf.clear();
262 
263             try
264             {
265                 while( ( ret = ch.read( buf.buf() ) ) > 0 )
266                 {
267                     readBytes += ret;
268                 }
269             }
270             finally
271             {
272                 buf.flip();
273             }
274 
275             session.increaseReadBytes( readBytes );
276             session.setIdle( IdleStatus.BOTH_IDLE, false );
277             session.setIdle( IdleStatus.READER_IDLE, false );
278 
279             if( readBytes > 0 )
280             {
281                 ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
282                 newBuf.put( buf );
283                 newBuf.flip();
284                 session.getManagerFilterChain().dataRead( session, newBuf );
285             }
286             if( ret < 0 )
287             {
288                 scheduleRemove( session );
289             }
290         }
291         catch( Throwable e )
292         {
293             if( e instanceof IOException )
294                 scheduleRemove( session );
295             session.getManagerFilterChain().exceptionCaught( session, e );
296         }
297         finally
298         {
299             buf.release();
300         }
301     }
302 
303     private void scheduleRemove( SocketSession session )
304     {
305         synchronized( removingSessions )
306         {
307             removingSessions.push( session );
308         }
309     }
310 
311     private void scheduleFlush( SocketSession session )
312     {
313         synchronized( flushingSessions )
314         {
315             flushingSessions.push( session );
316         }
317     }
318 
319     private void notifyIdleSessions()
320     {
321         Set keys = selector.keys();
322         Iterator it;
323         SocketSession session;
324 
325         // process idle sessions
326         long currentTime = System.currentTimeMillis();
327 
328         if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
329         {
330             lastIdleCheckTime = currentTime;
331             it = keys.iterator();
332 
333             while( it.hasNext() )
334             {
335                 SelectionKey key = ( SelectionKey ) it.next();
336                 session = ( SocketSession ) key.attachment();
337 
338                 notifyIdleSession( session, currentTime );
339             }
340         }
341     }
342 
343     private void notifyIdleSession( SocketSession session, long currentTime )
344     {
345         SessionConfig config = session.getConfig();
346 
347         notifyIdleSession0( session, currentTime, config
348                 .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
349                             IdleStatus.BOTH_IDLE, session.getLastIoTime() );
350         notifyIdleSession0( session, currentTime, config
351                 .getIdleTimeInMillis( IdleStatus.READER_IDLE ),
352                             IdleStatus.READER_IDLE, session.getLastReadTime() );
353         notifyIdleSession0( session, currentTime, config
354                 .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
355                             IdleStatus.WRITER_IDLE, session.getLastWriteTime() );
356 
357         notifyWriteTimeoutSession( session, currentTime, config
358                 .getWriteTimeoutInMillis(), session.getLastWriteTime() );
359     }
360 
361     private void notifyIdleSession0( SocketSession session, long currentTime,
362                                     long idleTime, IdleStatus status,
363                                     long lastIoTime )
364     {
365         if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0
366             && ( currentTime - lastIoTime ) >= idleTime )
367         {
368             session.setIdle( status, true );
369             session.getManagerFilterChain().sessionIdle( session, status );
370         }
371     }
372 
373     private void notifyWriteTimeoutSession( SocketSession session,
374                                            long currentTime,
375                                            long writeTimeout, long lastIoTime )
376     {
377         if( writeTimeout > 0
378             && ( currentTime - lastIoTime ) >= writeTimeout
379             && session.getSelectionKey() != null
380             && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 )
381         {
382             session
383                     .getManagerFilterChain()
384                     .exceptionCaught( session, new WriteTimeoutException() );
385         }
386     }
387 
388     private void flushSessions()
389     {
390         if( flushingSessions.size() == 0 )
391             return;
392 
393         for( ;; )
394         {
395             SocketSession session;
396 
397             synchronized( flushingSessions )
398             {
399                 session = ( SocketSession ) flushingSessions.pop();
400             }
401 
402             if( session == null )
403                 break;
404 
405             if( !session.isConnected() )
406             {
407                 releaseWriteBuffers( session );
408                 continue;
409             }
410 
411             // If encountered write request before session is initialized, 
412             // (In case that Session.write() is called before addSession() is processed)
413             if( session.getSelectionKey() == null )
414             {
415                 // Reschedule for later write
416                 scheduleFlush( session );
417                 break;
418             }
419             else
420             {
421                 try
422                 {
423                     flush( session );
424                 }
425                 catch( CancelledKeyException e )
426                 {
427                     // Connection is closed unexpectedly.
428                     scheduleRemove( session );
429                 }
430                 catch( IOException e )
431                 {
432                     scheduleRemove( session );
433                     session.getManagerFilterChain().exceptionCaught( session, e );
434                 }
435             }
436         }
437     }
438     
439     private void releaseWriteBuffers( SocketSession session )
440     {
441         Queue writeBufferQueue = session.getWriteBufferQueue();
442         session.getWriteMarkerQueue().clear();
443         ByteBuffer buf;
444         
445         while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
446         {
447             try
448             {
449                 buf.release();
450             }
451             catch( IllegalStateException e )
452             {
453                 session.getManagerFilterChain().exceptionCaught( session, e );
454             }
455         }
456     }
457 
458     private void flush( SocketSession session ) throws IOException
459     {
460         SocketChannel ch = session.getChannel();
461 
462         Queue writeBufferQueue = session.getWriteBufferQueue();
463         Queue writeMarkerQueue = session.getWriteMarkerQueue();
464 
465         ByteBuffer buf;
466         Object marker;
467         for( ;; )
468         {
469             synchronized( writeBufferQueue )
470             {
471                 buf = ( ByteBuffer ) writeBufferQueue.first();
472                 marker = writeMarkerQueue.first();
473             }
474 
475             if( buf == null )
476                 break;
477 
478             if( buf.remaining() == 0 )
479             {
480                 synchronized( writeBufferQueue )
481                 {
482                     writeBufferQueue.pop();
483                     writeMarkerQueue.pop();
484                 }
485                 try
486                 {
487                     buf.release();
488                 }
489                 catch( IllegalStateException e )
490                 {
491                     session.getManagerFilterChain().exceptionCaught( session, e );
492                 }
493 
494                 session.increaseWrittenWriteRequests();
495                 session.getManagerFilterChain().dataWritten( session, marker );
496                 continue;
497             }
498 
499             int writtenBytes = 0;
500             try
501             {
502                 writtenBytes = ch.write( buf.buf() );
503             }
504             finally
505             {
506                 if( writtenBytes > 0 )
507                 {
508                     session.increaseWrittenBytes( writtenBytes );
509                     session.setIdle( IdleStatus.BOTH_IDLE, false );
510                     session.setIdle( IdleStatus.WRITER_IDLE, false );
511                 }
512 
513                 SelectionKey key = session.getSelectionKey();
514                 if( buf.hasRemaining() )
515                 {
516                     // Kernel buffer is full
517                     key
518                             .interestOps( key.interestOps()
519                                           | SelectionKey.OP_WRITE );
520                     break;
521                 }
522                 else
523                 {
524                     key.interestOps( key.interestOps()
525                                      & ( ~SelectionKey.OP_WRITE ) );
526                 }
527             }
528         }
529     }
530 
531     private class Worker extends Thread
532     {
533         public Worker()
534         {
535             super( "SocketIoProcessor" );
536         }
537 
538         public void run()
539         {
540             for( ;; )
541             {
542                 try
543                 {
544                     int nKeys = selector.select( 1000 );
545                     addSessions();
546 
547                     if( nKeys > 0 )
548                     {
549                         processSessions( selector.selectedKeys() );
550                     }
551 
552                     flushSessions();
553                     removeSessions();
554                     notifyIdleSessions();
555 
556                     if( selector.keys().isEmpty() )
557                     {
558                         synchronized( SocketIoProcessor.this )
559                         {
560                             if( selector.keys().isEmpty() &&
561                                 newSessions.isEmpty() )
562                             {
563                                 worker = null;
564                                 break;
565                             }
566                         }
567                     }
568                 }
569                 catch( IOException e )
570                 {
571                     e.printStackTrace();
572 
573                     try
574                     {
575                         Thread.sleep( 1000 );
576                     }
577                     catch( InterruptedException e1 )
578                     {
579                     }
580                 }
581             }
582         }
583     }
584 }