1 | /* |
2 | * @(#) $Id: SocketConnector.java 327113 2005-10-21 06:59:15Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.io.socket; |
20 | |
21 | import java.io.IOException; |
22 | import java.net.ConnectException; |
23 | import java.net.InetSocketAddress; |
24 | import java.net.SocketAddress; |
25 | import java.nio.channels.SelectionKey; |
26 | import java.nio.channels.Selector; |
27 | import java.nio.channels.SocketChannel; |
28 | import java.util.Iterator; |
29 | import java.util.Set; |
30 | |
31 | import org.apache.mina.common.BaseSessionManager; |
32 | import org.apache.mina.io.IoConnector; |
33 | import org.apache.mina.io.IoFilterChain; |
34 | import org.apache.mina.io.IoHandler; |
35 | import org.apache.mina.io.IoSession; |
36 | import org.apache.mina.io.IoSessionManagerFilterChain; |
37 | import org.apache.mina.util.ExceptionUtil; |
38 | import org.apache.mina.util.Queue; |
39 | |
40 | /** |
41 | * {@link IoConnector} for socket transport (TCP/IP). |
42 | * |
43 | * @author The Apache Directory Project (dev@directory.apache.org) |
44 | * @version $Rev: 327113 $, $Date: 2005-10-21 15:59:15 +0900 $ |
45 | */ |
46 | public class SocketConnector extends BaseSessionManager implements IoConnector |
47 | { |
48 | private static volatile int nextId = 0; |
49 | |
50 | private final int id = nextId++; |
51 | |
52 | private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this ); |
53 | |
54 | private Selector selector; |
55 | |
56 | private final Queue connectQueue = new Queue(); |
57 | |
58 | private Worker worker; |
59 | |
60 | /** |
61 | * Creates a new instance. |
62 | */ |
63 | public SocketConnector() |
64 | { |
65 | } |
66 | |
67 | public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException |
68 | { |
69 | return connect( address, null, Integer.MAX_VALUE, handler); |
70 | } |
71 | |
72 | public IoSession connect( SocketAddress address, SocketAddress localAddress, IoHandler handler ) throws IOException |
73 | { |
74 | return connect( address, localAddress, Integer.MAX_VALUE, handler); |
75 | } |
76 | |
77 | public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException |
78 | { |
79 | return connect( address, null, timeout, handler); |
80 | } |
81 | |
82 | public IoSession connect( SocketAddress address, SocketAddress localAddress, |
83 | int timeout, IoHandler handler ) throws IOException |
84 | { |
85 | if( address == null ) |
86 | throw new NullPointerException( "address" ); |
87 | if( handler == null ) |
88 | throw new NullPointerException( "handler" ); |
89 | |
90 | if( timeout <= 0 ) |
91 | throw new IllegalArgumentException( "Illegal timeout: " + timeout ); |
92 | |
93 | if( ! ( address instanceof InetSocketAddress ) ) |
94 | throw new IllegalArgumentException( "Unexpected address type: " |
95 | + address.getClass() ); |
96 | |
97 | if( localAddress != null && !( localAddress instanceof InetSocketAddress ) ) |
98 | throw new IllegalArgumentException( "Unexpected local address type: " |
99 | + localAddress.getClass() ); |
100 | |
101 | SocketChannel ch = SocketChannel.open(); |
102 | boolean success = false; |
103 | try |
104 | { |
105 | ch.socket().setReuseAddress( true ); |
106 | if( localAddress != null ) |
107 | { |
108 | ch.socket().bind( localAddress ); |
109 | } |
110 | |
111 | ch.configureBlocking( false ); |
112 | |
113 | if( ch.connect( address ) ) |
114 | { |
115 | SocketSession session = newSession( ch, handler ); |
116 | success = true; |
117 | return session; |
118 | } |
119 | |
120 | success = true; |
121 | } |
122 | finally |
123 | { |
124 | if( !success ) |
125 | { |
126 | ch.close(); |
127 | } |
128 | } |
129 | |
130 | ConnectionRequest request = new ConnectionRequest( ch, timeout, handler ); |
131 | synchronized( this ) |
132 | { |
133 | synchronized( connectQueue ) |
134 | { |
135 | connectQueue.push( request ); |
136 | } |
137 | startupWorker(); |
138 | selector.wakeup(); |
139 | } |
140 | |
141 | synchronized( request ) |
142 | { |
143 | while( !request.done ) |
144 | { |
145 | try |
146 | { |
147 | request.wait(); |
148 | } |
149 | catch( InterruptedException e ) |
150 | { |
151 | } |
152 | } |
153 | } |
154 | |
155 | if( request.exception != null ) |
156 | { |
157 | ExceptionUtil.throwException( request.exception ); |
158 | } |
159 | |
160 | return request.session; |
161 | } |
162 | |
163 | private synchronized void startupWorker() throws IOException |
164 | { |
165 | if( worker == null ) |
166 | { |
167 | selector = Selector.open(); |
168 | worker = new Worker(); |
169 | worker.start(); |
170 | } |
171 | } |
172 | |
173 | private void registerNew() |
174 | { |
175 | if( connectQueue.isEmpty() ) |
176 | return; |
177 | |
178 | for( ;; ) |
179 | { |
180 | ConnectionRequest req; |
181 | synchronized( connectQueue ) |
182 | { |
183 | req = ( ConnectionRequest ) connectQueue.pop(); |
184 | } |
185 | |
186 | if( req == null ) |
187 | break; |
188 | |
189 | SocketChannel ch = req.channel; |
190 | try |
191 | { |
192 | ch.register( selector, SelectionKey.OP_CONNECT, req ); |
193 | } |
194 | catch( IOException e ) |
195 | { |
196 | req.exception = e; |
197 | synchronized( req ) |
198 | { |
199 | req.done = true; |
200 | req.notify(); |
201 | } |
202 | } |
203 | } |
204 | } |
205 | |
206 | private void processSessions( Set keys ) |
207 | { |
208 | Iterator it = keys.iterator(); |
209 | |
210 | while( it.hasNext() ) |
211 | { |
212 | SelectionKey key = ( SelectionKey ) it.next(); |
213 | |
214 | if( !key.isConnectable() ) |
215 | continue; |
216 | |
217 | SocketChannel ch = ( SocketChannel ) key.channel(); |
218 | ConnectionRequest entry = ( ConnectionRequest ) key.attachment(); |
219 | |
220 | try |
221 | { |
222 | ch.finishConnect(); |
223 | SocketSession session = newSession( ch, entry.handler ); |
224 | entry.session = session; |
225 | } |
226 | catch( Throwable e ) |
227 | { |
228 | entry.exception = e; |
229 | } |
230 | finally |
231 | { |
232 | key.cancel(); |
233 | if( entry.session == null ) |
234 | { |
235 | try |
236 | { |
237 | ch.close(); |
238 | } |
239 | catch( IOException e ) |
240 | { |
241 | exceptionMonitor.exceptionCaught( this, e ); |
242 | } |
243 | } |
244 | |
245 | synchronized( entry ) |
246 | { |
247 | entry.done = true; |
248 | entry.notify(); |
249 | } |
250 | } |
251 | } |
252 | |
253 | keys.clear(); |
254 | } |
255 | |
256 | private void processTimedOutSessions( Set keys ) |
257 | { |
258 | long currentTime = System.currentTimeMillis(); |
259 | Iterator it = keys.iterator(); |
260 | |
261 | while( it.hasNext() ) |
262 | { |
263 | SelectionKey key = ( SelectionKey ) it.next(); |
264 | |
265 | if( !key.isValid() ) |
266 | continue; |
267 | |
268 | ConnectionRequest entry = ( ConnectionRequest ) key.attachment(); |
269 | |
270 | if( currentTime >= entry.deadline ) |
271 | { |
272 | entry.exception = new ConnectException(); |
273 | entry.done = true; |
274 | |
275 | synchronized( entry ) |
276 | { |
277 | entry.notify(); |
278 | } |
279 | |
280 | key.cancel(); |
281 | } |
282 | } |
283 | } |
284 | |
285 | private SocketSession newSession( SocketChannel ch, IoHandler handler ) throws IOException |
286 | { |
287 | SocketSession session = new SocketSession( filters, ch, handler ); |
288 | try |
289 | { |
290 | handler.sessionCreated( session ); |
291 | } |
292 | catch( Throwable e ) |
293 | { |
294 | ExceptionUtil.throwException( e ); |
295 | } |
296 | SocketIoProcessor.getInstance().addSession( session ); |
297 | return session; |
298 | } |
299 | |
300 | private class Worker extends Thread |
301 | { |
302 | public Worker() |
303 | { |
304 | super( "SocketConnector-" + id ); |
305 | } |
306 | |
307 | public void run() |
308 | { |
309 | for( ;; ) |
310 | { |
311 | try |
312 | { |
313 | int nKeys = selector.select( 1000 ); |
314 | |
315 | registerNew(); |
316 | |
317 | if( nKeys > 0 ) |
318 | { |
319 | processSessions( selector.selectedKeys() ); |
320 | } |
321 | |
322 | processTimedOutSessions( selector.keys() ); |
323 | |
324 | if( selector.keys().isEmpty() ) |
325 | { |
326 | synchronized( SocketConnector.this ) |
327 | { |
328 | if( selector.keys().isEmpty() && |
329 | connectQueue.isEmpty() ) |
330 | { |
331 | worker = null; |
332 | try |
333 | { |
334 | selector.close(); |
335 | } |
336 | catch( IOException e ) |
337 | { |
338 | exceptionMonitor.exceptionCaught( SocketConnector.this, e ); |
339 | } |
340 | finally |
341 | { |
342 | selector = null; |
343 | } |
344 | break; |
345 | } |
346 | } |
347 | } |
348 | } |
349 | catch( IOException e ) |
350 | { |
351 | exceptionMonitor.exceptionCaught( SocketConnector.this, e ); |
352 | |
353 | try |
354 | { |
355 | Thread.sleep( 1000 ); |
356 | } |
357 | catch( InterruptedException e1 ) |
358 | { |
359 | } |
360 | } |
361 | } |
362 | } |
363 | } |
364 | |
365 | private static class ConnectionRequest |
366 | { |
367 | private final SocketChannel channel; |
368 | |
369 | private final long deadline; |
370 | |
371 | private final IoHandler handler; |
372 | |
373 | private SocketSession session; |
374 | |
375 | private boolean done; |
376 | |
377 | private Throwable exception; |
378 | |
379 | private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler ) |
380 | { |
381 | this.channel = channel; |
382 | this.deadline = System.currentTimeMillis() + timeout * 1000L; |
383 | this.handler = handler; |
384 | } |
385 | } |
386 | |
387 | public IoFilterChain getFilterChain() |
388 | { |
389 | return filters; |
390 | } |
391 | } |