1 | /* |
2 | * @(#) $Id: SocketAcceptor.java 332218 2005-11-10 03:52:42Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.io.socket; |
20 | |
21 | import java.io.IOException; |
22 | import java.net.InetSocketAddress; |
23 | import java.net.SocketAddress; |
24 | import java.nio.channels.SelectionKey; |
25 | import java.nio.channels.Selector; |
26 | import java.nio.channels.ServerSocketChannel; |
27 | import java.nio.channels.SocketChannel; |
28 | import java.util.HashMap; |
29 | import java.util.Iterator; |
30 | import java.util.Map; |
31 | import java.util.Set; |
32 | |
33 | import org.apache.mina.common.BaseSessionManager; |
34 | import org.apache.mina.io.IoAcceptor; |
35 | import org.apache.mina.io.IoFilterChain; |
36 | import org.apache.mina.io.IoHandler; |
37 | import org.apache.mina.io.IoSession; |
38 | import org.apache.mina.io.IoSessionManagerFilterChain; |
39 | import org.apache.mina.util.Queue; |
40 | |
41 | /** |
42 | * {@link IoAcceptor} for socket transport (TCP/IP). |
43 | * |
44 | * @author The Apache Directory Project (dev@directory.apache.org) |
45 | * @version $Rev: 332218 $, $Date: 2005-11-10 12:52:42 +0900 $ |
46 | */ |
47 | public class SocketAcceptor extends BaseSessionManager implements IoAcceptor |
48 | { |
49 | private static volatile int nextId = 0; |
50 | |
51 | private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this ); |
52 | |
53 | private final int id = nextId ++ ; |
54 | |
55 | private Selector selector; |
56 | |
57 | private final Map channels = new HashMap(); |
58 | |
59 | private final Queue registerQueue = new Queue(); |
60 | |
61 | private final Queue cancelQueue = new Queue(); |
62 | |
63 | private int backlog = 50; |
64 | |
65 | private Worker worker; |
66 | |
67 | |
68 | /** |
69 | * Creates a new instance. |
70 | */ |
71 | public SocketAcceptor() |
72 | { |
73 | } |
74 | |
75 | /** |
76 | * Binds to the specified <code>address</code> and handles incoming |
77 | * connections with the specified <code>handler</code>. Backlog value |
78 | * is configured to the value of <code>backlog</code> property. |
79 | * |
80 | * @throws IOException if failed to bind |
81 | */ |
82 | public void bind( SocketAddress address, IoHandler handler ) throws IOException |
83 | { |
84 | if( address == null ) |
85 | { |
86 | throw new NullPointerException( "address" ); |
87 | } |
88 | |
89 | if( handler == null ) |
90 | { |
91 | throw new NullPointerException( "handler" ); |
92 | } |
93 | |
94 | if( !( address instanceof InetSocketAddress ) ) |
95 | { |
96 | throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); |
97 | } |
98 | |
99 | if( ( ( InetSocketAddress ) address ).getPort() == 0 ) |
100 | { |
101 | throw new IllegalArgumentException( "Unsupported port number: 0" ); |
102 | } |
103 | |
104 | RegistrationRequest request = new RegistrationRequest( address, backlog, handler ); |
105 | |
106 | synchronized( this ) |
107 | { |
108 | synchronized( registerQueue ) |
109 | { |
110 | registerQueue.push( request ); |
111 | } |
112 | startupWorker(); |
113 | } |
114 | |
115 | selector.wakeup(); |
116 | |
117 | synchronized( request ) |
118 | { |
119 | while( !request.done ) |
120 | { |
121 | try |
122 | { |
123 | request.wait(); |
124 | } |
125 | catch( InterruptedException e ) |
126 | { |
127 | } |
128 | } |
129 | } |
130 | |
131 | if( request.exception != null ) |
132 | { |
133 | throw request.exception; |
134 | } |
135 | } |
136 | |
137 | |
138 | private synchronized void startupWorker() throws IOException |
139 | { |
140 | if( worker == null ) |
141 | { |
142 | selector = Selector.open(); |
143 | worker = new Worker(); |
144 | |
145 | worker.start(); |
146 | } |
147 | } |
148 | |
149 | |
150 | public void unbind( SocketAddress address ) |
151 | { |
152 | if( address == null ) |
153 | { |
154 | throw new NullPointerException( "address" ); |
155 | } |
156 | |
157 | CancellationRequest request = new CancellationRequest( address ); |
158 | synchronized( this ) |
159 | { |
160 | try |
161 | { |
162 | startupWorker(); |
163 | } |
164 | catch( IOException e ) |
165 | { |
166 | // IOException is thrown only when Worker thread is not |
167 | // running and failed to open a selector. We simply throw |
168 | // IllegalArgumentException here because we can simply |
169 | // conclude that nothing is bound to the selector. |
170 | throw new IllegalArgumentException( "Address not bound: " + address ); |
171 | } |
172 | |
173 | synchronized( cancelQueue ) |
174 | { |
175 | cancelQueue.push( request ); |
176 | } |
177 | } |
178 | |
179 | selector.wakeup(); |
180 | |
181 | synchronized( request ) |
182 | { |
183 | while( !request.done ) |
184 | { |
185 | try |
186 | { |
187 | request.wait(); |
188 | } |
189 | catch( InterruptedException e ) |
190 | { |
191 | } |
192 | } |
193 | } |
194 | |
195 | if( request.exception != null ) |
196 | { |
197 | request.exception.fillInStackTrace(); |
198 | |
199 | throw request.exception; |
200 | } |
201 | } |
202 | |
203 | /** |
204 | * Returns the default backlog value which is used when user binds. |
205 | */ |
206 | public int getBacklog() |
207 | { |
208 | return backlog; |
209 | } |
210 | |
211 | /** |
212 | * Sets the default backlog value which is used when user binds. |
213 | */ |
214 | public void setBacklog( int defaultBacklog ) |
215 | { |
216 | if( defaultBacklog <= 0 ) |
217 | { |
218 | throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog ); |
219 | } |
220 | this.backlog = defaultBacklog; |
221 | } |
222 | |
223 | |
224 | private class Worker extends Thread |
225 | { |
226 | public Worker() |
227 | { |
228 | super( "SocketAcceptor-" + id ); |
229 | } |
230 | |
231 | public void run() |
232 | { |
233 | for( ;; ) |
234 | { |
235 | try |
236 | { |
237 | int nKeys = selector.select(); |
238 | |
239 | registerNew(); |
240 | cancelKeys(); |
241 | |
242 | if( nKeys > 0 ) |
243 | { |
244 | processSessions( selector.selectedKeys() ); |
245 | } |
246 | |
247 | if( selector.keys().isEmpty() ) |
248 | { |
249 | synchronized( SocketAcceptor.this ) |
250 | { |
251 | if( selector.keys().isEmpty() && |
252 | registerQueue.isEmpty() && |
253 | cancelQueue.isEmpty() ) |
254 | { |
255 | worker = null; |
256 | try |
257 | { |
258 | selector.close(); |
259 | } |
260 | catch( IOException e ) |
261 | { |
262 | exceptionMonitor.exceptionCaught( SocketAcceptor.this, e ); |
263 | } |
264 | finally |
265 | { |
266 | selector = null; |
267 | } |
268 | break; |
269 | } |
270 | } |
271 | } |
272 | } |
273 | catch( IOException e ) |
274 | { |
275 | exceptionMonitor.exceptionCaught( SocketAcceptor.this, e ); |
276 | |
277 | try |
278 | { |
279 | Thread.sleep( 1000 ); |
280 | } |
281 | catch( InterruptedException e1 ) |
282 | { |
283 | } |
284 | } |
285 | } |
286 | } |
287 | |
288 | private void processSessions( Set keys ) throws IOException |
289 | { |
290 | Iterator it = keys.iterator(); |
291 | while( it.hasNext() ) |
292 | { |
293 | SelectionKey key = ( SelectionKey ) it.next(); |
294 | |
295 | it.remove(); |
296 | |
297 | if( !key.isAcceptable() ) |
298 | { |
299 | continue; |
300 | } |
301 | |
302 | ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); |
303 | |
304 | SocketChannel ch = ssc.accept(); |
305 | |
306 | if( ch == null ) |
307 | { |
308 | continue; |
309 | } |
310 | |
311 | boolean success = false; |
312 | try |
313 | { |
314 | RegistrationRequest req = ( RegistrationRequest ) key.attachment(); |
315 | SocketSession session = new SocketSession( filters, ch, req.handler ); |
316 | req.handler.sessionCreated( session ); |
317 | SocketIoProcessor.getInstance().addSession( session ); |
318 | success = true; |
319 | } |
320 | catch( Throwable t ) |
321 | { |
322 | exceptionMonitor.exceptionCaught( SocketAcceptor.this, t ); |
323 | } |
324 | finally |
325 | { |
326 | if( !success ) |
327 | { |
328 | ch.close(); |
329 | } |
330 | } |
331 | } |
332 | } |
333 | } |
334 | |
335 | |
336 | private void registerNew() |
337 | { |
338 | if( registerQueue.isEmpty() ) |
339 | { |
340 | return; |
341 | } |
342 | |
343 | for( ;; ) |
344 | { |
345 | RegistrationRequest req; |
346 | |
347 | synchronized( registerQueue ) |
348 | { |
349 | req = ( RegistrationRequest ) registerQueue.pop(); |
350 | } |
351 | |
352 | if( req == null ) |
353 | { |
354 | break; |
355 | } |
356 | |
357 | ServerSocketChannel ssc = null; |
358 | |
359 | try |
360 | { |
361 | ssc = ServerSocketChannel.open(); |
362 | ssc.configureBlocking( false ); |
363 | ssc.socket().bind( req.address, req.backlog ); |
364 | ssc.register( selector, SelectionKey.OP_ACCEPT, req ); |
365 | |
366 | channels.put( req.address, ssc ); |
367 | } |
368 | catch( IOException e ) |
369 | { |
370 | req.exception = e; |
371 | } |
372 | finally |
373 | { |
374 | synchronized( req ) |
375 | { |
376 | req.done = true; |
377 | |
378 | req.notify(); |
379 | } |
380 | |
381 | if( ssc != null && req.exception != null ) |
382 | { |
383 | try |
384 | { |
385 | ssc.close(); |
386 | } |
387 | catch( IOException e ) |
388 | { |
389 | exceptionMonitor.exceptionCaught( this, e ); |
390 | } |
391 | } |
392 | } |
393 | } |
394 | } |
395 | |
396 | |
397 | private void cancelKeys() |
398 | { |
399 | if( cancelQueue.isEmpty() ) |
400 | { |
401 | return; |
402 | } |
403 | |
404 | for( ;; ) |
405 | { |
406 | CancellationRequest request; |
407 | |
408 | synchronized( cancelQueue ) |
409 | { |
410 | request = ( CancellationRequest ) cancelQueue.pop(); |
411 | } |
412 | |
413 | if( request == null ) |
414 | { |
415 | break; |
416 | } |
417 | |
418 | ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address ); |
419 | |
420 | // close the channel |
421 | try |
422 | { |
423 | if( ssc == null ) |
424 | { |
425 | request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); |
426 | } |
427 | else |
428 | { |
429 | SelectionKey key = ssc.keyFor( selector ); |
430 | |
431 | key.cancel(); |
432 | |
433 | selector.wakeup(); // wake up again to trigger thread death |
434 | |
435 | ssc.close(); |
436 | } |
437 | } |
438 | catch( IOException e ) |
439 | { |
440 | exceptionMonitor.exceptionCaught( this, e ); |
441 | } |
442 | finally |
443 | { |
444 | synchronized( request ) |
445 | { |
446 | request.done = true; |
447 | |
448 | request.notify(); |
449 | } |
450 | } |
451 | } |
452 | } |
453 | |
454 | public IoFilterChain getFilterChain() |
455 | { |
456 | return filters; |
457 | } |
458 | |
459 | private static class RegistrationRequest |
460 | { |
461 | private final SocketAddress address; |
462 | |
463 | private final int backlog; |
464 | |
465 | private final IoHandler handler; |
466 | |
467 | private IOException exception; |
468 | |
469 | private boolean done; |
470 | |
471 | private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler ) |
472 | { |
473 | this.address = address; |
474 | this.backlog = backlog; |
475 | this.handler = handler; |
476 | } |
477 | } |
478 | |
479 | |
480 | private static class CancellationRequest |
481 | { |
482 | private final SocketAddress address; |
483 | |
484 | private boolean done; |
485 | |
486 | private RuntimeException exception; |
487 | |
488 | private CancellationRequest( SocketAddress address ) |
489 | { |
490 | this.address = address; |
491 | } |
492 | } |
493 | |
494 | |
495 | public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress ) |
496 | { |
497 | throw new UnsupportedOperationException(); |
498 | } |
499 | } |