EMMA Coverage Report (generated Sat Sep 03 11:42:34 KST 2005)
[all classes][org.apache.mina.io.socket]

COVERAGE SUMMARY FOR SOURCE FILE [SocketIoProcessor.java]

name | class, % | method, % | block, % | line, %
SocketIoProcessor.java | 100% (2/2) | 96%  (22/23) | 68%  (609/895) | 75%  (182.9/243)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class SocketIoProcessor | 100% (1/1) | 95%  (20/21) | 67%  (543/813) | 76%  (165.9/219)
addReadableSession (SocketSession): void |  | 0%   (0/1) | 0%   (0/22) | 0%   (0/5)
notifyIdleSession0 (SocketSession, long, long, IdleStatus, long): void |  | 100% (1/1) | 18%  (5/28) | 31%  (1.2/4)
notifyWriteTimeoutSession (SocketSession, long, long, long): void |  | 100% (1/1) | 19%  (5/27) | 40%  (1.2/3)
<static initializer> |  | 100% (1/1) | 40%  (8/20) | 50%  (4/8)
removeSessions (): void |  | 100% (1/1) | 56%  (48/86) | 67%  (16.1/24)
flush (SocketSession): void |  | 100% (1/1) | 59%  (89/150) | 76%  (26.6/35)
flushSessions (): void |  | 100% (1/1) | 62%  (37/60) | 64%  (14.2/22)
releaseWriteBuffers (SocketSession): void |  | 100% (1/1) | 70%  (16/23) | 67%  (6/9)
read (SocketSession): void |  | 100% (1/1) | 72%  (67/93) | 74%  (19.2/26)
addSession (SocketSession): void |  | 100% (1/1) | 72%  (26/36) | 91%  (7.3/8)
scheduleFlush (SocketSession): void |  | 100% (1/1) | 72%  (13/18) | 93%  (3.7/4)
scheduleRemove (SocketSession): void |  | 100% (1/1) | 72%  (13/18) | 93%  (3.7/4)
addSessions (): void |  | 100% (1/1) | 78%  (47/60) | 83%  (15.7/19)
processSessions (Set): void |  | 100% (1/1) | 90%  (27/30) | 91%  (10/11)
SocketIoProcessor (): void |  | 100% (1/1) | 100% (29/29) | 100% (8/8)
flushSession (SocketSession): void |  | 100% (1/1) | 100% (8/8) | 100% (3/3)
getInstance (): SocketIoProcessor |  | 100% (1/1) | 100% (2/2) | 100% (1/1)
notifyIdleSession (SocketSession, long): void |  | 100% (1/1) | 100% (42/42) | 100% (6/6)
notifyIdleSessions (): void |  | 100% (1/1) | 100% (38/38) | 100% (11/11)
removeSession (SocketSession): void |  | 100% (1/1) | 100% (10/10) | 100% (4/4)
startupWorker (): void |  | 100% (1/1) | 100% (13/13) | 100% (4/4)
     
class SocketIoProcessor$Worker | 100% (1/1) | 100% (2/2) | 80%  (66/82) | 71%  (17/24)
run (): void |  | 100% (1/1) | 79%  (59/75) | 67%  (14/21)
SocketIoProcessor$Worker (SocketIoProcessor): void |  | 100% (1/1) | 100% (7/7) | 100% (3/3)

1/*
2 *   @(#) $Id: SocketIoProcessor.java 264677 2005-08-30 02:44:35Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.io.socket;
20 
21import java.io.IOException;
22import java.nio.channels.CancelledKeyException;
23import java.nio.channels.SelectionKey;
24import java.nio.channels.Selector;
25import java.nio.channels.SocketChannel;
26import java.util.Iterator;
27import java.util.Set;
28 
29import org.apache.mina.common.ByteBuffer;
30import org.apache.mina.common.IdleStatus;
31import org.apache.mina.common.SessionConfig;
32import org.apache.mina.io.WriteTimeoutException;
33import org.apache.mina.util.Queue;
34 
35/**
36 * Performs all I/O operations for sockets which is connected or bound.
37 * This class is used by MINA internally.
38 * 
39 * @author Trustin Lee (trustin@apache.org)
40 * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $,
41 */
42class SocketIoProcessor
43{
44    private static final SocketIoProcessor instance;
45 
46    static
47    {
48        SocketIoProcessor tmp;
49 
50        try
51        {
52            tmp = new SocketIoProcessor();
53        }
54        catch( IOException e )
55        {
56            InternalError error = new InternalError(
57                                                     "Failed to open selector." );
58            error.initCause( e );
59            throw error;
60        }
61 
62        instance = tmp;
63    }
64 
65    private final Selector selector;
66 
67    private final Queue newSessions = new Queue();
68 
69    private final Queue removingSessions = new Queue();
70 
71    private final Queue flushingSessions = new Queue();
72 
73    private final Queue readableSessions = new Queue();
74 
75    private Worker worker;
76 
77    private long lastIdleCheckTime = System.currentTimeMillis();
78 
79    private SocketIoProcessor() throws IOException
80    {
81        selector = Selector.open();
82    }
83 
84    static SocketIoProcessor getInstance()
85    {
86        return instance;
87    }
88 
89    void addSession( SocketSession session )
90    {
91        synchronized( this )
92        {
93            synchronized( newSessions )
94            {
95                newSessions.push( session );
96            }
97            startupWorker();
98        }
99 
100        selector.wakeup();
101    }
102 
103    void removeSession( SocketSession session )
104    {
105        scheduleRemove( session );
106        startupWorker();
107        selector.wakeup();
108    }
109 
110    private synchronized void startupWorker()
111    {
112        if( worker == null )
113        {
114            worker = new Worker();
115            worker.start();
116        }
117    }
118 
119    void flushSession( SocketSession session )
120    {
121        scheduleFlush( session );
122        selector.wakeup();
123    }
124 
125    void addReadableSession( SocketSession session )
126    {
127        synchronized( readableSessions )
128        {
129            readableSessions.push( session );
130        }
131        selector.wakeup();
132    }
133 
134    private void addSessions()
135    {
136        if( newSessions.isEmpty() )
137            return;
138 
139        SocketSession session;
140 
141        for( ;; )
142        {
143            synchronized( newSessions )
144            {
145                session = ( SocketSession ) newSessions.pop();
146            }
147 
148            if( session == null )
149                break;
150 
151            SocketChannel ch = session.getChannel();
152            boolean registered;
153 
154            try
155            {
156                ch.configureBlocking( false );
157                session.setSelectionKey( ch.register( selector,
158                                                      SelectionKey.OP_READ,
159                                                      session ) );
160                registered = true;
161            }
162            catch( IOException e )
163            {
164                registered = false;
165                session.getManagerFilterChain().exceptionCaught( session, e );
166            }
167 
168            if( registered )
169            {
170                session.getManagerFilterChain().sessionOpened( session );
171            }
172        }
173    }
174 
175    private void removeSessions()
176    {
177        if( removingSessions.isEmpty() )
178            return;
179 
180        for( ;; )
181        {
182            SocketSession session;
183 
184            synchronized( removingSessions )
185            {
186                session = ( SocketSession ) removingSessions.pop();
187            }
188 
189            if( session == null )
190                break;
191 
192            SocketChannel ch = session.getChannel();
193            SelectionKey key = session.getSelectionKey();
194            // Retry later if session is not yet fully initialized.
195            // (In case that Session.close() is called before addSession() is processed)
196            if( key == null )
197            {
198                scheduleRemove( session );
199                break;
200            }
201 
202            // skip if channel is already closed
203            if( !key.isValid() )
204            {
205                continue;
206            }
207 
208            try
209            {
210                key.cancel();
211                ch.close();
212            }
213            catch( IOException e )
214            {
215                session.getManagerFilterChain().exceptionCaught( session, e );
216            }
217            finally
218            {
219                releaseWriteBuffers( session );
220 
221                session.getManagerFilterChain().sessionClosed( session );
222                session.notifyClose();
223            }
224        }
225    }
226 
227    private void processSessions( Set selectedKeys )
228    {
229        Iterator it = selectedKeys.iterator();
230 
231        while( it.hasNext() )
232        {
233            SelectionKey key = ( SelectionKey ) it.next();
234            SocketSession session = ( SocketSession ) key.attachment();
235 
236            if( key.isReadable() )
237            {
238                read( session );
239            }
240 
241            if( key.isWritable() )
242            {
243                scheduleFlush( session );
244            }
245        }
246 
247        selectedKeys.clear();
248    }
249 
250    private void read( SocketSession session )
251    {
252        ByteBuffer buf = ByteBuffer.allocate(
253                (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); 
254        SocketChannel ch = session.getChannel();
255 
256        try
257        {
258            int readBytes = 0;
259            int ret;
260 
261            buf.clear();
262 
263            try
264            {
265                while( ( ret = ch.read( buf.buf() ) ) > 0 )
266                {
267                    readBytes += ret;
268                }
269            }
270            finally
271            {
272                buf.flip();
273            }
274 
275            session.increaseReadBytes( readBytes );
276            session.setIdle( IdleStatus.BOTH_IDLE, false );
277            session.setIdle( IdleStatus.READER_IDLE, false );
278 
279            if( readBytes > 0 )
280            {
281                ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
282                newBuf.put( buf );
283                newBuf.flip();
284                session.getManagerFilterChain().dataRead( session, newBuf );
285            }
286            if( ret < 0 )
287            {
288                scheduleRemove( session );
289            }
290        }
291        catch( Throwable e )
292        {
293            if( e instanceof IOException )
294                scheduleRemove( session );
295            session.getManagerFilterChain().exceptionCaught( session, e );
296        }
297        finally
298        {
299            buf.release();
300        }
301    }
302 
303    private void scheduleRemove( SocketSession session )
304    {
305        synchronized( removingSessions )
306        {
307            removingSessions.push( session );
308        }
309    }
310 
311    private void scheduleFlush( SocketSession session )
312    {
313        synchronized( flushingSessions )
314        {
315            flushingSessions.push( session );
316        }
317    }
318 
319    private void notifyIdleSessions()
320    {
321        Set keys = selector.keys();
322        Iterator it;
323        SocketSession session;
324 
325        // process idle sessions
326        long currentTime = System.currentTimeMillis();
327 
328        if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
329        {
330            lastIdleCheckTime = currentTime;
331            it = keys.iterator();
332 
333            while( it.hasNext() )
334            {
335                SelectionKey key = ( SelectionKey ) it.next();
336                session = ( SocketSession ) key.attachment();
337 
338                notifyIdleSession( session, currentTime );
339            }
340        }
341    }
342 
343    private void notifyIdleSession( SocketSession session, long currentTime )
344    {
345        SessionConfig config = session.getConfig();
346 
347        notifyIdleSession0( session, currentTime, config
348                .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
349                            IdleStatus.BOTH_IDLE, session.getLastIoTime() );
350        notifyIdleSession0( session, currentTime, config
351                .getIdleTimeInMillis( IdleStatus.READER_IDLE ),
352                            IdleStatus.READER_IDLE, session.getLastReadTime() );
353        notifyIdleSession0( session, currentTime, config
354                .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
355                            IdleStatus.WRITER_IDLE, session.getLastWriteTime() );
356 
357        notifyWriteTimeoutSession( session, currentTime, config
358                .getWriteTimeoutInMillis(), session.getLastWriteTime() );
359    }
360 
361    private void notifyIdleSession0( SocketSession session, long currentTime,
362                                    long idleTime, IdleStatus status,
363                                    long lastIoTime )
364    {
365        if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0
366            && ( currentTime - lastIoTime ) >= idleTime )
367        {
368            session.setIdle( status, true );
369            session.getManagerFilterChain().sessionIdle( session, status );
370        }
371    }
372 
373    private void notifyWriteTimeoutSession( SocketSession session,
374                                           long currentTime,
375                                           long writeTimeout, long lastIoTime )
376    {
377        if( writeTimeout > 0
378            && ( currentTime - lastIoTime ) >= writeTimeout
379            && session.getSelectionKey() != null
380            && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 )
381        {
382            session
383                    .getManagerFilterChain()
384                    .exceptionCaught( session, new WriteTimeoutException() );
385        }
386    }
387 
388    private void flushSessions()
389    {
390        if( flushingSessions.size() == 0 )
391            return;
392 
393        for( ;; )
394        {
395            SocketSession session;
396 
397            synchronized( flushingSessions )
398            {
399                session = ( SocketSession ) flushingSessions.pop();
400            }
401 
402            if( session == null )
403                break;
404 
405            if( !session.isConnected() )
406            {
407                releaseWriteBuffers( session );
408                continue;
409            }
410 
411            // If encountered write request before session is initialized, 
412            // (In case that Session.write() is called before addSession() is processed)
413            if( session.getSelectionKey() == null )
414            {
415                // Reschedule for later write
416                scheduleFlush( session );
417                break;
418            }
419            else
420            {
421                try
422                {
423                    flush( session );
424                }
425                catch( CancelledKeyException e )
426                {
427                    // Connection is closed unexpectedly.
428                    scheduleRemove( session );
429                }
430                catch( IOException e )
431                {
432                    scheduleRemove( session );
433                    session.getManagerFilterChain().exceptionCaught( session, e );
434                }
435            }
436        }
437    }
438    
439    private void releaseWriteBuffers( SocketSession session )
440    {
441        Queue writeBufferQueue = session.getWriteBufferQueue();
442        session.getWriteMarkerQueue().clear();
443        ByteBuffer buf;
444        
445        while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
446        {
447            try
448            {
449                buf.release();
450            }
451            catch( IllegalStateException e )
452            {
453                session.getManagerFilterChain().exceptionCaught( session, e );
454            }
455        }
456    }
457 
458    private void flush( SocketSession session ) throws IOException
459    {
460        SocketChannel ch = session.getChannel();
461 
462        Queue writeBufferQueue = session.getWriteBufferQueue();
463        Queue writeMarkerQueue = session.getWriteMarkerQueue();
464 
465        ByteBuffer buf;
466        Object marker;
467        for( ;; )
468        {
469            synchronized( writeBufferQueue )
470            {
471                buf = ( ByteBuffer ) writeBufferQueue.first();
472                marker = writeMarkerQueue.first();
473            }
474 
475            if( buf == null )
476                break;
477 
478            if( buf.remaining() == 0 )
479            {
480                synchronized( writeBufferQueue )
481                {
482                    writeBufferQueue.pop();
483                    writeMarkerQueue.pop();
484                }
485                try
486                {
487                    buf.release();
488                }
489                catch( IllegalStateException e )
490                {
491                    session.getManagerFilterChain().exceptionCaught( session, e );
492                }
493 
494                session.increaseWrittenWriteRequests();
495                session.getManagerFilterChain().dataWritten( session, marker );
496                continue;
497            }
498 
499            int writtenBytes = 0;
500            try
501            {
502                writtenBytes = ch.write( buf.buf() );
503            }
504            finally
505            {
506                if( writtenBytes > 0 )
507                {
508                    session.increaseWrittenBytes( writtenBytes );
509                    session.setIdle( IdleStatus.BOTH_IDLE, false );
510                    session.setIdle( IdleStatus.WRITER_IDLE, false );
511                }
512 
513                SelectionKey key = session.getSelectionKey();
514                if( buf.hasRemaining() )
515                {
516                    // Kernel buffer is full
517                    key
518                            .interestOps( key.interestOps()
519                                          | SelectionKey.OP_WRITE );
520                    break;
521                }
522                else
523                {
524                    key.interestOps( key.interestOps()
525                                     & ( ~SelectionKey.OP_WRITE ) );
526                }
527            }
528        }
529    }
530 
531    private class Worker extends Thread
532    {
533        public Worker()
534        {
535            super( "SocketIoProcessor" );
536        }
537 
538        public void run()
539        {
540            for( ;; )
541            {
542                try
543                {
544                    int nKeys = selector.select( 1000 );
545                    addSessions();
546 
547                    if( nKeys > 0 )
548                    {
549                        processSessions( selector.selectedKeys() );
550                    }
551 
552                    flushSessions();
553                    removeSessions();
554                    notifyIdleSessions();
555 
556                    if( selector.keys().isEmpty() )
557                    {
558                        synchronized( SocketIoProcessor.this )
559                        {
560                            if( selector.keys().isEmpty() &&
561                                newSessions.isEmpty() )
562                            {
563                                worker = null;
564                                break;
565                            }
566                        }
567                    }
568                }
569                catch( IOException e )
570                {
571                    e.printStackTrace();
572 
573                    try
574                    {
575                        Thread.sleep( 1000 );
576                    }
577                    catch( InterruptedException e1 )
578                    {
579                    }
580                }
581            }
582        }
583    }
584}

[all classes][org.apache.mina.io.socket]
EMMA 2.0.4217 (C) Vladimir Roubtsov