EMMA Coverage Report (generated Mon Jul 11 13:15:38 KST 2005)
[all classes][org.apache.mina.io.datagram]

COVERAGE SUMMARY FOR SOURCE FILE [DatagramConnector.java]

nameclass, %method, %block, %line, %
DatagramConnector.java75%  (3/4)80%  (16/20)61%  (547/895)71%  (160.4/225)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class DatagramConnector100% (1/1)76%  (13/17)60%  (453/758)72%  (138.6/193)
connect (SocketAddress, IoHandler): IoSession 0%   (0/1)0%   (0/6)0%   (0/1)
connect (SocketAddress, SocketAddress, int, IoHandler): IoSession 0%   (0/1)0%   (0/6)0%   (0/1)
connect (SocketAddress, int, IoHandler): IoSession 0%   (0/1)0%   (0/6)0%   (0/1)
getFilterChain (): IoFilterChain 0%   (0/1)0%   (0/3)0%   (0/1)
registerNew (): void 100% (1/1)40%  (68/168)58%  (15.5/27)
connect (SocketAddress, SocketAddress, IoHandler): IoSession 100% (1/1)53%  (82/155)66%  (24.9/38)
flush (DatagramSession): void 100% (1/1)58%  (73/126)64%  (23.5/37)
closeSession (DatagramSession): void 100% (1/1)66%  (27/41)73%  (8.1/11)
flushSessions (): void 100% (1/1)71%  (27/38)82%  (10.7/13)
scheduleFlush (DatagramSession): void 100% (1/1)72%  (13/18)93%  (3.7/4)
readSession (DatagramSession): void 100% (1/1)73%  (37/51)74%  (10.4/14)
cancelKeys (): void 100% (1/1)79%  (41/52)87%  (15.7/18)
processReadySessions (Set): void 100% (1/1)90%  (27/30)91%  (10/11)
<static initializer> 100% (1/1)100% (3/3)100% (1/1)
DatagramConnector (): void 100% (1/1)100% (31/31)100% (7/7)
flushSession (DatagramSession): void 100% (1/1)100% (8/8)100% (3/3)
startupWorker (): void 100% (1/1)100% (16/16)100% (5/5)
     
class DatagramConnector$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class DatagramConnector$Worker100% (1/1)100% (2/2)66%  (85/128)64%  (17.8/28)
run (): void 100% (1/1)62%  (70/113)59%  (14.8/25)
DatagramConnector$Worker (DatagramConnector): void 100% (1/1)100% (15/15)100% (3/3)
     
class DatagramConnector$RegistrationRequest100% (1/1)100% (1/1)100% (9/9)100% (4/4)
DatagramConnector$RegistrationRequest (DatagramChannel, IoHandler): void 100% (1/1)100% (9/9)100% (4/4)

1/*
2 *   @(#) $Id: DatagramConnector.java 210062 2005-07-11 03:52:38Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.io.datagram;
20 
21import java.io.IOException;
22import java.net.InetSocketAddress;
23import java.net.SocketAddress;
24import java.nio.channels.DatagramChannel;
25import java.nio.channels.SelectionKey;
26import java.nio.channels.Selector;
27import java.util.Iterator;
28import java.util.Set;
29 
30import org.apache.mina.common.ByteBuffer;
31import org.apache.mina.io.IoConnector;
32import org.apache.mina.io.IoFilterChain;
33import org.apache.mina.io.IoHandler;
34import org.apache.mina.io.IoSession;
35import org.apache.mina.io.IoSessionManagerFilterChain;
36import org.apache.mina.util.ExceptionUtil;
37import org.apache.mina.util.Queue;
38 
39/**
40 * {@link IoConnector} for datagram transport (UDP/IP).
41 * 
42 * @author Trustin Lee (trustin@apache.org)
43 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
44 */
45public class DatagramConnector extends DatagramSessionManager implements IoConnector
46{
47    private static volatile int nextId = 0;
48 
49    private final IoSessionManagerFilterChain filters =
50        new DatagramSessionManagerFilterChain( this );
51 
52    private final int id = nextId ++ ;
53 
54    private Selector selector;
55 
56    private final Queue registerQueue = new Queue();
57 
58    private final Queue cancelQueue = new Queue();
59 
60    private final Queue flushingSessions = new Queue();
61 
62    private Worker worker;
63 
64    /**
65     * Creates a new instance.
66     */
67    public DatagramConnector()
68    {
69    }
70 
71    public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
72    {
73        return connect( address, null, handler);
74    }
75 
76    public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
77    {
78        return connect( address, null, handler );
79    }
80 
81    public IoSession connect( SocketAddress address, SocketAddress localAddress, int timeout, IoHandler handler ) throws IOException
82    {
83        return connect( address, localAddress, handler );
84    }
85 
86    public IoSession connect( SocketAddress address, SocketAddress localAddress,
87                              IoHandler handler ) throws IOException
88    {
89        if( address == null )
90            throw new NullPointerException( "address" );
91        if( handler == null )
92            throw new NullPointerException( "handler" );
93 
94        if( !( address instanceof InetSocketAddress ) )
95            throw new IllegalArgumentException( "Unexpected address type: "
96                                                + address.getClass() );
97        
98        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
99        {
100            throw new IllegalArgumentException( "Unexpected local address type: "
101                                                + localAddress.getClass() );
102        }
103        
104        DatagramChannel ch = DatagramChannel.open();
105        boolean initialized = false;
106        try
107        {
108            ch.socket().setReuseAddress( true );
109            if( localAddress != null )
110            {
111                ch.socket().bind( localAddress );
112            }
113            ch.connect( address );
114            ch.configureBlocking( false );
115            initialized = true;
116        }
117        finally
118        {
119            if( !initialized )
120            {
121                ch.close();
122            }
123        }
124 
125        RegistrationRequest request = new RegistrationRequest( ch, handler );
126        synchronized( this )
127        {
128            synchronized( registerQueue )
129            {
130                registerQueue.push( request );
131            }
132            startupWorker();
133        }
134 
135        selector.wakeup();
136 
137        synchronized( request )
138        {
139            while( !request.done )
140            {
141                try
142                {
143                    request.wait();
144                }
145                catch( InterruptedException e )
146                {
147                }
148            }
149        }
150        
151        if( request.exception != null )
152        {
153            ExceptionUtil.throwException( request.exception );
154        }
155 
156        return request.session;
157    }
158    
159    private synchronized void startupWorker() throws IOException
160    {
161        if( worker == null )
162        {
163            selector = Selector.open();
164            worker = new Worker();
165            worker.start();
166        }
167    }
168 
169    void closeSession( DatagramSession session )
170    {
171        synchronized( this )
172        {
173            try
174            {
175                startupWorker();
176            }
177            catch( IOException e )
178            {
179                // IOException is thrown only when Worker thread is not
180                // running and failed to open a selector.  We simply return
181                // silently here because it we can simply conclude that
182                // this session is not managed by this connector or
183                // already closed.
184                return;
185            }
186 
187            synchronized( cancelQueue )
188            {
189                cancelQueue.push( session );
190            }
191        }
192 
193        selector.wakeup();
194    }
195 
196    void flushSession( DatagramSession session )
197    {
198        scheduleFlush( session );
199        selector.wakeup();
200    }
201 
202    private void scheduleFlush( DatagramSession session )
203    {
204        synchronized( flushingSessions )
205        {
206            flushingSessions.push( session );
207        }
208    }
209 
210    private class Worker extends Thread
211    {
212        public Worker()
213        {
214            super( "DatagramAcceptor-" + id );
215        }
216 
217        public void run()
218        {
219            for( ;; )
220            {
221                try
222                {
223                    int nKeys = selector.select();
224 
225                    registerNew();
226 
227                    if( nKeys > 0 )
228                    {
229                        processReadySessions( selector.selectedKeys() );
230                    }
231 
232                    flushSessions();
233                    cancelKeys();
234 
235                    if( selector.keys().isEmpty() )
236                    {
237                        synchronized( DatagramConnector.this )
238                        {
239                            if( selector.keys().isEmpty() &&
240                                registerQueue.isEmpty() &&
241                                cancelQueue.isEmpty() )
242                            {
243                                worker = null;
244                                try
245                                {
246                                    selector.close();
247                                }
248                                catch( IOException e )
249                                {
250                                    exceptionMonitor.exceptionCaught( DatagramConnector.this, e );
251                                }
252                                finally
253                                {
254                                    selector = null;
255                                }
256                                break;
257                            }
258                        }
259                    }
260                }
261                catch( IOException e )
262                {
263                    exceptionMonitor.exceptionCaught( DatagramConnector.this,
264                            e );
265 
266                    try
267                    {
268                        Thread.sleep( 1000 );
269                    }
270                    catch( InterruptedException e1 )
271                    {
272                    }
273                }
274            }
275        }
276    }
277 
278    private void processReadySessions( Set keys )
279    {
280        Iterator it = keys.iterator();
281        while( it.hasNext() )
282        {
283            SelectionKey key = ( SelectionKey ) it.next();
284            it.remove();
285 
286            DatagramSession session = ( DatagramSession ) key.attachment();
287 
288            if( key.isReadable() )
289            {
290                readSession( session );
291            }
292 
293            if( key.isWritable() )
294            {
295                scheduleFlush( session );
296            }
297        }
298    }
299 
300    private void readSession( DatagramSession session )
301    {
302 
303        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
304        try
305        {
306            int readBytes = session.getChannel().read( readBuf.buf() );
307            if( readBytes > 0 )
308            {
309                readBuf.flip();
310                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
311                newBuf.put( readBuf );
312                newBuf.flip();
313 
314                session.increaseReadBytes( readBytes );
315                filters.dataRead( session, newBuf );
316            }
317        }
318        catch( IOException e )
319        {
320            filters.exceptionCaught( session, e );
321        }
322        finally
323        {
324            readBuf.release();
325        }
326    }
327 
328    private void flushSessions()
329    {
330        if( flushingSessions.size() == 0 )
331            return;
332 
333        for( ;; )
334        {
335            DatagramSession session;
336 
337            synchronized( flushingSessions )
338            {
339                session = ( DatagramSession ) flushingSessions.pop();
340            }
341 
342            if( session == null )
343                break;
344 
345            try
346            {
347                flush( session );
348            }
349            catch( IOException e )
350            {
351                session.getManagerFilterChain().exceptionCaught( session, e );
352            }
353        }
354    }
355 
356    private void flush( DatagramSession session ) throws IOException
357    {
358        DatagramChannel ch = session.getChannel();
359 
360        Queue writeBufferQueue = session.getWriteBufferQueue();
361        Queue writeMarkerQueue = session.getWriteMarkerQueue();
362 
363        ByteBuffer buf;
364        Object marker;
365        for( ;; )
366        {
367            synchronized( writeBufferQueue )
368            {
369                buf = ( ByteBuffer ) writeBufferQueue.first();
370                marker = writeMarkerQueue.first();
371            }
372 
373            if( buf == null )
374                break;
375 
376            if( buf.remaining() == 0 )
377            {
378                // pop and fire event
379                synchronized( writeBufferQueue )
380                {
381                    writeBufferQueue.pop();
382                    writeMarkerQueue.pop();
383                }
384 
385                try
386                {
387                    buf.release();
388                }
389                catch( IllegalStateException e )
390                {
391                    session.getManagerFilterChain().exceptionCaught( session, e );
392                }
393 
394                session.increaseWrittenWriteRequests();
395                session.getManagerFilterChain().dataWritten( session, marker );
396                continue;
397            }
398 
399            int writtenBytes = ch.write( buf.buf() );
400 
401            SelectionKey key = session.getSelectionKey();
402            if( writtenBytes == 0 )
403            {
404                // Kernel buffer is full
405                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
406            }
407            else if( writtenBytes > 0 )
408            {
409                key.interestOps( key.interestOps()
410                                 & ( ~SelectionKey.OP_WRITE ) );
411 
412                // pop and fire event
413                synchronized( writeBufferQueue )
414                {
415                    writeBufferQueue.pop();
416                    writeMarkerQueue.pop();
417                }
418 
419                session.increaseWrittenBytes( writtenBytes );
420                session.increaseWrittenWriteRequests();
421                session.getManagerFilterChain().dataWritten( session, marker );
422            }
423        }
424    }
425 
426    private void registerNew()
427    {
428        if( registerQueue.isEmpty() )
429            return;
430 
431        for( ;; )
432        {
433            RegistrationRequest req;
434            synchronized( registerQueue )
435            {
436                req = ( RegistrationRequest ) registerQueue.pop();
437            }
438 
439            if( req == null )
440                break;
441 
442            DatagramSession session = new DatagramSession(
443                    filters, req.channel, req.handler );
444 
445            try
446            {
447                req.handler.sessionCreated( session );
448 
449                SelectionKey key = req.channel.register( selector,
450                        SelectionKey.OP_READ, session );
451    
452                session.setSelectionKey( key );
453            }
454            catch( Throwable t )
455            {
456                req.exception = t;
457            }
458            finally 
459            {
460                synchronized( req )
461                {
462                    req.done = true;
463                    req.session = session;
464                    req.notify();
465                }
466                
467                if( req.exception != null )
468                {
469                    try
470                    {
471                        req.channel.close();
472                    }
473                    catch (IOException e)
474                    {
475                        exceptionMonitor.exceptionCaught( this, e );
476                    }
477                }
478            }
479        }
480    }
481 
482    private void cancelKeys()
483    {
484        if( cancelQueue.isEmpty() )
485            return;
486 
487        for( ;; )
488        {
489            DatagramSession session;
490            synchronized( cancelQueue )
491            {
492                session = ( DatagramSession ) cancelQueue.pop();
493            }
494            if( session == null )
495                break;
496            else
497            {
498                SelectionKey key = session.getSelectionKey();
499                DatagramChannel ch = ( DatagramChannel ) key.channel();
500                try
501                {
502                    ch.close();
503                }
504                catch( IOException e )
505                {
506                    exceptionMonitor.exceptionCaught( this, e );
507                }
508                session.notifyClose();
509                key.cancel();
510                selector.wakeup(); // wake up again to trigger thread death
511            }
512        }
513    }
514 
515    public IoFilterChain getFilterChain()
516    {
517        return filters;
518    }
519 
520    private static class RegistrationRequest
521    {
522        private final DatagramChannel channel;
523 
524        private final IoHandler handler;
525        
526        private boolean done;
527        
528        private DatagramSession session;
529        
530        private Throwable exception;
531 
532        private RegistrationRequest( DatagramChannel channel,
533                                     IoHandler handler )
534        {
535            this.channel = channel;
536            this.handler = handler;
537        }
538    }
539}

[all classes][org.apache.mina.io.datagram]
EMMA 2.0.4217 (C) Vladimir Roubtsov