View Javadoc

1   /*
2    *   @(#) $Id: SocketIoProcessor.java 165128 2005-04-28 11:16:55Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.socket;
20  
21  import java.io.IOException;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.Selector;
24  import java.nio.channels.SocketChannel;
25  import java.util.Iterator;
26  import java.util.Set;
27  
28  import org.apache.mina.common.ByteBuffer;
29  import org.apache.mina.common.IdleStatus;
30  import org.apache.mina.common.SessionConfig;
31  import org.apache.mina.io.WriteTimeoutException;
32  import org.apache.mina.util.Queue;
33  
34  /***
35   * Performs all I/O operations for sockets which is connected or bound.
36   * This class is used by MINA internally.
37   * 
38   * @author Trustin Lee (trustin@apache.org)
39   * @version $Rev: 165128 $, $Date: 2005-04-28 20:16:55 +0900 (?, 28  4? 2005) $,
40   */
41  class SocketIoProcessor
42  {
43      private static final SocketIoProcessor instance;
44  
45      static
46      {
47          SocketIoProcessor tmp;
48  
49          try
50          {
51              tmp = new SocketIoProcessor();
52          }
53          catch( IOException e )
54          {
55              InternalError error = new InternalError(
56                                                       "Failed to open selector." );
57              error.initCause( e );
58              throw error;
59          }
60  
61          instance = tmp;
62      }
63  
64      private final Selector selector;
65  
66      private final Queue newSessions = new Queue();
67  
68      private final Queue removingSessions = new Queue();
69  
70      private final Queue flushingSessions = new Queue();
71  
72      private final Queue readableSessions = new Queue();
73  
74      private Worker worker;
75  
76      private long lastIdleCheckTime = System.currentTimeMillis();
77  
78      private SocketIoProcessor() throws IOException
79      {
80          selector = Selector.open();
81      }
82  
83      static SocketIoProcessor getInstance()
84      {
85          return instance;
86      }
87  
88      void addSession( SocketSession session )
89      {
90          synchronized( this )
91          {
92              synchronized( newSessions )
93              {
94                  newSessions.push( session );
95              }
96              startupWorker();
97          }
98  
99          selector.wakeup();
100     }
101 
102     void removeSession( SocketSession session )
103     {
104         scheduleRemove( session );
105         startupWorker();
106         selector.wakeup();
107     }
108 
109     private synchronized void startupWorker()
110     {
111         if( worker == null )
112         {
113             worker = new Worker();
114             worker.start();
115         }
116     }
117 
118     void flushSession( SocketSession session )
119     {
120         scheduleFlush( session );
121         selector.wakeup();
122     }
123 
124     void addReadableSession( SocketSession session )
125     {
126         synchronized( readableSessions )
127         {
128             readableSessions.push( session );
129         }
130         selector.wakeup();
131     }
132 
133     private void addSessions()
134     {
135         if( newSessions.isEmpty() )
136             return;
137 
138         SocketSession session;
139 
140         for( ;; )
141         {
142             synchronized( newSessions )
143             {
144                 session = ( SocketSession ) newSessions.pop();
145             }
146 
147             if( session == null )
148                 break;
149 
150             SocketChannel ch = session.getChannel();
151             boolean registered;
152 
153             try
154             {
155                 ch.configureBlocking( false );
156                 session.setSelectionKey( ch.register( selector,
157                                                       SelectionKey.OP_READ,
158                                                       session ) );
159                 registered = true;
160             }
161             catch( IOException e )
162             {
163                 registered = false;
164                 session.getManagerFilterChain().exceptionCaught( session, e );
165             }
166 
167             if( registered )
168             {
169                 session.getManagerFilterChain().sessionOpened( session );
170             }
171         }
172     }
173 
174     private void removeSessions()
175     {
176         if( removingSessions.isEmpty() )
177             return;
178 
179         for( ;; )
180         {
181             SocketSession session;
182 
183             synchronized( removingSessions )
184             {
185                 session = ( SocketSession ) removingSessions.pop();
186             }
187 
188             if( session == null )
189                 break;
190 
191             SocketChannel ch = session.getChannel();
192             SelectionKey key = session.getSelectionKey();
193             if( !key.isValid() ) // skip if channel is already closed
194             {
195                 continue;
196             }
197 
198             try
199             {
200                 key.cancel();
201                 ch.close();
202             }
203             catch( IOException e )
204             {
205                 session.getManagerFilterChain().exceptionCaught( session, e );
206             }
207             finally
208             {
209                 releaseWriteBuffers( session );
210 
211                 session.getManagerFilterChain().sessionClosed( session );
212                 session.notifyClose();
213             }
214         }
215     }
216 
217     private void processSessions( Set selectedKeys )
218     {
219         Iterator it = selectedKeys.iterator();
220 
221         while( it.hasNext() )
222         {
223             SelectionKey key = ( SelectionKey ) it.next();
224             SocketSession session = ( SocketSession ) key.attachment();
225 
226             if( key.isReadable() )
227             {
228                 read( session );
229             }
230 
231             if( key.isWritable() )
232             {
233                 scheduleFlush( session );
234             }
235         }
236 
237         selectedKeys.clear();
238     }
239 
240     private void read( SocketSession session )
241     {
242         ByteBuffer buf = ByteBuffer.allocate(
243                 (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); 
244         SocketChannel ch = session.getChannel();
245 
246         try
247         {
248             int readBytes = 0;
249             int ret;
250 
251             buf.clear();
252 
253             try
254             {
255                 while( ( ret = ch.read( buf.buf() ) ) > 0 )
256                 {
257                     readBytes += ret;
258                 }
259             }
260             finally
261             {
262                 buf.flip();
263             }
264 
265             session.increaseReadBytes( readBytes );
266             session.setIdle( IdleStatus.BOTH_IDLE, false );
267             session.setIdle( IdleStatus.READER_IDLE, false );
268 
269             if( readBytes > 0 )
270             {
271                 ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
272                 newBuf.put( buf );
273                 newBuf.flip();
274                 session.getManagerFilterChain().dataRead( session, newBuf );
275             }
276             if( ret < 0 )
277             {
278                 scheduleRemove( session );
279             }
280         }
281         catch( Throwable e )
282         {
283             if( e instanceof IOException )
284                 scheduleRemove( session );
285             session.getManagerFilterChain().exceptionCaught( session, e );
286         }
287         finally
288         {
289             buf.release();
290         }
291     }
292 
293     private void scheduleRemove( SocketSession session )
294     {
295         synchronized( removingSessions )
296         {
297             removingSessions.push( session );
298         }
299     }
300 
301     private void scheduleFlush( SocketSession session )
302     {
303         synchronized( flushingSessions )
304         {
305             flushingSessions.push( session );
306         }
307     }
308 
309     private void notifyIdleSessions()
310     {
311         Set keys = selector.keys();
312         Iterator it;
313         SocketSession session;
314 
315         // process idle sessions
316         long currentTime = System.currentTimeMillis();
317 
318         if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
319         {
320             lastIdleCheckTime = currentTime;
321             it = keys.iterator();
322 
323             while( it.hasNext() )
324             {
325                 SelectionKey key = ( SelectionKey ) it.next();
326                 session = ( SocketSession ) key.attachment();
327 
328                 notifyIdleSession( session, currentTime );
329             }
330         }
331     }
332 
333     private void notifyIdleSession( SocketSession session, long currentTime )
334     {
335         SessionConfig config = session.getConfig();
336 
337         notifyIdleSession0( session, currentTime, config
338                 .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
339                             IdleStatus.BOTH_IDLE, session.getLastIoTime() );
340         notifyIdleSession0( session, currentTime, config
341                 .getIdleTimeInMillis( IdleStatus.READER_IDLE ),
342                             IdleStatus.READER_IDLE, session.getLastReadTime() );
343         notifyIdleSession0( session, currentTime, config
344                 .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
345                             IdleStatus.WRITER_IDLE, session.getLastWriteTime() );
346 
347         notifyWriteTimeoutSession( session, currentTime, config
348                 .getWriteTimeoutInMillis(), session.getLastWriteTime() );
349     }
350 
351     private void notifyIdleSession0( SocketSession session, long currentTime,
352                                     long idleTime, IdleStatus status,
353                                     long lastIoTime )
354     {
355         if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0
356             && ( currentTime - lastIoTime ) >= idleTime )
357         {
358             session.setIdle( status, true );
359             session.getManagerFilterChain().sessionIdle( session, status );
360         }
361     }
362 
363     private void notifyWriteTimeoutSession( SocketSession session,
364                                            long currentTime,
365                                            long writeTimeout, long lastIoTime )
366     {
367         if( writeTimeout > 0
368             && ( currentTime - lastIoTime ) >= writeTimeout
369             && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 )
370         {
371             session
372                     .getManagerFilterChain()
373                     .exceptionCaught( session, new WriteTimeoutException() );
374         }
375     }
376 
377     private void flushSessions()
378     {
379         if( flushingSessions.size() == 0 )
380             return;
381 
382         for( ;; )
383         {
384             SocketSession session;
385 
386             synchronized( flushingSessions )
387             {
388                 session = ( SocketSession ) flushingSessions.pop();
389             }
390 
391             if( session == null )
392                 break;
393 
394             if( !session.isConnected() )
395             {
396                 releaseWriteBuffers( session );
397                 continue;
398             }
399 
400             try
401             {
402                 flush( session );
403             }
404             catch( IOException e )
405             {
406                 scheduleRemove( session );
407                 session.getManagerFilterChain().exceptionCaught( session, e );
408             }
409         }
410     }
411     
412     private void releaseWriteBuffers( SocketSession session )
413     {
414         Queue writeBufferQueue = session.getWriteBufferQueue();
415         session.getWriteMarkerQueue().clear();
416         ByteBuffer buf;
417         
418         while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
419         {
420             try
421             {
422                 buf.release();
423             }
424             catch( IllegalStateException e )
425             {
426                 session.getManagerFilterChain().exceptionCaught( session, e );
427             }
428         }
429     }
430 
431     private void flush( SocketSession session ) throws IOException
432     {
433         SocketChannel ch = session.getChannel();
434 
435         Queue writeBufferQueue = session.getWriteBufferQueue();
436         Queue writeMarkerQueue = session.getWriteMarkerQueue();
437 
438         ByteBuffer buf;
439         Object marker;
440         for( ;; )
441         {
442             synchronized( writeBufferQueue )
443             {
444                 buf = ( ByteBuffer ) writeBufferQueue.first();
445                 marker = writeMarkerQueue.first();
446             }
447 
448             if( buf == null )
449                 break;
450 
451             if( buf.remaining() == 0 )
452             {
453                 synchronized( writeBufferQueue )
454                 {
455                     writeBufferQueue.pop();
456                     writeMarkerQueue.pop();
457                 }
458                 try
459                 {
460                     buf.release();
461                 }
462                 catch( IllegalStateException e )
463                 {
464                     session.getManagerFilterChain().exceptionCaught( session, e );
465                 }
466 
467                 session.getManagerFilterChain().dataWritten( session, marker );
468                 continue;
469             }
470 
471             int writtenBytes = 0;
472             try
473             {
474                 writtenBytes = ch.write( buf.buf() );
475             }
476             finally
477             {
478                 if( writtenBytes > 0 )
479                 {
480                     session.increaseWrittenBytes( writtenBytes );
481                     session.setIdle( IdleStatus.BOTH_IDLE, false );
482                     session.setIdle( IdleStatus.WRITER_IDLE, false );
483                 }
484 
485                 SelectionKey key = session.getSelectionKey();
486                 if( buf.hasRemaining() )
487                 {
488                     // Kernel buffer is full
489                     key
490                             .interestOps( key.interestOps()
491                                           | SelectionKey.OP_WRITE );
492                     break;
493                 }
494                 else
495                 {
496                     key.interestOps( key.interestOps()
497                                      & ( ~SelectionKey.OP_WRITE ) );
498                 }
499             }
500         }
501     }
502 
503     private class Worker extends Thread
504     {
505         public Worker()
506         {
507             super( "SocketIoProcessor" );
508         }
509 
510         public void run()
511         {
512             for( ;; )
513             {
514                 try
515                 {
516                     int nKeys = selector.select( 1000 );
517                     addSessions();
518 
519                     if( nKeys > 0 )
520                     {
521                         processSessions( selector.selectedKeys() );
522                     }
523 
524                     flushSessions();
525                     removeSessions();
526                     notifyIdleSessions();
527 
528                     if( selector.keys().isEmpty() )
529                     {
530                         synchronized( SocketIoProcessor.this )
531                         {
532                             if( selector.keys().isEmpty() &&
533                                 newSessions.isEmpty() )
534                             {
535                                 worker = null;
536                                 break;
537                             }
538                         }
539                     }
540                 }
541                 catch( IOException e )
542                 {
543                     e.printStackTrace();
544 
545                     try
546                     {
547                         Thread.sleep( 1000 );
548                     }
549                     catch( InterruptedException e1 )
550                     {
551                     }
552                 }
553             }
554         }
555     }
556 }