EMMA Coverage Report (generated Tue Dec 20 11:01:01 KST 2005)
[all classes][org.apache.mina.io.socket]

COVERAGE SUMMARY FOR SOURCE FILE [SocketConnector.java]

nameclass, %method, %block, %line, %
SocketConnector.java75%  (3/4)93%  (14/15)56%  (375/671)65%  (102.2/157)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class SocketConnector$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class SocketConnector$Worker100% (1/1)100% (2/2)38%  (47/125)37%  (10/27)
run (): void 100% (1/1)29%  (32/110)29%  (7/24)
SocketConnector$Worker (SocketConnector): void 100% (1/1)100% (15/15)100% (3/3)
     
class SocketConnector100% (1/1)92%  (11/12)59%  (311/529)70%  (87.2/125)
connect (SocketAddress, int, IoHandler): IoSession 0%   (0/1)0%   (0/7)0%   (0/1)
connect (SocketAddress, SocketAddress, int, IoHandler): IoSession 100% (1/1)50%  (92/185)63%  (27.1/43)
processTimedOutSessions (Set): void 100% (1/1)51%  (27/53)62%  (10/16)
registerNew (): void 100% (1/1)55%  (34/62)65%  (11.7/18)
processSessions (Set): void 100% (1/1)58%  (84/145)74%  (19.4/26)
newSession (SocketChannel, IoHandler): SocketSession 100% (1/1)85%  (17/20)71%  (5/7)
<static initializer> 100% (1/1)100% (3/3)100% (1/1)
SocketConnector (): void 100% (1/1)100% (21/21)100% (5/5)
connect (SocketAddress, IoHandler): IoSession 100% (1/1)100% (7/7)100% (1/1)
connect (SocketAddress, SocketAddress, IoHandler): IoSession 100% (1/1)100% (7/7)100% (1/1)
getFilterChain (): IoFilterChain 100% (1/1)100% (3/3)100% (1/1)
startupWorker (): void 100% (1/1)100% (16/16)100% (5/5)
     
class SocketConnector$ConnectionRequest100% (1/1)100% (1/1)100% (17/17)100% (5/5)
SocketConnector$ConnectionRequest (SocketChannel, int, IoHandler): void 100% (1/1)100% (17/17)100% (5/5)

1/*
2 *   @(#) $Id: SocketConnector.java 357871 2005-12-20 01:56:40Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.io.socket;
20 
21import java.io.IOException;
22import java.net.ConnectException;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
25import java.nio.channels.SelectionKey;
26import java.nio.channels.Selector;
27import java.nio.channels.SocketChannel;
28import java.util.Iterator;
29import java.util.Set;
30 
31import org.apache.mina.common.BaseSessionManager;
32import org.apache.mina.io.IoConnector;
33import org.apache.mina.io.IoFilterChain;
34import org.apache.mina.io.IoHandler;
35import org.apache.mina.io.IoSession;
36import org.apache.mina.io.IoSessionManagerFilterChain;
37import org.apache.mina.util.ExceptionUtil;
38import org.apache.mina.util.Queue;
39 
40/**
41 * {@link IoConnector} for socket transport (TCP/IP).
42 * 
43 * @author The Apache Directory Project (dev@directory.apache.org)
44 * @version $Rev: 357871 $, $Date: 2005-12-20 10:56:40 +0900 (Tue, 20 Dec 2005) $
45 */
46public class SocketConnector extends BaseSessionManager implements IoConnector
47{
48    private static volatile int nextId = 0;
49 
50    private final int id = nextId++;
51 
52    private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
53 
54    private Selector selector;
55 
56    private final Queue connectQueue = new Queue();
57 
58    private Worker worker;
59 
60    /**
61     * Creates a new instance.
62     */
63    public SocketConnector()
64    {
65    }
66 
67    public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
68    {
69        return connect( address, null, Integer.MAX_VALUE, handler);
70    }
71 
72    public IoSession connect( SocketAddress address, SocketAddress localAddress, IoHandler handler ) throws IOException
73    {
74        return connect( address, localAddress, Integer.MAX_VALUE, handler);
75    }
76 
77    public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
78    {
79        return connect( address, null, timeout, handler);
80    }
81 
82    public IoSession connect( SocketAddress address, SocketAddress localAddress,
83                              int timeout, IoHandler handler ) throws IOException
84    {
85        if( address == null )
86            throw new NullPointerException( "address" );
87        if( handler == null )
88            throw new NullPointerException( "handler" );
89 
90        if( timeout <= 0 )
91            throw new IllegalArgumentException( "Illegal timeout: " + timeout );
92 
93        if( ! ( address instanceof InetSocketAddress ) )
94            throw new IllegalArgumentException( "Unexpected address type: "
95                                                + address.getClass() );
96 
97        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
98            throw new IllegalArgumentException( "Unexpected local address type: "
99                                                + localAddress.getClass() );
100 
101        SocketChannel ch = SocketChannel.open();
102        boolean success = false;
103        try
104        {
105            ch.socket().setReuseAddress( true );
106            if( localAddress != null )
107            {
108                ch.socket().bind( localAddress );
109            }
110    
111            ch.configureBlocking( false );
112 
113            if( ch.connect( address ) )
114            {
115                SocketSession session = newSession( ch, handler );
116                success = true;
117                return session;
118            }
119            
120            success = true;
121        }
122        finally
123        {
124            if( !success )
125            {
126                ch.close();
127            }
128        }
129        
130        ConnectionRequest request = new ConnectionRequest( ch, timeout, handler );
131        synchronized( this )
132        {
133            synchronized( connectQueue )
134            {
135                connectQueue.push( request );
136            }
137            startupWorker();
138            selector.wakeup();
139        }
140 
141        synchronized( request )
142        {
143            while( !request.done )
144            {
145                try
146                {
147                    request.wait();
148                }
149                catch( InterruptedException e )
150                {
151                }
152            }
153        }
154 
155        if( request.exception != null )
156        {
157            ExceptionUtil.throwException( request.exception );
158        }
159 
160        return request.session;
161    }
162    
163    private synchronized void startupWorker() throws IOException
164    {
165        if( worker == null )
166        {
167            selector = Selector.open();
168            worker = new Worker();
169            worker.start();
170        }
171    }
172 
173    private void registerNew()
174    {
175        if( connectQueue.isEmpty() )
176            return;
177 
178        for( ;; )
179        {
180            ConnectionRequest req;
181            synchronized( connectQueue )
182            {
183                req = ( ConnectionRequest ) connectQueue.pop();
184            }
185 
186            if( req == null )
187                break;
188            
189            SocketChannel ch = req.channel;
190            try
191            {
192                ch.register( selector, SelectionKey.OP_CONNECT, req );
193            }
194            catch( IOException e )
195            {
196                req.exception = e;
197                synchronized( req )
198                {
199                    req.done = true;
200                    req.notify();
201                }
202            }
203        }
204    }
205    
206    private void processSessions( Set keys )
207    {
208        Iterator it = keys.iterator();
209 
210        while( it.hasNext() )
211        {
212            SelectionKey key = ( SelectionKey ) it.next();
213 
214            if( !key.isConnectable() )
215                continue;
216 
217            SocketChannel ch = ( SocketChannel ) key.channel();
218            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
219 
220            try
221            {
222                ch.finishConnect();
223                SocketSession session = newSession( ch, entry.handler );
224                entry.session = session;
225            }
226            catch( Throwable e )
227            {
228                entry.exception = e;
229            }
230            finally
231            {
232                key.cancel();
233                if( entry.session == null )
234                {
235                    try
236                    {
237                        ch.close();
238                    }
239                    catch( IOException e )
240                    {
241                        exceptionMonitor.exceptionCaught( this, e );
242                    }
243                }
244 
245                synchronized( entry )
246                {
247                    entry.done = true;
248                    entry.notify();
249                }
250            }
251        }
252 
253        keys.clear();
254    }
255 
256    private void processTimedOutSessions( Set keys )
257    {
258        long currentTime = System.currentTimeMillis();
259        Iterator it = keys.iterator();
260 
261        while( it.hasNext() )
262        {
263            SelectionKey key = ( SelectionKey ) it.next();
264 
265            if( !key.isValid() )
266                continue;
267 
268            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
269 
270            if( currentTime >= entry.deadline )
271            {
272                entry.exception = new ConnectException();
273                entry.done = true;
274 
275                synchronized( entry )
276                {
277                    entry.notify();
278                }
279 
280                key.cancel();
281            }
282        }
283    }
284 
285    private SocketSession newSession( SocketChannel ch, IoHandler handler ) throws IOException
286    {
287        SocketSession session = new SocketSession( filters, ch, handler );
288        try
289        {
290            handler.sessionCreated( session );
291        }
292        catch( Throwable e )
293        {
294            ExceptionUtil.throwException( e );
295        }
296        SocketIoProcessor.getInstance().addSession( session );
297        return session;
298    }
299 
300    private class Worker extends Thread
301    {
302        public Worker()
303        {
304            super( "SocketConnector-" + id );
305        }
306 
307        public void run()
308        {
309            for( ;; )
310            {
311                try
312                {
313                    int nKeys = selector.select( 1000 );
314 
315                    registerNew();
316                    
317                    if( nKeys > 0 )
318                    {
319                        processSessions( selector.selectedKeys() );
320                    }
321 
322                    processTimedOutSessions( selector.keys() );
323 
324                    if( selector.keys().isEmpty() )
325                    {
326                        synchronized( SocketConnector.this )
327                        {
328                            if( selector.keys().isEmpty() &&
329                                connectQueue.isEmpty() )
330                            {
331                                worker = null;
332                                try
333                                {
334                                    selector.close();
335                                }
336                                catch( IOException e )
337                                {
338                                    exceptionMonitor.exceptionCaught( SocketConnector.this, e );
339                                }
340                                finally
341                                {
342                                    selector = null;
343                                }
344                                break;
345                            }
346                        }
347                    }
348                }
349                catch( IOException e )
350                {
351                    exceptionMonitor.exceptionCaught( SocketConnector.this, e );
352 
353                    try
354                    {
355                        Thread.sleep( 1000 );
356                    }
357                    catch( InterruptedException e1 )
358                    {
359                    }
360                }
361            }
362        }
363    }
364 
365    private static class ConnectionRequest
366    {
367        private final SocketChannel channel;
368        
369        private final long deadline;
370 
371        private final IoHandler handler;
372        
373        private SocketSession session;
374 
375        private boolean done;
376 
377        private Throwable exception;
378 
379        private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
380        {
381            this.channel = channel;
382            this.deadline = System.currentTimeMillis() + timeout * 1000L;
383            this.handler = handler;
384        }
385    }
386 
387    public IoFilterChain getFilterChain()
388    {
389        return filters;
390    }
391}

[all classes][org.apache.mina.io.socket]
EMMA 2.0.4217 (C) Vladimir Roubtsov