EMMA Coverage Report (generated Wed Jun 08 12:10:57 KST 2005)
[all classes][org.apache.mina.io.socket]

COVERAGE SUMMARY FOR SOURCE FILE [SocketAcceptor.java]

name | class, % | method, % | block, % | line, %
SocketAcceptor.java | 80% (4/5) | 87% (13/15) | 63% (502/794) | 73% (131.8/181)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class SocketAcceptor | 100% (1/1) | 80% (8/10) | 61% (350/574) | 74% (90.4/122)
getBacklog (): int | | 0% (0/1) | 0% (0/3) | 0% (0/1)
setBacklog (int): void | | 0% (0/1) | 0% (0/18) | 0% (0/4)
cancelKeys (): void | | 100% (1/1) | 57% (75/131) | 75% (18/24)
registerNew (): void | | 100% (1/1) | 59% (96/163) | 76% (21.2/28)
bind (SocketAddress, IoHandler): void | | 100% (1/1) | 59% (65/110) | 72% (18.7/26)
unbind (SocketAddress): void | | 100% (1/1) | 62% (58/93) | 73% (17.5/24)
<static initializer> | | 100% (1/1) | 100% (3/3) | 100% (1/1)
SocketAcceptor (): void | | 100% (1/1) | 100% (34/34) | 100% (8/8)
getFilterChain (): IoFilterChain | | 100% (1/1) | 100% (3/3) | 100% (1/1)
startupWorker (): void | | 100% (1/1) | 100% (16/16) | 100% (5/5)

class SocketAcceptor$1 | 0% (0/1) | 100% (0/0) | 100% (0/0) | 100% (0/0)

class SocketAcceptor$Worker | 100% (1/1) | 100% (3/3) | 66% (134/202) | 66% (33.5/51)
run (): void | | 100% (1/1) | 61% (66/109) | 58% (13.8/24)
processSessions (Set): void | | 100% (1/1) | 68% (53/78) | 69% (16.7/24)
SocketAcceptor$Worker (SocketAcceptor): void | | 100% (1/1) | 100% (15/15) | 100% (3/3)

class SocketAcceptor$CancellationRequest | 100% (1/1) | 100% (1/1) | 100% (6/6) | 100% (3/3)
SocketAcceptor$CancellationRequest (SocketAddress): void | | 100% (1/1) | 100% (6/6) | 100% (3/3)

class SocketAcceptor$RegistrationRequest | 100% (1/1) | 100% (1/1) | 100% (12/12) | 100% (5/5)
SocketAcceptor$RegistrationRequest (SocketAddress, int, IoHandler): void | | 100% (1/1) | 100% (12/12) | 100% (5/5)

1/*
2 *   @(#) $Id: SocketAcceptor.java 188654 2005-06-07 01:31:11Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.io.socket;
20 
21import java.io.IOException;
22import java.net.InetSocketAddress;
23import java.net.SocketAddress;
24import java.nio.channels.SelectionKey;
25import java.nio.channels.Selector;
26import java.nio.channels.ServerSocketChannel;
27import java.nio.channels.SocketChannel;
28import java.util.HashMap;
29import java.util.Iterator;
30import java.util.Map;
31import java.util.Set;
32 
33import org.apache.mina.common.BaseSessionManager;
34import org.apache.mina.io.IoAcceptor;
35import org.apache.mina.io.IoFilterChain;
36import org.apache.mina.io.IoHandler;
37import org.apache.mina.io.IoSessionManagerFilterChain;
38import org.apache.mina.util.Queue;
39 
40/**
41 * {@link IoAcceptor} for socket transport (TCP/IP).
42 * 
43 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
44 * @version $Rev: 188654 $, $Date: 2005-06-07 10:31:11 +0900 $
45 */
46public class SocketAcceptor extends BaseSessionManager implements IoAcceptor
47{
48    private static volatile int nextId = 0;
49 
      // Filter chain returned by getFilterChain() and handed to every
      // SocketSession created for an accepted connection.
50    private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
51 
      // Number taken from nextId at construction; the Worker thread name is
      // built from it.
      // NOTE(review): "nextId ++" on a volatile field is a read-modify-write,
      // not an atomic step, so acceptors constructed concurrently could end up
      // with the same id -- confirm whether unique ids are required.
52    private final int id = nextId ++ ;
53 
      // Opened by startupWorker() and reset to null by the Worker when it exits.
54    private Selector selector;
55 
      // Bound ServerSocketChannels keyed by the SocketAddress they were bound
      // to; filled in registerNew() and emptied in cancelKeys().
56    private final Map channels = new HashMap();
57 
      // RegistrationRequests pushed by bind() and popped by registerNew().
58    private final Queue registerQueue = new Queue();
59 
      // CancellationRequests pushed by unbind() and popped by cancelKeys().
60    private final Queue cancelQueue = new Queue();
61    
      // Backlog copied into each new RegistrationRequest; starts at 50.
62    private int backlog = 50;
63 
      // Thread created by startupWorker(); set back to null by the Worker
      // itself when it shuts down.
64    private Worker worker;
65 
66 
67    /**
68     * Creates a new instance.
       * <p>
       * The constructor body is empty: no selector is opened and no worker
       * thread is started until <code>bind</code> or <code>unbind</code>
       * calls <code>startupWorker()</code>.
69     */
70    public SocketAcceptor()
71    {
72    }
73 
74    /**
75     * Binds to the specified <code>address</code> and handles incoming
76     * connections with the specified <code>handler</code>.  Backlog value
77     * is configured to the value of <code>backlog</code> property.
       * <p>
       * The request is queued for the worker thread, and this method blocks
       * until the worker has marked it as done.
78     *
       * @param address the local address to bind to; must be an
       *                {@link InetSocketAddress} with a non-zero port
       * @param handler the handler used for connections accepted on
       *                <code>address</code>
79     * @throws IOException if failed to bind, or if the selector could not be opened
       * @throws NullPointerException if <code>address</code> or
       *         <code>handler</code> is <code>null</code>
       * @throws IllegalArgumentException if <code>address</code> is not an
       *         {@link InetSocketAddress} or its port is 0
80     */
81    public void bind( SocketAddress address, IoHandler handler ) throws IOException
82    {
83        if( address == null )
84        {
85            throw new NullPointerException( "address" );
86        }
87 
88        if( handler == null )
89        {
90            throw new NullPointerException( "handler" );
91        }
92 
93        if( !( address instanceof InetSocketAddress ) )
94        {
95            throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
96        }
97 
98        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
99        {
100            throw new IllegalArgumentException( "Unsupported port number: 0" );
101        }
102        
           // The current backlog value is captured in the request at this point.
103        RegistrationRequest request = new RegistrationRequest( address, backlog, handler );
104 
           // Queue the request and make sure a worker exists while holding the
           // acceptor lock; Worker.run() takes the same lock before it decides
           // to shut down.
105        synchronized( this )
106        {
107            synchronized( registerQueue )
108            {
109                registerQueue.push( request );
110            }
111            startupWorker();
112        }
113        
           // Interrupt the worker's select() so that it runs registerNew().
           // NOTE(review): selector is read here without the lock, while the
           // worker sets it to null under the lock when it exits -- confirm
           // that cannot happen between the block above and this call.
114        selector.wakeup();
115        
           // Block until registerNew() marks the request as done.
116        synchronized( request )
117        {
118            while( !request.done )
119            {
120                try
121                {
122                    request.wait();
123                }
124                catch( InterruptedException e )
125                {
                       // Interrupts are ignored; the loop keeps waiting.
126                }
127            }
128        }
129        
           // Rethrow, in the caller's thread, the failure recorded by the worker.
130        if( request.exception != null )
131        {
132            throw request.exception;
133        }
134    }
135 
136 
137    private synchronized void startupWorker() throws IOException
138    {
           // Opens the selector and starts a Worker thread if no worker is
           // currently recorded; otherwise does nothing.  An IOException from
           // Selector.open() propagates to the caller, in which case worker
           // stays null and no thread is started.
139        if( worker == null )
140        {
141            selector = Selector.open();
142            worker = new Worker();
143 
144            worker.start();
145        }
146    }
147 
148 
149    public void unbind( SocketAddress address )
150    {
           // Unbinds the given address: queues a CancellationRequest for the
           // worker thread and blocks until the worker has marked it as done.
           // Throws NullPointerException if address is null and
           // IllegalArgumentException if nothing is bound to it.
151        if( address == null )
152        {
153            throw new NullPointerException( "address" );
154        }
155 
156        CancellationRequest request = new CancellationRequest( address );
           // As in bind(), the worker is started and the request queued while
           // holding the acceptor lock that Worker.run() takes before exiting.
157        synchronized( this )
158        {
159            try
160            {
161                startupWorker();
162            }
163            catch( IOException e )
164            {
165                // IOException is thrown only when Worker thread is not
166                // running and failed to open a selector.  We simply throw
167                // IllegalArgumentException here because we can simply
168                // conclude that nothing is bound to the selector.
169                throw new IllegalArgumentException( "Address not bound: " + address );
170            }
171 
172            synchronized( cancelQueue )
173            {
174                cancelQueue.push( request );
175            }
176        }
177        
           // Interrupt the worker's select() so that it runs cancelKeys().
           // NOTE(review): selector is read here without the lock, while the
           // worker sets it to null under the lock when it exits -- confirm
           // that cannot happen between the block above and this call.
178        selector.wakeup();
179 
           // Block until cancelKeys() marks the request as done.
180        synchronized( request )
181        {
182            while( !request.done )
183            {
184                try
185                {
186                    request.wait();
187                }
188                catch( InterruptedException e )
189                {
                       // Interrupts are ignored; the loop keeps waiting.
190                }
191            }
192        }
193        
194        if( request.exception != null )
195        {
               // The exception object was created in cancelKeys(), which the
               // worker thread runs; refresh its stack trace so that it
               // reflects this calling thread before rethrowing.
196            request.exception.fillInStackTrace();
197 
198            throw request.exception;
199        }
200    }
201    
202    /**
203     * Returns the default backlog value which is used when user binds. 
        *
        * @return the backlog value that <code>bind</code> copies into each
        *         new bind request
204     */
205    public int getBacklog()
206    {
207        return backlog;
208    }
209    
210    /**
211     * Sets the default backlog value which is used when user binds. 
        * The value is read when <code>bind</code> creates its request, so it
        * applies to bind calls made after this method returns.
        *
        * @param defaultBacklog the new backlog value; must be greater than 0
        * @throws IllegalArgumentException if <code>defaultBacklog</code> is
        *         zero or negative
212     */
213    public void setBacklog( int defaultBacklog )
214    {
215        if( defaultBacklog <= 0 )
216        {
217            throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog );
218        }
219        this.backlog = defaultBacklog;
220    }
221 
222 
223    private class Worker extends Thread
224    {
           // Thread that waits on the acceptor's selector, applies the queued
           // bind/unbind requests and accepts incoming connections.  It ends
           // itself once no key is registered and both request queues are empty.
225        public Worker()
226        {
               // Thread name is "SocketAcceptor-" followed by the acceptor's id.
227            super( "SocketAcceptor-" + id );
228        }
229 
           /**
            * Runs the select loop until the shutdown condition below is met.
            */
230        public void run()
231        {
232            for( ;; )
233            {
234                try
235                {
                       // Blocks until a channel is ready or wakeup() is called.
236                    int nKeys = selector.select();
237 
                       // Apply the bind/unbind requests queued by other threads.
238                    registerNew();
239                    cancelKeys();
240 
241                    if( nKeys > 0 )
242                    {
243                        processSessions( selector.selectedKeys() );
244                    }
245 
                       // Nothing is registered any more: consider stopping.
246                    if( selector.keys().isEmpty() )
247                    {
                           // Re-check under the acceptor lock, which bind() and
                           // unbind() hold while queueing a request and calling
                           // startupWorker().
248                        synchronized( SocketAcceptor.this )
249                        {
250                            if( selector.keys().isEmpty() &&
251                                registerQueue.isEmpty() &&
252                                cancelQueue.isEmpty() )
253                            {
                                   // Clearing worker makes the next
                                   // startupWorker() call open a new selector
                                   // and start a new thread.
254                                worker = null;
255                                try
256                                {
257                                    selector.close();
258                                }
259                                catch( IOException e )
260                                {
261                                    exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
262                                }
263                                finally
264                                {
265                                    selector = null;
266                                }
267                                break;
268                            }
269                        }
270                    }
271                }
272                catch( IOException e )
273                {
                       // Report the failure, then pause for one second before
                       // the loop runs again.
274                    exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
275 
276                    try
277                    {
278                        Thread.sleep( 1000 );
279                    }
280                    catch( InterruptedException e1 )
281                    {
                           // An interrupt only cuts the pause short.
282                    }
283                }
284            }
285        }
286 
           /**
            * Accepts one connection for each acceptable key in <code>keys</code>
            * and turns it into a SocketSession.
            *
            * @param keys the selected keys; each key is removed from the set as
            *             it is visited
            * @throws IOException if accepting a connection or closing a
            *         rejected channel fails
            */
287        private void processSessions( Set keys ) throws IOException
288        {
289            Iterator it = keys.iterator();
290            while( it.hasNext() )
291            {
292                SelectionKey key = ( SelectionKey ) it.next();
293   
                   // Take the key out of the set being iterated so that it is
                   // not handled again.
294                it.remove();
295   
296                if( !key.isAcceptable() )
297                {
298                    continue;
299                }
300   
301                ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
302   
303                SocketChannel ch = ssc.accept();
304   
                   // accept() produced no connection; nothing to do for this key.
305                if( ch == null )
306                {
307                    continue;
308                }
309   
310                boolean success = false;
311                try
312                {
                       // The RegistrationRequest attached in registerNew()
                       // carries the handler for this address.
313                    RegistrationRequest req = ( RegistrationRequest ) key.attachment();
314                    SocketSession session = new SocketSession( filters, ch, req.handler );
315                    req.handler.sessionCreated( session );
316                    SocketIoProcessor.getInstance().addSession( session );
317                    success = true;
318                }
319                catch( Throwable t )
320                {
                       // Any failure while setting up the session is reported
                       // and does not stop the remaining keys from being
                       // processed.
321                    exceptionMonitor.exceptionCaught( SocketAcceptor.this, t );
322                }
323                finally
324                {
                       // Close the accepted channel unless the session was
                       // handed over to SocketIoProcessor.
325                    if( !success )
326                    {
327                        ch.close();
328                    }
329                }
330            }
331        }
332    }
333 
334 
335    private void registerNew()
336    {
           // Processes every queued RegistrationRequest: opens a non-blocking
           // ServerSocketChannel, binds it with the requested backlog, registers
           // it with the selector for OP_ACCEPT (the request is the key's
           // attachment) and records it in channels.  The outcome is reported to
           // the thread waiting in bind().  Called from Worker.run().
337        if( registerQueue.isEmpty() )
338        {
339            return;
340        }
341 
342        for( ;; )
343        {
344            RegistrationRequest req;
345 
346            synchronized( registerQueue )
347            {
348                req = ( RegistrationRequest ) registerQueue.pop();
349            }
350 
               // A null result ends the loop (presumably pop() returns null
               // once the queue is empty -- verify against Queue).
351            if( req == null )
352            {
353                break;
354            }
355 
356            ServerSocketChannel ssc = null;
357 
358            try
359            {
360                ssc = ServerSocketChannel.open();
361                ssc.configureBlocking( false );
362                ssc.socket().bind( req.address, req.backlog );
363                ssc.register( selector, SelectionKey.OP_ACCEPT, req );
364 
365                channels.put( req.address, ssc );
366            }
367            catch( IOException e )
368            {
                   // Stored so that bind() can rethrow it in the caller's thread.
369                req.exception = e;
370            }
371            finally
372            {
                   // Wake up the bind() call waiting on this request, whether
                   // the registration succeeded or failed.
373                synchronized( req )
374                {
375                    req.done = true;
376 
377                    req.notify();
378                }
379 
                   // The channel was opened but a later step failed: release it.
380                if( ssc != null && req.exception != null )
381                {
382                    try
383                    {
384                        ssc.close();
385                    }
386                    catch( IOException e )
387                    {
388                        exceptionMonitor.exceptionCaught( this, e );
389                    }
390                }
391            }
392        }
393    }
394 
395 
396    private void cancelKeys()
397    {
           // Processes every queued CancellationRequest: removes the channel
           // bound to the requested address from channels, cancels its selection
           // key and closes it.  The outcome is reported to the thread waiting
           // in unbind().  Called from Worker.run().
398        if( cancelQueue.isEmpty() )
399        {
400            return;
401        }
402 
403        for( ;; )
404        {
405            CancellationRequest request;
406 
407            synchronized( cancelQueue )
408            {
409                request = ( CancellationRequest ) cancelQueue.pop();
410            }
411 
               // A null result ends the loop (presumably pop() returns null
               // once the queue is empty -- verify against Queue).
412            if( request == null )
413            {
414                break;
415            }
416 
417            ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
418            
419            // close the channel
420            try
421            {
422                if( ssc == null )
423                {
                       // No channel is recorded for this address; unbind()
                       // rethrows this exception to its caller.
424                    request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
425                }
426                else
427                {
428                    SelectionKey key = ssc.keyFor( selector );
429 
430                    key.cancel();
431 
432                    selector.wakeup(); // wake up again to trigger thread death
433 
434                    ssc.close();
435                }
436            }
437            catch( IOException e )
438            {
                   // A failure to close goes to the exception monitor only; it
                   // is not stored in request.exception, so unbind() does not
                   // throw it.
439                exceptionMonitor.exceptionCaught( this, e );
440            }
441            finally
442            {
                   // Wake up the unbind() call waiting on this request.
443                synchronized( request )
444                {
445                    request.done = true;
446 
447                    request.notify();
448                }
449            }
450        }
451    }
452 
453    public IoFilterChain getFilterChain()
454    {
           // Returns the filter chain created together with this acceptor (the
           // filters field); the same object is returned on every call.
455        return filters;
456    }
457 
458    private static class RegistrationRequest
459    {
           // One bind() call handed to the worker thread.  bind() waits on this
           // object until registerNew() sets done and calls notify().
           // Address to bind the server socket to.
460        private final SocketAddress address;
461        
           // Backlog passed to the server socket's bind call.
462        private final int backlog;
463 
           // Handler used for connections accepted on this address.
464        private final IoHandler handler;
465        
           // Set by registerNew() when binding failed; rethrown by bind().
466        private IOException exception; 
467        
           // Set to true by registerNew() once the request has been processed.
468        private boolean done;
469        
470        private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler )
471        {
472            this.address = address;
473            this.backlog = backlog;
474            this.handler = handler;
475        }
476    }
477 
478 
479    private static class CancellationRequest
480    {
           // One unbind() call handed to the worker thread.  unbind() waits on
           // this object until cancelKeys() sets done and calls notify().
           // Address whose channel should be closed.
481        private final SocketAddress address;
482 
           // Set to true by cancelKeys() once the request has been processed.
483        private boolean done;
484 
           // Set by cancelKeys() when no channel is bound to the address;
           // rethrown by unbind().
485        private RuntimeException exception;
486        
487        private CancellationRequest( SocketAddress address )
488        {
489            this.address = address;
490        }
491    }
492}

[all classes][org.apache.mina.io.socket]
EMMA 2.0.4217 (C) Vladimir Roubtsov