EMMA Coverage Report (generated Mon Jul 11 13:15:38 KST 2005)
[all classes][org.apache.mina.io.socket]

COVERAGE SUMMARY FOR SOURCE FILE [SocketIoProcessor.java]

name | class, % | method, % | block, % | line, %
SocketIoProcessor.java | 100% (2/2) | 96%  (22/23) | 68%  (609/890) | 76%  (183.4/241)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class SocketIoProcessor | 100% (1/1) | 95%  (20/21) | 67%  (543/808) | 77%  (166.4/217)
addReadableSession (SocketSession): void | | 0%   (0/1) | 0%   (0/22) | 0%   (0/5)
notifyIdleSession0 (SocketSession, long, long, IdleStatus, long): void | | 100% (1/1) | 18%  (5/28) | 31%  (1.2/4)
notifyWriteTimeoutSession (SocketSession, long, long, long): void | | 100% (1/1) | 19%  (5/27) | 40%  (1.2/3)
<static initializer> | | 100% (1/1) | 40%  (8/20) | 50%  (4/8)
removeSessions (): void | | 100% (1/1) | 56%  (48/86) | 67%  (16.1/24)
flush (SocketSession): void | | 100% (1/1) | 59%  (89/150) | 76%  (26.6/35)
flushSessions (): void | | 100% (1/1) | 67%  (37/55) | 74%  (14.7/20)
releaseWriteBuffers (SocketSession): void | | 100% (1/1) | 70%  (16/23) | 67%  (6/9)
read (SocketSession): void | | 100% (1/1) | 72%  (67/93) | 74%  (19.2/26)
addSession (SocketSession): void | | 100% (1/1) | 72%  (26/36) | 91%  (7.3/8)
scheduleFlush (SocketSession): void | | 100% (1/1) | 72%  (13/18) | 93%  (3.7/4)
scheduleRemove (SocketSession): void | | 100% (1/1) | 72%  (13/18) | 93%  (3.7/4)
addSessions (): void | | 100% (1/1) | 78%  (47/60) | 83%  (15.7/19)
processSessions (Set): void | | 100% (1/1) | 90%  (27/30) | 91%  (10/11)
SocketIoProcessor (): void | | 100% (1/1) | 100% (29/29) | 100% (8/8)
flushSession (SocketSession): void | | 100% (1/1) | 100% (8/8) | 100% (3/3)
getInstance (): SocketIoProcessor | | 100% (1/1) | 100% (2/2) | 100% (1/1)
notifyIdleSession (SocketSession, long): void | | 100% (1/1) | 100% (42/42) | 100% (6/6)
notifyIdleSessions (): void | | 100% (1/1) | 100% (38/38) | 100% (11/11)
removeSession (SocketSession): void | | 100% (1/1) | 100% (10/10) | 100% (4/4)
startupWorker (): void | | 100% (1/1) | 100% (13/13) | 100% (4/4)

class SocketIoProcessor$Worker | 100% (1/1) | 100% (2/2) | 80%  (66/82) | 71%  (17/24)
run (): void | | 100% (1/1) | 79%  (59/75) | 67%  (14/21)
SocketIoProcessor$Worker (SocketIoProcessor): void | | 100% (1/1) | 100% (7/7) | 100% (3/3)

1/*
2 *   @(#) $Id: SocketIoProcessor.java 210062 2005-07-11 03:52:38Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.io.socket;
20 
21import java.io.IOException;
22import java.nio.channels.SelectionKey;
23import java.nio.channels.Selector;
24import java.nio.channels.SocketChannel;
25import java.util.Iterator;
26import java.util.Set;
27 
28import org.apache.mina.common.ByteBuffer;
29import org.apache.mina.common.IdleStatus;
30import org.apache.mina.common.SessionConfig;
31import org.apache.mina.io.WriteTimeoutException;
32import org.apache.mina.util.Queue;
33 
34/**
35 * Performs all I/O operations for sockets which are connected or bound.
36 * This class is used by MINA internally.
37 * 
38 * @author Trustin Lee (trustin@apache.org)
39 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $,
40 */
41class SocketIoProcessor
42{
43    private static final SocketIoProcessor instance;
44 
45    static
46    {
47        SocketIoProcessor tmp;
48 
49        try
50        {
51            tmp = new SocketIoProcessor();
52        }
53        catch( IOException e )
54        {
55            InternalError error = new InternalError(
56                                                     "Failed to open selector." );
57            error.initCause( e );
58            throw error;
59        }
60 
61        instance = tmp;
62    }
63 
64    private final Selector selector; // opened in the constructor; addSessions() registers session channels with it
65 
66    private final Queue newSessions = new Queue(); // pushed by addSession(), popped by addSessions()
67 
68    private final Queue removingSessions = new Queue(); // pushed by scheduleRemove(), popped by removeSessions()
69 
70    private final Queue flushingSessions = new Queue(); // pushed by scheduleFlush(), popped by flushSessions()
71 
72    private final Queue readableSessions = new Queue(); // pushed by addReadableSession(); nothing in this class pops it
73 
74    private Worker worker; // current worker thread, or null; set by startupWorker(), cleared by Worker.run() on exit
75 
76    private long lastIdleCheckTime = System.currentTimeMillis(); // time (ms) of the last idle scan; updated in notifyIdleSessions()
77 
78    private SocketIoProcessor() throws IOException
79    {
80        selector = Selector.open();
81    }
82 
83    static SocketIoProcessor getInstance()
84    {
85        return instance;
86    }
87 
88    void addSession( SocketSession session )
89    {
90        synchronized( this )
91        {
92            synchronized( newSessions )
93            {
94                newSessions.push( session );
95            }
96            startupWorker();
97        }
98 
99        selector.wakeup();
100    }
101 
102    void removeSession( SocketSession session )
103    {
104        scheduleRemove( session );
105        startupWorker();
106        selector.wakeup();
107    }
108 
109    private synchronized void startupWorker()
110    {
111        if( worker == null )
112        {
113            worker = new Worker();
114            worker.start();
115        }
116    }
117 
118    void flushSession( SocketSession session )
119    {
120        scheduleFlush( session );
121        selector.wakeup();
122    }
123 
124    void addReadableSession( SocketSession session )
125    {
126        synchronized( readableSessions )
127        {
128            readableSessions.push( session );
129        }
130        selector.wakeup();
131    }
132 
133    private void addSessions()
134    {
135        if( newSessions.isEmpty() )
136            return;
137 
138        SocketSession session;
139 
140        for( ;; )
141        {
142            synchronized( newSessions )
143            {
144                session = ( SocketSession ) newSessions.pop();
145            }
146 
147            if( session == null )
148                break;
149 
150            SocketChannel ch = session.getChannel();
151            boolean registered;
152 
153            try
154            {
155                ch.configureBlocking( false );
156                session.setSelectionKey( ch.register( selector,
157                                                      SelectionKey.OP_READ,
158                                                      session ) );
159                registered = true;
160            }
161            catch( IOException e )
162            {
163                registered = false;
164                session.getManagerFilterChain().exceptionCaught( session, e );
165            }
166 
167            if( registered )
168            {
169                session.getManagerFilterChain().sessionOpened( session );
170            }
171        }
172    }
173 
174    private void removeSessions()
175    {
176        if( removingSessions.isEmpty() )
177            return;
178 
179        for( ;; )
180        {
181            SocketSession session;
182 
183            synchronized( removingSessions )
184            {
185                session = ( SocketSession ) removingSessions.pop();
186            }
187 
188            if( session == null )
189                break;
190 
191            SocketChannel ch = session.getChannel();
192            SelectionKey key = session.getSelectionKey();
193            // Retry later if session is not yet fully initialized.
194            // (In case that Session.close() is called before addSession() is processed)
195            if( key == null )
196            {
197                scheduleRemove( session );
198                break;
199            }
200 
201            // skip if channel is already closed
202            if( !key.isValid() )
203            {
204                continue;
205            }
206 
207            try
208            {
209                key.cancel();
210                ch.close();
211            }
212            catch( IOException e )
213            {
214                session.getManagerFilterChain().exceptionCaught( session, e );
215            }
216            finally
217            {
218                releaseWriteBuffers( session );
219 
220                session.getManagerFilterChain().sessionClosed( session );
221                session.notifyClose();
222            }
223        }
224    }
225 
226    private void processSessions( Set selectedKeys )
227    {
228        Iterator it = selectedKeys.iterator();
229 
230        while( it.hasNext() )
231        {
232            SelectionKey key = ( SelectionKey ) it.next();
233            SocketSession session = ( SocketSession ) key.attachment();
234 
235            if( key.isReadable() )
236            {
237                read( session );
238            }
239 
240            if( key.isWritable() )
241            {
242                scheduleFlush( session );
243            }
244        }
245 
246        selectedKeys.clear();
247    }
248 
249    private void read( SocketSession session )
250    {
251        ByteBuffer buf = ByteBuffer.allocate(
252                (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); 
253        SocketChannel ch = session.getChannel();
254 
255        try
256        {
257            int readBytes = 0;
258            int ret;
259 
260            buf.clear();
261 
262            try
263            {
264                while( ( ret = ch.read( buf.buf() ) ) > 0 )
265                {
266                    readBytes += ret;
267                }
268            }
269            finally
270            {
271                buf.flip();
272            }
273 
274            session.increaseReadBytes( readBytes );
275            session.setIdle( IdleStatus.BOTH_IDLE, false );
276            session.setIdle( IdleStatus.READER_IDLE, false );
277 
278            if( readBytes > 0 )
279            {
280                ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
281                newBuf.put( buf );
282                newBuf.flip();
283                session.getManagerFilterChain().dataRead( session, newBuf );
284            }
285            if( ret < 0 )
286            {
287                scheduleRemove( session );
288            }
289        }
290        catch( Throwable e )
291        {
292            if( e instanceof IOException )
293                scheduleRemove( session );
294            session.getManagerFilterChain().exceptionCaught( session, e );
295        }
296        finally
297        {
298            buf.release();
299        }
300    }
301 
302    private void scheduleRemove( SocketSession session )
303    {
304        synchronized( removingSessions )
305        {
306            removingSessions.push( session );
307        }
308    }
309 
310    private void scheduleFlush( SocketSession session )
311    {
312        synchronized( flushingSessions )
313        {
314            flushingSessions.push( session );
315        }
316    }
317 
318    private void notifyIdleSessions()
319    {
320        Set keys = selector.keys();
321        Iterator it;
322        SocketSession session;
323 
324        // process idle sessions
325        long currentTime = System.currentTimeMillis();
326 
327        if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
328        {
329            lastIdleCheckTime = currentTime;
330            it = keys.iterator();
331 
332            while( it.hasNext() )
333            {
334                SelectionKey key = ( SelectionKey ) it.next();
335                session = ( SocketSession ) key.attachment();
336 
337                notifyIdleSession( session, currentTime );
338            }
339        }
340    }
341 
342    private void notifyIdleSession( SocketSession session, long currentTime )
343    {
344        SessionConfig config = session.getConfig();
345 
346        notifyIdleSession0( session, currentTime, config
347                .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
348                            IdleStatus.BOTH_IDLE, session.getLastIoTime() );
349        notifyIdleSession0( session, currentTime, config
350                .getIdleTimeInMillis( IdleStatus.READER_IDLE ),
351                            IdleStatus.READER_IDLE, session.getLastReadTime() );
352        notifyIdleSession0( session, currentTime, config
353                .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
354                            IdleStatus.WRITER_IDLE, session.getLastWriteTime() );
355 
356        notifyWriteTimeoutSession( session, currentTime, config
357                .getWriteTimeoutInMillis(), session.getLastWriteTime() );
358    }
359 
360    private void notifyIdleSession0( SocketSession session, long currentTime,
361                                    long idleTime, IdleStatus status,
362                                    long lastIoTime )
363    {
364        if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0
365            && ( currentTime - lastIoTime ) >= idleTime )
366        {
367            session.setIdle( status, true );
368            session.getManagerFilterChain().sessionIdle( session, status );
369        }
370    }
371 
372    private void notifyWriteTimeoutSession( SocketSession session,
373                                           long currentTime,
374                                           long writeTimeout, long lastIoTime )
375    {
376        if( writeTimeout > 0
377            && ( currentTime - lastIoTime ) >= writeTimeout
378            && session.getSelectionKey() != null
379            && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 )
380        {
381            session
382                    .getManagerFilterChain()
383                    .exceptionCaught( session, new WriteTimeoutException() );
384        }
385    }
386 
387    private void flushSessions()
388    {
389        if( flushingSessions.size() == 0 )
390            return;
391 
392        for( ;; )
393        {
394            SocketSession session;
395 
396            synchronized( flushingSessions )
397            {
398                session = ( SocketSession ) flushingSessions.pop();
399            }
400 
401            if( session == null )
402                break;
403 
404            if( !session.isConnected() )
405            {
406                releaseWriteBuffers( session );
407                continue;
408            }
409 
410            // If encountered write request before session is initialized, 
411            // (In case that Session.write() is called before addSession() is processed)
412            if( session.getSelectionKey() == null )
413            {
414                // Reschedule for later write
415                scheduleFlush( session );
416                break;
417            }
418            else
419            {
420                try
421                {
422                    flush( session );
423                }
424                catch( IOException e )
425                {
426                    scheduleRemove( session );
427                    session.getManagerFilterChain().exceptionCaught( session, e );
428                }
429            }
430        }
431    }
432    
433    private void releaseWriteBuffers( SocketSession session )
434    {
435        Queue writeBufferQueue = session.getWriteBufferQueue();
436        session.getWriteMarkerQueue().clear();
437        ByteBuffer buf;
438        
439        while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
440        {
441            try
442            {
443                buf.release();
444            }
445            catch( IllegalStateException e )
446            {
447                session.getManagerFilterChain().exceptionCaught( session, e );
448            }
449        }
450    }
451 
452    private void flush( SocketSession session ) throws IOException
453    {
454        SocketChannel ch = session.getChannel();
455 
456        Queue writeBufferQueue = session.getWriteBufferQueue();
457        Queue writeMarkerQueue = session.getWriteMarkerQueue();
458 
459        ByteBuffer buf;
460        Object marker;
461        for( ;; )
462        {
463            synchronized( writeBufferQueue )
464            {
465                buf = ( ByteBuffer ) writeBufferQueue.first();
466                marker = writeMarkerQueue.first();
467            }
468 
469            if( buf == null )
470                break;
471 
472            if( buf.remaining() == 0 )
473            {
474                synchronized( writeBufferQueue )
475                {
476                    writeBufferQueue.pop();
477                    writeMarkerQueue.pop();
478                }
479                try
480                {
481                    buf.release();
482                }
483                catch( IllegalStateException e )
484                {
485                    session.getManagerFilterChain().exceptionCaught( session, e );
486                }
487 
488                session.increaseWrittenWriteRequests();
489                session.getManagerFilterChain().dataWritten( session, marker );
490                continue;
491            }
492 
493            int writtenBytes = 0;
494            try
495            {
496                writtenBytes = ch.write( buf.buf() );
497            }
498            finally
499            {
500                if( writtenBytes > 0 )
501                {
502                    session.increaseWrittenBytes( writtenBytes );
503                    session.setIdle( IdleStatus.BOTH_IDLE, false );
504                    session.setIdle( IdleStatus.WRITER_IDLE, false );
505                }
506 
507                SelectionKey key = session.getSelectionKey();
508                if( buf.hasRemaining() )
509                {
510                    // Kernel buffer is full
511                    key
512                            .interestOps( key.interestOps()
513                                          | SelectionKey.OP_WRITE );
514                    break;
515                }
516                else
517                {
518                    key.interestOps( key.interestOps()
519                                     & ( ~SelectionKey.OP_WRITE ) );
520                }
521            }
522        }
523    }
524 
525    private class Worker extends Thread
526    {
527        public Worker()
528        {
529            super( "SocketIoProcessor" );
530        }
531 
532        public void run()
533        {
534            for( ;; )
535            {
536                try
537                {
538                    int nKeys = selector.select( 1000 );
539                    addSessions();
540 
541                    if( nKeys > 0 )
542                    {
543                        processSessions( selector.selectedKeys() );
544                    }
545 
546                    flushSessions();
547                    removeSessions();
548                    notifyIdleSessions();
549 
550                    if( selector.keys().isEmpty() )
551                    {
552                        synchronized( SocketIoProcessor.this )
553                        {
554                            if( selector.keys().isEmpty() &&
555                                newSessions.isEmpty() )
556                            {
557                                worker = null;
558                                break;
559                            }
560                        }
561                    }
562                }
563                catch( IOException e )
564                {
565                    e.printStackTrace();
566 
567                    try
568                    {
569                        Thread.sleep( 1000 );
570                    }
571                    catch( InterruptedException e1 )
572                    {
573                    }
574                }
575            }
576        }
577    }
578}

[all classes][org.apache.mina.io.socket]
EMMA 2.0.4217 (C) Vladimir Roubtsov