1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.datagram;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Set;
31
32 import org.apache.mina.common.ByteBuffer;
33 import org.apache.mina.io.IoAcceptor;
34 import org.apache.mina.io.IoFilterChain;
35 import org.apache.mina.io.IoHandler;
36 import org.apache.mina.io.IoSession;
37 import org.apache.mina.io.IoSessionManagerFilterChain;
38 import org.apache.mina.util.ExceptionUtil;
39 import org.apache.mina.util.Queue;
40
41 /***
42 * {@link IoAcceptor} for datagram transport (UDP/IP).
43 *
44 * @author The Apache Directory Project (dev@directory.apache.org)
45 * @version $Rev: 332218 $, $Date: 2005-11-10 12:52:42 +0900 $
46 */
47 public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
48 {
49 private static volatile int nextId = 0;
50
51 private final IoSessionManagerFilterChain filters =
52 new DatagramSessionManagerFilterChain( this );
53
54 private final int id = nextId ++ ;
55
56 private Selector selector;
57
58 private final Map channels = new HashMap();
59
60 private final Queue registerQueue = new Queue();
61
62 private final Queue cancelQueue = new Queue();
63
64 private final Queue flushingSessions = new Queue();
65
66 private Worker worker;
67
/**
 * Constructs an acceptor that is not bound to any address yet.
 * The selector and the worker thread are only created once
 * {@link #bind(SocketAddress, IoHandler)} or
 * {@link #unbind(SocketAddress)} is called.
 */
public DatagramAcceptor()
{
    super();
}
74
75 public void bind( SocketAddress address, IoHandler handler )
76 throws IOException
77 {
78 if( address == null )
79 throw new NullPointerException( "address" );
80 if( handler == null )
81 throw new NullPointerException( "handler" );
82
83 if( !( address instanceof InetSocketAddress ) )
84 throw new IllegalArgumentException( "Unexpected address type: "
85 + address.getClass() );
86 if( ( ( InetSocketAddress ) address ).getPort() == 0 )
87 throw new IllegalArgumentException( "Unsupported port number: 0" );
88
89 RegistrationRequest request = new RegistrationRequest( address, handler );
90 synchronized( this )
91 {
92 synchronized( registerQueue )
93 {
94 registerQueue.push( request );
95 }
96 startupWorker();
97 }
98 selector.wakeup();
99
100 synchronized( request )
101 {
102 while( !request.done )
103 {
104 try
105 {
106 request.wait();
107 }
108 catch( InterruptedException e )
109 {
110 }
111 }
112 }
113
114 if( request.exception != null )
115 {
116 ExceptionUtil.throwException( request.exception );
117 }
118 }
119
120 public void unbind( SocketAddress address )
121 {
122 if( address == null )
123 throw new NullPointerException( "address" );
124
125 CancellationRequest request = new CancellationRequest( address );
126 synchronized( this )
127 {
128 try
129 {
130 startupWorker();
131 }
132 catch( IOException e )
133 {
134
135
136
137
138 throw new IllegalArgumentException( "Address not bound: " + address );
139 }
140
141 synchronized( cancelQueue )
142 {
143 cancelQueue.push( request );
144 }
145 }
146 selector.wakeup();
147
148 synchronized( request )
149 {
150 while( !request.done )
151 {
152 try
153 {
154 request.wait();
155 }
156 catch( InterruptedException e )
157 {
158 }
159 }
160 }
161
162 if( request.exception != null )
163 {
164 request.exception.fillInStackTrace();
165 throw request.exception;
166 }
167 }
168
169 public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
170 {
171 if( remoteAddress == null )
172 {
173 throw new NullPointerException( "remoteAddress" );
174 }
175 if( localAddress == null )
176 {
177 throw new NullPointerException( "localAddress" );
178 }
179
180 Selector selector = this.selector;
181 DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
182 if( selector == null || ch == null )
183 {
184 throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
185 }
186
187 SelectionKey key = ch.keyFor( selector );
188 if( key == null )
189 {
190 throw new IllegalArgumentException( "Unknown lodalAddress: " + localAddress );
191 }
192
193 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
194 DatagramSession s = new DatagramSession( filters, ch, req.handler );
195 s.setRemoteAddress( remoteAddress );
196 s.setSelectionKey( key );
197
198 try
199 {
200 req.handler.sessionCreated( s );
201 }
202 catch( Throwable t )
203 {
204 exceptionMonitor.exceptionCaught( this, t );
205 }
206
207 return s;
208 }
209
210 private synchronized void startupWorker() throws IOException
211 {
212 if( worker == null )
213 {
214 selector = Selector.open();
215 worker = new Worker();
216 worker.start();
217 }
218 }
219
220 void flushSession( DatagramSession session )
221 {
222 scheduleFlush( session );
223 selector.wakeup();
224 }
225
226 void closeSession( DatagramSession session )
227 {
228 }
229
230 private void scheduleFlush( DatagramSession session )
231 {
232 synchronized( flushingSessions )
233 {
234 flushingSessions.push( session );
235 }
236 }
237
238 private class Worker extends Thread
239 {
240 public Worker()
241 {
242 super( "DatagramAcceptor-" + id );
243 }
244
245 public void run()
246 {
247 for( ;; )
248 {
249 try
250 {
251 int nKeys = selector.select();
252
253 registerNew();
254
255 if( nKeys > 0 )
256 {
257 processReadySessions( selector.selectedKeys() );
258 }
259
260 flushSessions();
261 cancelKeys();
262
263 if( selector.keys().isEmpty() )
264 {
265 synchronized( DatagramAcceptor.this )
266 {
267 if( selector.keys().isEmpty() &&
268 registerQueue.isEmpty() &&
269 cancelQueue.isEmpty() )
270 {
271 worker = null;
272 try
273 {
274 selector.close();
275 }
276 catch( IOException e )
277 {
278 exceptionMonitor.exceptionCaught( DatagramAcceptor.this, e );
279 }
280 finally
281 {
282 selector = null;
283 }
284 break;
285 }
286 }
287 }
288 }
289 catch( IOException e )
290 {
291 exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
292 e );
293
294 try
295 {
296 Thread.sleep( 1000 );
297 }
298 catch( InterruptedException e1 )
299 {
300 }
301 }
302 }
303 }
304 }
305
306 private void processReadySessions( Set keys )
307 {
308 Iterator it = keys.iterator();
309 while( it.hasNext() )
310 {
311 SelectionKey key = ( SelectionKey ) it.next();
312 it.remove();
313
314 DatagramChannel ch = ( DatagramChannel ) key.channel();
315
316 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
317 DatagramSession session = new DatagramSession(
318 filters, ch, req.handler );
319 session.setSelectionKey( key );
320
321 try
322 {
323 req.handler.sessionCreated( session );
324
325 if( key.isReadable() )
326 {
327 readSession( session );
328 }
329
330 if( key.isWritable() )
331 {
332 scheduleFlush( session );
333 }
334 }
335 catch( Throwable t )
336 {
337 exceptionMonitor.exceptionCaught( this, t );
338 }
339 }
340 }
341
342 private void readSession( DatagramSession session )
343 {
344
345 ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
346 try
347 {
348 SocketAddress remoteAddress = session.getChannel().receive(
349 readBuf.buf() );
350 if( remoteAddress != null )
351 {
352 readBuf.flip();
353 session.setRemoteAddress( remoteAddress );
354
355 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
356 newBuf.put( readBuf );
357 newBuf.flip();
358
359 session.increaseReadBytes( newBuf.remaining() );
360 filters.dataRead( session, newBuf );
361 }
362 }
363 catch( IOException e )
364 {
365 filters.exceptionCaught( session, e );
366 }
367 finally
368 {
369 readBuf.release();
370 }
371 }
372
373 private void flushSessions()
374 {
375 if( flushingSessions.size() == 0 )
376 return;
377
378 for( ;; )
379 {
380 DatagramSession session;
381
382 synchronized( flushingSessions )
383 {
384 session = ( DatagramSession ) flushingSessions.pop();
385 }
386
387 if( session == null )
388 break;
389
390 try
391 {
392 flush( session );
393 }
394 catch( IOException e )
395 {
396 session.getManagerFilterChain().exceptionCaught( session, e );
397 }
398 }
399 }
400
401 private void flush( DatagramSession session ) throws IOException
402 {
403 DatagramChannel ch = session.getChannel();
404
405 Queue writeBufferQueue = session.getWriteBufferQueue();
406 Queue writeMarkerQueue = session.getWriteMarkerQueue();
407
408 ByteBuffer buf;
409 Object marker;
410 for( ;; )
411 {
412 synchronized( writeBufferQueue )
413 {
414 buf = ( ByteBuffer ) writeBufferQueue.first();
415 marker = writeMarkerQueue.first();
416 }
417
418 if( buf == null )
419 break;
420
421 if( buf.remaining() == 0 )
422 {
423
424 synchronized( writeBufferQueue )
425 {
426 writeBufferQueue.pop();
427 writeMarkerQueue.pop();
428 }
429
430 try
431 {
432 buf.release();
433 }
434 catch( IllegalStateException e )
435 {
436 session.getManagerFilterChain().exceptionCaught( session, e );
437 }
438
439 session.increaseWrittenWriteRequests();
440 session.getManagerFilterChain().dataWritten( session, marker );
441 continue;
442 }
443
444 SelectionKey key = session.getSelectionKey();
445 if( key == null )
446 {
447 scheduleFlush( session );
448 break;
449 }
450 if( !key.isValid() )
451 {
452 continue;
453 }
454
455 int writtenBytes = ch
456 .send( buf.buf(), session.getRemoteAddress() );
457
458 if( writtenBytes == 0 )
459 {
460
461 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
462 }
463 else if( writtenBytes > 0 )
464 {
465 key.interestOps( key.interestOps()
466 & ( ~SelectionKey.OP_WRITE ) );
467
468
469 synchronized( writeBufferQueue )
470 {
471 writeBufferQueue.pop();
472 writeMarkerQueue.pop();
473 }
474
475 session.increaseWrittenBytes( writtenBytes );
476 session.increaseWrittenWriteRequests();
477 session.getManagerFilterChain().dataWritten( session, marker );
478 }
479 }
480 }
481
482 private void registerNew()
483 {
484 if( registerQueue.isEmpty() )
485 return;
486
487 for( ;; )
488 {
489 RegistrationRequest req;
490 synchronized( registerQueue )
491 {
492 req = ( RegistrationRequest ) registerQueue.pop();
493 }
494
495 if( req == null )
496 break;
497
498 DatagramChannel ch = null;
499 try
500 {
501 ch = DatagramChannel.open();
502 ch.configureBlocking( false );
503 ch.socket().bind( req.address );
504 ch.register( selector, SelectionKey.OP_READ, req );
505 channels.put( req.address, ch );
506 }
507 catch( Throwable t )
508 {
509 req.exception = t;
510 }
511 finally
512 {
513 synchronized( req )
514 {
515 req.done = true;
516 req.notify();
517 }
518
519 if( ch != null && req.exception != null )
520 {
521 try
522 {
523 ch.close();
524 }
525 catch( Throwable e )
526 {
527 exceptionMonitor.exceptionCaught( this, e );
528 }
529 }
530 }
531 }
532 }
533
534 private void cancelKeys()
535 {
536 if( cancelQueue.isEmpty() )
537 return;
538
539 for( ;; )
540 {
541 CancellationRequest request;
542 synchronized( cancelQueue )
543 {
544 request = ( CancellationRequest ) cancelQueue.pop();
545 }
546
547 if( request == null )
548 {
549 break;
550 }
551
552 DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
553
554 try
555 {
556 if( ch == null )
557 {
558 request.exception = new IllegalArgumentException(
559 "Address not bound: " + request.address );
560 }
561 else
562 {
563 SelectionKey key = ch.keyFor( selector );
564 key.cancel();
565 selector.wakeup();
566 ch.close();
567 }
568 }
569 catch( Throwable t )
570 {
571 exceptionMonitor.exceptionCaught( this, t );
572 }
573 finally
574 {
575 synchronized( request )
576 {
577 request.done = true;
578 request.notify();
579 }
580 }
581 }
582 }
583
584 public IoFilterChain getFilterChain()
585 {
586 return filters;
587 }
588
589 private static class RegistrationRequest
590 {
591 private final SocketAddress address;
592
593 private final IoHandler handler;
594
595 private Throwable exception;
596
597 private boolean done;
598
599 private RegistrationRequest( SocketAddress address, IoHandler handler )
600 {
601 this.address = address;
602 this.handler = handler;
603 }
604 }
605
606 private static class CancellationRequest
607 {
608 private final SocketAddress address;
609 private boolean done;
610 private RuntimeException exception;
611
612 private CancellationRequest( SocketAddress address )
613 {
614 this.address = address;
615 }
616 }
617 }