1 | /* |
2 | * @(#) $Id: DatagramAcceptor.java 357871 2005-12-20 01:56:40Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.io.datagram; |
20 | |
21 | import java.io.IOException; |
22 | import java.net.InetSocketAddress; |
23 | import java.net.SocketAddress; |
24 | import java.nio.channels.DatagramChannel; |
25 | import java.nio.channels.SelectionKey; |
26 | import java.nio.channels.Selector; |
27 | import java.util.HashMap; |
28 | import java.util.Iterator; |
29 | import java.util.Map; |
30 | import java.util.Set; |
31 | |
32 | import org.apache.mina.common.ByteBuffer; |
33 | import org.apache.mina.io.IoAcceptor; |
34 | import org.apache.mina.io.IoFilterChain; |
35 | import org.apache.mina.io.IoHandler; |
36 | import org.apache.mina.io.IoSession; |
37 | import org.apache.mina.io.IoSessionManagerFilterChain; |
38 | import org.apache.mina.util.ExceptionUtil; |
39 | import org.apache.mina.util.Queue; |
40 | |
41 | /** |
42 | * {@link IoAcceptor} for datagram transport (UDP/IP). |
43 | * |
44 | * @author The Apache Directory Project (dev@directory.apache.org) |
45 | * @version $Rev: 357871 $, $Date: 2005-12-20 10:56:40 +0900 (Tue, 20 Dec 2005) $ |
46 | */ |
47 | public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor |
48 | { |
/**
 * Source of the numeric suffix used in worker thread names.
 * Only read and written inside {@link #allocateId()}, which holds the
 * class lock, so that two acceptors constructed concurrently can never
 * observe the same value (a bare <tt>nextId ++</tt> on a volatile field
 * is not atomic).
 */
private static int nextId = 0;

/** Filter chain handed to every session created by this acceptor. */
private final IoSessionManagerFilterChain filters =
    new DatagramSessionManagerFilterChain( this );

/** Identifier of this acceptor; appended to the worker thread name. */
private final int id = allocateId();

/**
 * Selector the worker thread blocks on. Opened when the worker is
 * started and reset to <tt>null</tt> when the worker shuts down.
 */
private Selector selector;

/** Bound local address -> {@link DatagramChannel} registered for it. */
private final Map channels = new HashMap();

/** Pending bind requests, pushed by callers and popped by the worker. */
private final Queue registerQueue = new Queue();

/** Pending unbind requests, pushed by callers and popped by the worker. */
private final Queue cancelQueue = new Queue();

/** Sessions whose queued writes are waiting to be flushed by the worker. */
private final Queue flushingSessions = new Queue();

/** The running worker thread, or <tt>null</tt> when none is running. */
private Worker worker;

/**
 * Creates a new instance. No selector is opened and no thread is started
 * until the first bind or unbind request.
 */
public DatagramAcceptor()
{
}

/**
 * Hands out the next acceptor identifier.
 *
 * @return a value that is unique among the acceptors created in this
 *         class loader
 */
private static synchronized int allocateId()
{
    return nextId ++;
}
74 | |
75 | public void bind( SocketAddress address, IoHandler handler ) |
76 | throws IOException |
77 | { |
78 | if( address == null ) |
79 | throw new NullPointerException( "address" ); |
80 | if( handler == null ) |
81 | throw new NullPointerException( "handler" ); |
82 | |
83 | if( !( address instanceof InetSocketAddress ) ) |
84 | throw new IllegalArgumentException( "Unexpected address type: " |
85 | + address.getClass() ); |
86 | if( ( ( InetSocketAddress ) address ).getPort() == 0 ) |
87 | throw new IllegalArgumentException( "Unsupported port number: 0" ); |
88 | |
89 | RegistrationRequest request = new RegistrationRequest( address, handler ); |
90 | synchronized( this ) |
91 | { |
92 | synchronized( registerQueue ) |
93 | { |
94 | registerQueue.push( request ); |
95 | } |
96 | startupWorker(); |
97 | } |
98 | selector.wakeup(); |
99 | |
100 | synchronized( request ) |
101 | { |
102 | while( !request.done ) |
103 | { |
104 | try |
105 | { |
106 | request.wait(); |
107 | } |
108 | catch( InterruptedException e ) |
109 | { |
110 | } |
111 | } |
112 | } |
113 | |
114 | if( request.exception != null ) |
115 | { |
116 | ExceptionUtil.throwException( request.exception ); |
117 | } |
118 | } |
119 | |
120 | public void unbind( SocketAddress address ) |
121 | { |
122 | if( address == null ) |
123 | throw new NullPointerException( "address" ); |
124 | |
125 | CancellationRequest request = new CancellationRequest( address ); |
126 | synchronized( this ) |
127 | { |
128 | try |
129 | { |
130 | startupWorker(); |
131 | } |
132 | catch( IOException e ) |
133 | { |
134 | // IOException is thrown only when Worker thread is not |
135 | // running and failed to open a selector. We simply throw |
136 | // IllegalArgumentException here because we can simply |
137 | // conclude that nothing is bound to the selector. |
138 | throw new IllegalArgumentException( "Address not bound: " + address ); |
139 | } |
140 | |
141 | synchronized( cancelQueue ) |
142 | { |
143 | cancelQueue.push( request ); |
144 | } |
145 | } |
146 | selector.wakeup(); |
147 | |
148 | synchronized( request ) |
149 | { |
150 | while( !request.done ) |
151 | { |
152 | try |
153 | { |
154 | request.wait(); |
155 | } |
156 | catch( InterruptedException e ) |
157 | { |
158 | } |
159 | } |
160 | } |
161 | |
162 | if( request.exception != null ) |
163 | { |
164 | request.exception.fillInStackTrace(); |
165 | throw request.exception; |
166 | } |
167 | } |
168 | |
169 | public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress ) |
170 | { |
171 | if( remoteAddress == null ) |
172 | { |
173 | throw new NullPointerException( "remoteAddress" ); |
174 | } |
175 | if( localAddress == null ) |
176 | { |
177 | throw new NullPointerException( "localAddress" ); |
178 | } |
179 | |
180 | Selector selector = this.selector; |
181 | DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress ); |
182 | if( selector == null || ch == null ) |
183 | { |
184 | throw new IllegalArgumentException( "Unknown localAddress: " + localAddress ); |
185 | } |
186 | |
187 | SelectionKey key = ch.keyFor( selector ); |
188 | if( key == null ) |
189 | { |
190 | throw new IllegalArgumentException( "Unknown lodalAddress: " + localAddress ); |
191 | } |
192 | |
193 | RegistrationRequest req = ( RegistrationRequest ) key.attachment(); |
194 | DatagramSession s = new DatagramSession( filters, ch, req.handler ); |
195 | s.setRemoteAddress( remoteAddress ); |
196 | s.setSelectionKey( key ); |
197 | |
198 | try |
199 | { |
200 | req.handler.sessionCreated( s ); |
201 | } |
202 | catch( Throwable t ) |
203 | { |
204 | exceptionMonitor.exceptionCaught( this, t ); |
205 | } |
206 | |
207 | return s; |
208 | } |
209 | |
210 | private synchronized void startupWorker() throws IOException |
211 | { |
212 | if( worker == null ) |
213 | { |
214 | selector = Selector.open(); |
215 | worker = new Worker(); |
216 | worker.start(); |
217 | } |
218 | } |
219 | |
220 | void flushSession( DatagramSession session ) |
221 | { |
222 | scheduleFlush( session ); |
223 | selector.wakeup(); |
224 | } |
225 | |
226 | void closeSession( DatagramSession session ) |
227 | { |
228 | } |
229 | |
230 | private void scheduleFlush( DatagramSession session ) |
231 | { |
232 | synchronized( flushingSessions ) |
233 | { |
234 | flushingSessions.push( session ); |
235 | } |
236 | } |
237 | |
238 | private class Worker extends Thread |
239 | { |
240 | public Worker() |
241 | { |
242 | super( "DatagramAcceptor-" + id ); |
243 | } |
244 | |
245 | public void run() |
246 | { |
247 | for( ;; ) |
248 | { |
249 | try |
250 | { |
251 | int nKeys = selector.select(); |
252 | |
253 | registerNew(); |
254 | |
255 | if( nKeys > 0 ) |
256 | { |
257 | processReadySessions( selector.selectedKeys() ); |
258 | } |
259 | |
260 | flushSessions(); |
261 | cancelKeys(); |
262 | |
263 | if( selector.keys().isEmpty() ) |
264 | { |
265 | synchronized( DatagramAcceptor.this ) |
266 | { |
267 | if( selector.keys().isEmpty() && |
268 | registerQueue.isEmpty() && |
269 | cancelQueue.isEmpty() ) |
270 | { |
271 | worker = null; |
272 | try |
273 | { |
274 | selector.close(); |
275 | } |
276 | catch( IOException e ) |
277 | { |
278 | exceptionMonitor.exceptionCaught( DatagramAcceptor.this, e ); |
279 | } |
280 | finally |
281 | { |
282 | selector = null; |
283 | } |
284 | break; |
285 | } |
286 | } |
287 | } |
288 | } |
289 | catch( IOException e ) |
290 | { |
291 | exceptionMonitor.exceptionCaught( DatagramAcceptor.this, |
292 | e ); |
293 | |
294 | try |
295 | { |
296 | Thread.sleep( 1000 ); |
297 | } |
298 | catch( InterruptedException e1 ) |
299 | { |
300 | } |
301 | } |
302 | } |
303 | } |
304 | } |
305 | |
306 | private void processReadySessions( Set keys ) |
307 | { |
308 | Iterator it = keys.iterator(); |
309 | while( it.hasNext() ) |
310 | { |
311 | SelectionKey key = ( SelectionKey ) it.next(); |
312 | it.remove(); |
313 | |
314 | DatagramChannel ch = ( DatagramChannel ) key.channel(); |
315 | |
316 | RegistrationRequest req = ( RegistrationRequest ) key.attachment(); |
317 | DatagramSession session = new DatagramSession( |
318 | filters, ch, req.handler ); |
319 | session.setSelectionKey( key ); |
320 | |
321 | try |
322 | { |
323 | req.handler.sessionCreated( session ); |
324 | |
325 | if( key.isReadable() ) |
326 | { |
327 | readSession( session ); |
328 | } |
329 | |
330 | if( key.isWritable() ) |
331 | { |
332 | scheduleFlush( session ); |
333 | } |
334 | } |
335 | catch( Throwable t ) |
336 | { |
337 | exceptionMonitor.exceptionCaught( this, t ); |
338 | } |
339 | } |
340 | } |
341 | |
342 | private void readSession( DatagramSession session ) |
343 | { |
344 | |
345 | ByteBuffer readBuf = ByteBuffer.allocate( 2048 ); |
346 | try |
347 | { |
348 | SocketAddress remoteAddress = session.getChannel().receive( |
349 | readBuf.buf() ); |
350 | if( remoteAddress != null ) |
351 | { |
352 | readBuf.flip(); |
353 | session.setRemoteAddress( remoteAddress ); |
354 | |
355 | ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() ); |
356 | newBuf.put( readBuf ); |
357 | newBuf.flip(); |
358 | |
359 | session.increaseReadBytes( newBuf.remaining() ); |
360 | filters.dataRead( session, newBuf ); |
361 | } |
362 | } |
363 | catch( IOException e ) |
364 | { |
365 | filters.exceptionCaught( session, e ); |
366 | } |
367 | finally |
368 | { |
369 | readBuf.release(); |
370 | } |
371 | } |
372 | |
373 | private void flushSessions() |
374 | { |
375 | if( flushingSessions.size() == 0 ) |
376 | return; |
377 | |
378 | for( ;; ) |
379 | { |
380 | DatagramSession session; |
381 | |
382 | synchronized( flushingSessions ) |
383 | { |
384 | session = ( DatagramSession ) flushingSessions.pop(); |
385 | } |
386 | |
387 | if( session == null ) |
388 | break; |
389 | |
390 | try |
391 | { |
392 | flush( session ); |
393 | } |
394 | catch( IOException e ) |
395 | { |
396 | session.getManagerFilterChain().exceptionCaught( session, e ); |
397 | } |
398 | } |
399 | } |
400 | |
401 | private void flush( DatagramSession session ) throws IOException |
402 | { |
403 | DatagramChannel ch = session.getChannel(); |
404 | |
405 | Queue writeBufferQueue = session.getWriteBufferQueue(); |
406 | Queue writeMarkerQueue = session.getWriteMarkerQueue(); |
407 | |
408 | ByteBuffer buf; |
409 | Object marker; |
410 | for( ;; ) |
411 | { |
412 | synchronized( writeBufferQueue ) |
413 | { |
414 | buf = ( ByteBuffer ) writeBufferQueue.first(); |
415 | marker = writeMarkerQueue.first(); |
416 | } |
417 | |
418 | if( buf == null ) |
419 | break; |
420 | |
421 | if( buf.remaining() == 0 ) |
422 | { |
423 | // pop and fire event |
424 | synchronized( writeBufferQueue ) |
425 | { |
426 | writeBufferQueue.pop(); |
427 | writeMarkerQueue.pop(); |
428 | } |
429 | |
430 | try |
431 | { |
432 | buf.release(); |
433 | } |
434 | catch( IllegalStateException e ) |
435 | { |
436 | session.getManagerFilterChain().exceptionCaught( session, e ); |
437 | } |
438 | |
439 | session.increaseWrittenWriteRequests(); |
440 | session.getManagerFilterChain().dataWritten( session, marker ); |
441 | continue; |
442 | } |
443 | |
444 | SelectionKey key = session.getSelectionKey(); |
445 | if( key == null ) |
446 | { |
447 | scheduleFlush( session ); |
448 | break; |
449 | } |
450 | if( !key.isValid() ) |
451 | { |
452 | continue; |
453 | } |
454 | |
455 | int writtenBytes = ch |
456 | .send( buf.buf(), session.getRemoteAddress() ); |
457 | |
458 | if( writtenBytes == 0 ) |
459 | { |
460 | // Kernel buffer is full |
461 | key.interestOps( key.interestOps() | SelectionKey.OP_WRITE ); |
462 | } |
463 | else if( writtenBytes > 0 ) |
464 | { |
465 | key.interestOps( key.interestOps() |
466 | & ( ~SelectionKey.OP_WRITE ) ); |
467 | |
468 | // pop and fire event |
469 | synchronized( writeBufferQueue ) |
470 | { |
471 | writeBufferQueue.pop(); |
472 | writeMarkerQueue.pop(); |
473 | } |
474 | |
475 | session.increaseWrittenBytes( writtenBytes ); |
476 | session.increaseWrittenWriteRequests(); |
477 | session.getManagerFilterChain().dataWritten( session, marker ); |
478 | } |
479 | } |
480 | } |
481 | |
482 | private void registerNew() |
483 | { |
484 | if( registerQueue.isEmpty() ) |
485 | return; |
486 | |
487 | for( ;; ) |
488 | { |
489 | RegistrationRequest req; |
490 | synchronized( registerQueue ) |
491 | { |
492 | req = ( RegistrationRequest ) registerQueue.pop(); |
493 | } |
494 | |
495 | if( req == null ) |
496 | break; |
497 | |
498 | DatagramChannel ch = null; |
499 | try |
500 | { |
501 | ch = DatagramChannel.open(); |
502 | ch.configureBlocking( false ); |
503 | ch.socket().bind( req.address ); |
504 | ch.register( selector, SelectionKey.OP_READ, req ); |
505 | channels.put( req.address, ch ); |
506 | } |
507 | catch( Throwable t ) |
508 | { |
509 | req.exception = t; |
510 | } |
511 | finally |
512 | { |
513 | synchronized( req ) |
514 | { |
515 | req.done = true; |
516 | req.notify(); |
517 | } |
518 | |
519 | if( ch != null && req.exception != null ) |
520 | { |
521 | try |
522 | { |
523 | ch.close(); |
524 | } |
525 | catch( Throwable e ) |
526 | { |
527 | exceptionMonitor.exceptionCaught( this, e ); |
528 | } |
529 | } |
530 | } |
531 | } |
532 | } |
533 | |
534 | private void cancelKeys() |
535 | { |
536 | if( cancelQueue.isEmpty() ) |
537 | return; |
538 | |
539 | for( ;; ) |
540 | { |
541 | CancellationRequest request; |
542 | synchronized( cancelQueue ) |
543 | { |
544 | request = ( CancellationRequest ) cancelQueue.pop(); |
545 | } |
546 | |
547 | if( request == null ) |
548 | { |
549 | break; |
550 | } |
551 | |
552 | DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address ); |
553 | // close the channel |
554 | try |
555 | { |
556 | if( ch == null ) |
557 | { |
558 | request.exception = new IllegalArgumentException( |
559 | "Address not bound: " + request.address ); |
560 | } |
561 | else |
562 | { |
563 | SelectionKey key = ch.keyFor( selector ); |
564 | key.cancel(); |
565 | selector.wakeup(); // wake up again to trigger thread death |
566 | ch.close(); |
567 | } |
568 | } |
569 | catch( Throwable t ) |
570 | { |
571 | exceptionMonitor.exceptionCaught( this, t ); |
572 | } |
573 | finally |
574 | { |
575 | synchronized( request ) |
576 | { |
577 | request.done = true; |
578 | request.notify(); |
579 | } |
580 | } |
581 | } |
582 | } |
583 | |
584 | public IoFilterChain getFilterChain() |
585 | { |
586 | return filters; |
587 | } |
588 | |
589 | private static class RegistrationRequest |
590 | { |
591 | private final SocketAddress address; |
592 | |
593 | private final IoHandler handler; |
594 | |
595 | private Throwable exception; |
596 | |
597 | private boolean done; |
598 | |
599 | private RegistrationRequest( SocketAddress address, IoHandler handler ) |
600 | { |
601 | this.address = address; |
602 | this.handler = handler; |
603 | } |
604 | } |
605 | |
606 | private static class CancellationRequest |
607 | { |
608 | private final SocketAddress address; |
609 | private boolean done; |
610 | private RuntimeException exception; |
611 | |
612 | private CancellationRequest( SocketAddress address ) |
613 | { |
614 | this.address = address; |
615 | } |
616 | } |
617 | } |