1 | /* |
2 | * @(#) $Id: SocketAcceptor.java 264677 2005-08-30 02:44:35Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.io.socket; |
20 | |
21 | import java.io.IOException; |
22 | import java.net.InetSocketAddress; |
23 | import java.net.SocketAddress; |
24 | import java.nio.channels.SelectionKey; |
25 | import java.nio.channels.Selector; |
26 | import java.nio.channels.ServerSocketChannel; |
27 | import java.nio.channels.SocketChannel; |
28 | import java.util.HashMap; |
29 | import java.util.Iterator; |
30 | import java.util.Map; |
31 | import java.util.Set; |
32 | |
33 | import org.apache.mina.common.BaseSessionManager; |
34 | import org.apache.mina.io.IoAcceptor; |
35 | import org.apache.mina.io.IoFilterChain; |
36 | import org.apache.mina.io.IoHandler; |
37 | import org.apache.mina.io.IoSessionManagerFilterChain; |
38 | import org.apache.mina.util.Queue; |
39 | |
40 | /** |
41 | * {@link IoAcceptor} for socket transport (TCP/IP). |
42 | * |
43 | * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> |
44 | * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $ |
45 | */ |
46 | public class SocketAcceptor extends BaseSessionManager implements IoAcceptor |
47 | { |
48 | private static volatile int nextId = 0; |
49 | |
50 | private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this ); |
51 | |
52 | private final int id = nextId ++ ; |
53 | |
54 | private Selector selector; |
55 | |
56 | private final Map channels = new HashMap(); |
57 | |
58 | private final Queue registerQueue = new Queue(); |
59 | |
60 | private final Queue cancelQueue = new Queue(); |
61 | |
62 | private int backlog = 50; |
63 | |
64 | private Worker worker; |
65 | |
66 | |
/**
 * Constructs an acceptor that is not bound to any address yet.
 * Neither the selector nor the worker thread is created here.
 */
public SocketAcceptor()
{
    super();
}
73 | |
74 | /** |
75 | * Binds to the specified <code>address</code> and handles incoming |
76 | * connections with the specified <code>handler</code>. Backlog value |
77 | * is configured to the value of <code>backlog</code> property. |
78 | * |
79 | * @throws IOException if failed to bind |
80 | */ |
81 | public void bind( SocketAddress address, IoHandler handler ) throws IOException |
82 | { |
83 | if( address == null ) |
84 | { |
85 | throw new NullPointerException( "address" ); |
86 | } |
87 | |
88 | if( handler == null ) |
89 | { |
90 | throw new NullPointerException( "handler" ); |
91 | } |
92 | |
93 | if( !( address instanceof InetSocketAddress ) ) |
94 | { |
95 | throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); |
96 | } |
97 | |
98 | if( ( ( InetSocketAddress ) address ).getPort() == 0 ) |
99 | { |
100 | throw new IllegalArgumentException( "Unsupported port number: 0" ); |
101 | } |
102 | |
103 | RegistrationRequest request = new RegistrationRequest( address, backlog, handler ); |
104 | |
105 | synchronized( this ) |
106 | { |
107 | synchronized( registerQueue ) |
108 | { |
109 | registerQueue.push( request ); |
110 | } |
111 | startupWorker(); |
112 | } |
113 | |
114 | selector.wakeup(); |
115 | |
116 | synchronized( request ) |
117 | { |
118 | while( !request.done ) |
119 | { |
120 | try |
121 | { |
122 | request.wait(); |
123 | } |
124 | catch( InterruptedException e ) |
125 | { |
126 | } |
127 | } |
128 | } |
129 | |
130 | if( request.exception != null ) |
131 | { |
132 | throw request.exception; |
133 | } |
134 | } |
135 | |
136 | |
137 | private synchronized void startupWorker() throws IOException |
138 | { |
139 | if( worker == null ) |
140 | { |
141 | selector = Selector.open(); |
142 | worker = new Worker(); |
143 | |
144 | worker.start(); |
145 | } |
146 | } |
147 | |
148 | |
149 | public void unbind( SocketAddress address ) |
150 | { |
151 | if( address == null ) |
152 | { |
153 | throw new NullPointerException( "address" ); |
154 | } |
155 | |
156 | CancellationRequest request = new CancellationRequest( address ); |
157 | synchronized( this ) |
158 | { |
159 | try |
160 | { |
161 | startupWorker(); |
162 | } |
163 | catch( IOException e ) |
164 | { |
165 | // IOException is thrown only when Worker thread is not |
166 | // running and failed to open a selector. We simply throw |
167 | // IllegalArgumentException here because we can simply |
168 | // conclude that nothing is bound to the selector. |
169 | throw new IllegalArgumentException( "Address not bound: " + address ); |
170 | } |
171 | |
172 | synchronized( cancelQueue ) |
173 | { |
174 | cancelQueue.push( request ); |
175 | } |
176 | } |
177 | |
178 | selector.wakeup(); |
179 | |
180 | synchronized( request ) |
181 | { |
182 | while( !request.done ) |
183 | { |
184 | try |
185 | { |
186 | request.wait(); |
187 | } |
188 | catch( InterruptedException e ) |
189 | { |
190 | } |
191 | } |
192 | } |
193 | |
194 | if( request.exception != null ) |
195 | { |
196 | request.exception.fillInStackTrace(); |
197 | |
198 | throw request.exception; |
199 | } |
200 | } |
201 | |
202 | /** |
203 | * Returns the default backlog value which is used when user binds. |
204 | */ |
205 | public int getBacklog() |
206 | { |
207 | return backlog; |
208 | } |
209 | |
210 | /** |
211 | * Sets the default backlog value which is used when user binds. |
212 | */ |
213 | public void setBacklog( int defaultBacklog ) |
214 | { |
215 | if( defaultBacklog <= 0 ) |
216 | { |
217 | throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog ); |
218 | } |
219 | this.backlog = defaultBacklog; |
220 | } |
221 | |
222 | |
223 | private class Worker extends Thread |
224 | { |
225 | public Worker() |
226 | { |
227 | super( "SocketAcceptor-" + id ); |
228 | } |
229 | |
230 | public void run() |
231 | { |
232 | for( ;; ) |
233 | { |
234 | try |
235 | { |
236 | int nKeys = selector.select(); |
237 | |
238 | registerNew(); |
239 | cancelKeys(); |
240 | |
241 | if( nKeys > 0 ) |
242 | { |
243 | processSessions( selector.selectedKeys() ); |
244 | } |
245 | |
246 | if( selector.keys().isEmpty() ) |
247 | { |
248 | synchronized( SocketAcceptor.this ) |
249 | { |
250 | if( selector.keys().isEmpty() && |
251 | registerQueue.isEmpty() && |
252 | cancelQueue.isEmpty() ) |
253 | { |
254 | worker = null; |
255 | try |
256 | { |
257 | selector.close(); |
258 | } |
259 | catch( IOException e ) |
260 | { |
261 | exceptionMonitor.exceptionCaught( SocketAcceptor.this, e ); |
262 | } |
263 | finally |
264 | { |
265 | selector = null; |
266 | } |
267 | break; |
268 | } |
269 | } |
270 | } |
271 | } |
272 | catch( IOException e ) |
273 | { |
274 | exceptionMonitor.exceptionCaught( SocketAcceptor.this, e ); |
275 | |
276 | try |
277 | { |
278 | Thread.sleep( 1000 ); |
279 | } |
280 | catch( InterruptedException e1 ) |
281 | { |
282 | } |
283 | } |
284 | } |
285 | } |
286 | |
287 | private void processSessions( Set keys ) throws IOException |
288 | { |
289 | Iterator it = keys.iterator(); |
290 | while( it.hasNext() ) |
291 | { |
292 | SelectionKey key = ( SelectionKey ) it.next(); |
293 | |
294 | it.remove(); |
295 | |
296 | if( !key.isAcceptable() ) |
297 | { |
298 | continue; |
299 | } |
300 | |
301 | ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); |
302 | |
303 | SocketChannel ch = ssc.accept(); |
304 | |
305 | if( ch == null ) |
306 | { |
307 | continue; |
308 | } |
309 | |
310 | boolean success = false; |
311 | try |
312 | { |
313 | RegistrationRequest req = ( RegistrationRequest ) key.attachment(); |
314 | SocketSession session = new SocketSession( filters, ch, req.handler ); |
315 | req.handler.sessionCreated( session ); |
316 | SocketIoProcessor.getInstance().addSession( session ); |
317 | success = true; |
318 | } |
319 | catch( Throwable t ) |
320 | { |
321 | exceptionMonitor.exceptionCaught( SocketAcceptor.this, t ); |
322 | } |
323 | finally |
324 | { |
325 | if( !success ) |
326 | { |
327 | ch.close(); |
328 | } |
329 | } |
330 | } |
331 | } |
332 | } |
333 | |
334 | |
335 | private void registerNew() |
336 | { |
337 | if( registerQueue.isEmpty() ) |
338 | { |
339 | return; |
340 | } |
341 | |
342 | for( ;; ) |
343 | { |
344 | RegistrationRequest req; |
345 | |
346 | synchronized( registerQueue ) |
347 | { |
348 | req = ( RegistrationRequest ) registerQueue.pop(); |
349 | } |
350 | |
351 | if( req == null ) |
352 | { |
353 | break; |
354 | } |
355 | |
356 | ServerSocketChannel ssc = null; |
357 | |
358 | try |
359 | { |
360 | ssc = ServerSocketChannel.open(); |
361 | ssc.configureBlocking( false ); |
362 | ssc.socket().bind( req.address, req.backlog ); |
363 | ssc.register( selector, SelectionKey.OP_ACCEPT, req ); |
364 | |
365 | channels.put( req.address, ssc ); |
366 | } |
367 | catch( IOException e ) |
368 | { |
369 | req.exception = e; |
370 | } |
371 | finally |
372 | { |
373 | synchronized( req ) |
374 | { |
375 | req.done = true; |
376 | |
377 | req.notify(); |
378 | } |
379 | |
380 | if( ssc != null && req.exception != null ) |
381 | { |
382 | try |
383 | { |
384 | ssc.close(); |
385 | } |
386 | catch( IOException e ) |
387 | { |
388 | exceptionMonitor.exceptionCaught( this, e ); |
389 | } |
390 | } |
391 | } |
392 | } |
393 | } |
394 | |
395 | |
396 | private void cancelKeys() |
397 | { |
398 | if( cancelQueue.isEmpty() ) |
399 | { |
400 | return; |
401 | } |
402 | |
403 | for( ;; ) |
404 | { |
405 | CancellationRequest request; |
406 | |
407 | synchronized( cancelQueue ) |
408 | { |
409 | request = ( CancellationRequest ) cancelQueue.pop(); |
410 | } |
411 | |
412 | if( request == null ) |
413 | { |
414 | break; |
415 | } |
416 | |
417 | ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address ); |
418 | |
419 | // close the channel |
420 | try |
421 | { |
422 | if( ssc == null ) |
423 | { |
424 | request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); |
425 | } |
426 | else |
427 | { |
428 | SelectionKey key = ssc.keyFor( selector ); |
429 | |
430 | key.cancel(); |
431 | |
432 | selector.wakeup(); // wake up again to trigger thread death |
433 | |
434 | ssc.close(); |
435 | } |
436 | } |
437 | catch( IOException e ) |
438 | { |
439 | exceptionMonitor.exceptionCaught( this, e ); |
440 | } |
441 | finally |
442 | { |
443 | synchronized( request ) |
444 | { |
445 | request.done = true; |
446 | |
447 | request.notify(); |
448 | } |
449 | } |
450 | } |
451 | } |
452 | |
453 | public IoFilterChain getFilterChain() |
454 | { |
455 | return filters; |
456 | } |
457 | |
458 | private static class RegistrationRequest |
459 | { |
460 | private final SocketAddress address; |
461 | |
462 | private final int backlog; |
463 | |
464 | private final IoHandler handler; |
465 | |
466 | private IOException exception; |
467 | |
468 | private boolean done; |
469 | |
470 | private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler ) |
471 | { |
472 | this.address = address; |
473 | this.backlog = backlog; |
474 | this.handler = handler; |
475 | } |
476 | } |
477 | |
478 | |
479 | private static class CancellationRequest |
480 | { |
481 | private final SocketAddress address; |
482 | |
483 | private boolean done; |
484 | |
485 | private RuntimeException exception; |
486 | |
487 | private CancellationRequest( SocketAddress address ) |
488 | { |
489 | this.address = address; |
490 | } |
491 | } |
492 | } |