EMMA Coverage Report (generated Sat Sep 03 11:42:34 KST 2005)
[all classes][org.apache.mina.io.datagram]

COVERAGE SUMMARY FOR SOURCE FILE [DatagramAcceptor.java]

nameclass, %method, %block, %line, %
DatagramAcceptor.java80%  (4/5)95%  (18/19)63%  (631/999)73%  (177.5/243)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class DatagramAcceptor100% (1/1)93%  (14/15)62%  (531/856)73%  (152.7/208)
closeSession (DatagramSession): void 0%   (0/1)0%   (0/1)0%   (0/1)
registerNew (): void 100% (1/1)42%  (68/161)61%  (17.2/28)
bind (SocketAddress, IoHandler): void 100% (1/1)56%  (60/108)68%  (17.7/26)
cancelKeys (): void 100% (1/1)57%  (75/131)75%  (18/24)
flush (DatagramSession): void 100% (1/1)59%  (75/128)64%  (23.5/37)
unbind (SocketAddress): void 100% (1/1)62%  (58/93)73%  (17.5/24)
flushSessions (): void 100% (1/1)71%  (27/38)82%  (10.7/13)
scheduleFlush (DatagramSession): void 100% (1/1)72%  (13/18)93%  (3.7/4)
readSession (DatagramSession): void 100% (1/1)75%  (41/55)76%  (11.4/15)
processReadySessions (Set): void 100% (1/1)84%  (48/57)83%  (15/18)
<static initializer> 100% (1/1)100% (3/3)100% (1/1)
DatagramAcceptor (): void 100% (1/1)100% (36/36)100% (8/8)
flushSession (DatagramSession): void 100% (1/1)100% (8/8)100% (3/3)
getFilterChain (): IoFilterChain 100% (1/1)100% (3/3)100% (1/1)
startupWorker (): void 100% (1/1)100% (16/16)100% (5/5)
     
class DatagramAcceptor$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class DatagramAcceptor$Worker100% (1/1)100% (2/2)66%  (85/128)64%  (17.8/28)
run (): void 100% (1/1)62%  (70/113)59%  (14.8/25)
DatagramAcceptor$Worker (DatagramAcceptor): void 100% (1/1)100% (15/15)100% (3/3)
     
class DatagramAcceptor$CancellationRequest100% (1/1)100% (1/1)100% (6/6)100% (3/3)
DatagramAcceptor$CancellationRequest (SocketAddress): void 100% (1/1)100% (6/6)100% (3/3)
     
class DatagramAcceptor$RegistrationRequest100% (1/1)100% (1/1)100% (9/9)100% (4/4)
DatagramAcceptor$RegistrationRequest (SocketAddress, IoHandler): void 100% (1/1)100% (9/9)100% (4/4)

1/*
2 *   @(#) $Id: DatagramAcceptor.java 264677 2005-08-30 02:44:35Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.io.datagram;
20 
21import java.io.IOException;
22import java.net.InetSocketAddress;
23import java.net.SocketAddress;
24import java.nio.channels.DatagramChannel;
25import java.nio.channels.SelectionKey;
26import java.nio.channels.Selector;
27import java.util.HashMap;
28import java.util.Iterator;
29import java.util.Map;
30import java.util.Set;
31 
32import org.apache.mina.common.ByteBuffer;
33import org.apache.mina.io.IoAcceptor;
34import org.apache.mina.io.IoFilterChain;
35import org.apache.mina.io.IoHandler;
36import org.apache.mina.io.IoSessionManagerFilterChain;
37import org.apache.mina.util.ExceptionUtil;
38import org.apache.mina.util.Queue;
39 
40/**
41 * {@link IoAcceptor} for datagram transport (UDP/IP).
42 * 
43 * @author Trustin Lee (trustin@apache.org)
44 * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $
45 */
46public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
47{
48    private static volatile int nextId = 0;
49 
50    private final IoSessionManagerFilterChain filters =
51        new DatagramSessionManagerFilterChain( this );
52 
53    private final int id = nextId ++ ;
54 
55    private Selector selector;
56 
57    private final Map channels = new HashMap();
58 
59    private final Queue registerQueue = new Queue();
60 
61    private final Queue cancelQueue = new Queue();
62 
63    private final Queue flushingSessions = new Queue();
64 
65    private Worker worker;
66 
67    /**
68     * Creates a new instance.
69     */
70    public DatagramAcceptor()
71    {
72    }
73 
74    public void bind( SocketAddress address, IoHandler handler )
75            throws IOException
76    {
77        if( address == null )
78            throw new NullPointerException( "address" );
79        if( handler == null )
80            throw new NullPointerException( "handler" );
81 
82        if( !( address instanceof InetSocketAddress ) )
83            throw new IllegalArgumentException( "Unexpected address type: "
84                                                + address.getClass() );
85        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
86            throw new IllegalArgumentException( "Unsupported port number: 0" );
87        
88        RegistrationRequest request = new RegistrationRequest( address, handler );
89        synchronized( this )
90        {
91            synchronized( registerQueue )
92            {
93                registerQueue.push( request );
94            }
95            startupWorker();
96        }
97        selector.wakeup();
98        
99        synchronized( request )
100        {
101            while( !request.done )
102            {
103                try
104                {
105                    request.wait();
106                }
107                catch( InterruptedException e )
108                {
109                }
110            }
111        }
112        
113        if( request.exception != null )
114        {
115            ExceptionUtil.throwException( request.exception );
116        }
117    }
118 
119    public void unbind( SocketAddress address )
120    {
121        if( address == null )
122            throw new NullPointerException( "address" );
123 
124        CancellationRequest request = new CancellationRequest( address );
125        synchronized( this )
126        {
127            try
128            {
129                startupWorker();
130            }
131            catch( IOException e )
132            {
133                // IOException is thrown only when Worker thread is not
134                // running and failed to open a selector.  We simply throw
135                // IllegalArgumentException here because we can simply
136                // conclude that nothing is bound to the selector.
137                throw new IllegalArgumentException( "Address not bound: " + address );
138            }
139 
140            synchronized( cancelQueue )
141            {
142                cancelQueue.push( request );
143            }
144        }
145        selector.wakeup();
146        
147        synchronized( request )
148        {
149            while( !request.done )
150            {
151                try
152                {
153                    request.wait();
154                }
155                catch( InterruptedException e )
156                {
157                }
158            }
159        }
160        
161        if( request.exception != null )
162        {
163            request.exception.fillInStackTrace();
164            throw request.exception;
165        }
166    }
167    
168    private synchronized void startupWorker() throws IOException
169    {
170        if( worker == null )
171        {
172            selector = Selector.open();
173            worker = new Worker();
174            worker.start();
175        }
176    }
177 
178    void flushSession( DatagramSession session )
179    {
180        scheduleFlush( session );
181        selector.wakeup();
182    }
183 
184    void closeSession( DatagramSession session )
185    {
186    }
187 
188    private void scheduleFlush( DatagramSession session )
189    {
190        synchronized( flushingSessions )
191        {
192            flushingSessions.push( session );
193        }
194    }
195 
196    private class Worker extends Thread
197    {
198        public Worker()
199        {
200            super( "DatagramAcceptor-" + id );
201        }
202 
203        public void run()
204        {
205            for( ;; )
206            {
207                try
208                {
209                    int nKeys = selector.select();
210 
211                    registerNew();
212 
213                    if( nKeys > 0 )
214                    {
215                        processReadySessions( selector.selectedKeys() );
216                    }
217 
218                    flushSessions();
219                    cancelKeys();
220 
221                    if( selector.keys().isEmpty() )
222                    {
223                        synchronized( DatagramAcceptor.this )
224                        {
225                            if( selector.keys().isEmpty() &&
226                                registerQueue.isEmpty() &&
227                                cancelQueue.isEmpty() )
228                            {
229                                worker = null;
230                                try
231                                {
232                                    selector.close();
233                                }
234                                catch( IOException e )
235                                {
236                                    exceptionMonitor.exceptionCaught( DatagramAcceptor.this, e );
237                                }
238                                finally
239                                {
240                                    selector = null;
241                                }
242                                break;
243                            }
244                        }
245                    }
246                }
247                catch( IOException e )
248                {
249                    exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
250                            e );
251 
252                    try
253                    {
254                        Thread.sleep( 1000 );
255                    }
256                    catch( InterruptedException e1 )
257                    {
258                    }
259                }
260            }
261        }
262    }
263 
264    private void processReadySessions( Set keys )
265    {
266        Iterator it = keys.iterator();
267        while( it.hasNext() )
268        {
269            SelectionKey key = ( SelectionKey ) it.next();
270            it.remove();
271 
272            DatagramChannel ch = ( DatagramChannel ) key.channel();
273 
274            RegistrationRequest req = ( RegistrationRequest ) key.attachment();
275            DatagramSession session = new DatagramSession(
276                    filters, ch, req.handler );
277            session.setSelectionKey( key );
278            
279            try
280            {
281                req.handler.sessionCreated( session );
282 
283                if( key.isReadable() )
284                {
285                    readSession( session );
286                }
287 
288                if( key.isWritable() )
289                {
290                    scheduleFlush( session );
291                }
292            }
293            catch( Throwable t )
294            {
295                exceptionMonitor.exceptionCaught( this, t );
296            }
297        }
298    }
299 
300    private void readSession( DatagramSession session )
301    {
302 
303        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
304        try
305        {
306            SocketAddress remoteAddress = session.getChannel().receive(
307                    readBuf.buf() );
308            if( remoteAddress != null )
309            {
310                readBuf.flip();
311                session.setRemoteAddress( remoteAddress );
312 
313                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
314                newBuf.put( readBuf );
315                newBuf.flip();
316 
317                session.increaseReadBytes( newBuf.remaining() );
318                filters.dataRead( session, newBuf );
319            }
320        }
321        catch( IOException e )
322        {
323            filters.exceptionCaught( session, e );
324        }
325        finally
326        {
327            readBuf.release();
328        }
329    }
330 
331    private void flushSessions()
332    {
333        if( flushingSessions.size() == 0 )
334            return;
335 
336        for( ;; )
337        {
338            DatagramSession session;
339 
340            synchronized( flushingSessions )
341            {
342                session = ( DatagramSession ) flushingSessions.pop();
343            }
344 
345            if( session == null )
346                break;
347 
348            try
349            {
350                flush( session );
351            }
352            catch( IOException e )
353            {
354                session.getManagerFilterChain().exceptionCaught( session, e );
355            }
356        }
357    }
358 
359    private void flush( DatagramSession session ) throws IOException
360    {
361        DatagramChannel ch = session.getChannel();
362 
363        Queue writeBufferQueue = session.getWriteBufferQueue();
364        Queue writeMarkerQueue = session.getWriteMarkerQueue();
365 
366        ByteBuffer buf;
367        Object marker;
368        for( ;; )
369        {
370            synchronized( writeBufferQueue )
371            {
372                buf = ( ByteBuffer ) writeBufferQueue.first();
373                marker = writeMarkerQueue.first();
374            }
375 
376            if( buf == null )
377                break;
378 
379            if( buf.remaining() == 0 )
380            {
381                // pop and fire event
382                synchronized( writeBufferQueue )
383                {
384                    writeBufferQueue.pop();
385                    writeMarkerQueue.pop();
386                }
387 
388                try
389                {
390                    buf.release();
391                }
392                catch( IllegalStateException e )
393                {
394                    session.getManagerFilterChain().exceptionCaught( session, e );
395                }
396 
397                session.increaseWrittenWriteRequests();
398                session.getManagerFilterChain().dataWritten( session, marker );
399                continue;
400            }
401 
402            int writtenBytes = ch
403                    .send( buf.buf(), session.getRemoteAddress() );
404 
405            SelectionKey key = session.getSelectionKey();
406            if( writtenBytes == 0 )
407            {
408                // Kernel buffer is full
409                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
410            }
411            else if( writtenBytes > 0 )
412            {
413                key.interestOps( key.interestOps()
414                                 & ( ~SelectionKey.OP_WRITE ) );
415 
416                // pop and fire event
417                synchronized( writeBufferQueue )
418                {
419                    writeBufferQueue.pop();
420                    writeMarkerQueue.pop();
421                }
422 
423                session.increaseWrittenBytes( writtenBytes );
424                session.increaseWrittenWriteRequests();
425                session.getManagerFilterChain().dataWritten( session, marker );
426            }
427        }
428    }
429 
430    private void registerNew()
431    {
432        if( registerQueue.isEmpty() )
433            return;
434 
435        for( ;; )
436        {
437            RegistrationRequest req;
438            synchronized( registerQueue )
439            {
440                req = ( RegistrationRequest ) registerQueue.pop();
441            }
442 
443            if( req == null )
444                break;
445 
446            DatagramChannel ch = null;
447            try
448            {
449                ch = DatagramChannel.open();
450                ch.configureBlocking( false );
451                ch.socket().bind( req.address );
452                ch.register( selector, SelectionKey.OP_READ, req );
453                channels.put( req.address, ch );
454            }
455            catch( Throwable t )
456            {
457                req.exception = t;
458            }
459            finally
460            {
461                synchronized( req )
462                {
463                    req.done = true;
464                    req.notify();
465                }
466 
467                if( ch != null && req.exception != null )
468                {
469                    try
470                    {
471                        ch.close();
472                    }
473                    catch( Throwable e )
474                    {
475                        exceptionMonitor.exceptionCaught( this, e );
476                    }
477                }
478            }
479        }
480    }
481 
482    private void cancelKeys()
483    {
484        if( cancelQueue.isEmpty() )
485            return;
486 
487        for( ;; )
488        {
489            CancellationRequest request;
490            synchronized( cancelQueue )
491            {
492                request = ( CancellationRequest ) cancelQueue.pop();
493            }
494            
495            if( request == null )
496            {
497                break;
498            }
499 
500            DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
501            // close the channel
502            try
503            {
504                if( ch == null )
505                {
506                    request.exception = new IllegalArgumentException(
507                            "Address not bound: " + request.address );
508                }
509                else
510                {
511                    SelectionKey key = ch.keyFor( selector );
512                    key.cancel();
513                    selector.wakeup(); // wake up again to trigger thread death
514                    ch.close();
515                }
516            }
517            catch( Throwable t )
518            {
519                exceptionMonitor.exceptionCaught( this, t );
520            }
521            finally
522            {
523                synchronized( request )
524                {
525                    request.done = true;
526                    request.notify();
527                }
528            }
529        }
530    }
531    
532    public IoFilterChain getFilterChain()
533    {
534        return filters;
535    }
536 
537    private static class RegistrationRequest
538    {
539        private final SocketAddress address;
540        
541        private final IoHandler handler;
542        
543        private Throwable exception; 
544        
545        private boolean done;
546        
547        private RegistrationRequest( SocketAddress address, IoHandler handler )
548        {
549            this.address = address;
550            this.handler = handler;
551        }
552    }
553 
554    private static class CancellationRequest
555    {
556        private final SocketAddress address;
557        private boolean done;
558        private RuntimeException exception;
559        
560        private CancellationRequest( SocketAddress address )
561        {
562            this.address = address;
563        }
564    }
565}

[all classes][org.apache.mina.io.datagram]
EMMA 2.0.4217 (C) Vladimir Roubtsov