1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.datagram;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Set;
31
32 import org.apache.mina.common.ByteBuffer;
33 import org.apache.mina.io.IoAcceptor;
34 import org.apache.mina.io.IoFilterChain;
35 import org.apache.mina.io.IoHandler;
36 import org.apache.mina.io.IoSessionManagerFilterChain;
37 import org.apache.mina.util.ExceptionUtil;
38 import org.apache.mina.util.Queue;
39
/**
 * {@link IoAcceptor} for datagram transport (UDP/IP).
 *
 * @author Trustin Lee (trustin@apache.org)
 * @version $Rev: 188654 $, $Date: 2005-06-07 10:31:11 +0900 $
 */
46 public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
47 {
48 private static volatile int nextId = 0;
49
50 private final IoSessionManagerFilterChain filters =
51 new DatagramSessionManagerFilterChain( this );
52
53 private final int id = nextId ++ ;
54
55 private Selector selector;
56
57 private final Map channels = new HashMap();
58
59 private final Queue registerQueue = new Queue();
60
61 private final Queue cancelQueue = new Queue();
62
63 private final Queue flushingSessions = new Queue();
64
65 private Worker worker;
66
67 /***
68 * Creates a new instance.
69 */
70 public DatagramAcceptor()
71 {
72 }
73
74 public void bind( SocketAddress address, IoHandler handler )
75 throws IOException
76 {
77 if( address == null )
78 throw new NullPointerException( "address" );
79 if( handler == null )
80 throw new NullPointerException( "handler" );
81
82 if( !( address instanceof InetSocketAddress ) )
83 throw new IllegalArgumentException( "Unexpected address type: "
84 + address.getClass() );
85 if( ( ( InetSocketAddress ) address ).getPort() == 0 )
86 throw new IllegalArgumentException( "Unsupported port number: 0" );
87
88 RegistrationRequest request = new RegistrationRequest( address, handler );
89 synchronized( this )
90 {
91 synchronized( registerQueue )
92 {
93 registerQueue.push( request );
94 }
95 startupWorker();
96 }
97 selector.wakeup();
98
99 synchronized( request )
100 {
101 while( !request.done )
102 {
103 try
104 {
105 request.wait();
106 }
107 catch( InterruptedException e )
108 {
109 }
110 }
111 }
112
113 if( request.exception != null )
114 {
115 ExceptionUtil.throwException( request.exception );
116 }
117 }
118
119 public void unbind( SocketAddress address )
120 {
121 if( address == null )
122 throw new NullPointerException( "address" );
123
124 CancellationRequest request = new CancellationRequest( address );
125 synchronized( this )
126 {
127 try
128 {
129 startupWorker();
130 }
131 catch( IOException e )
132 {
133
134
135
136
137 throw new IllegalArgumentException( "Address not bound: " + address );
138 }
139
140 synchronized( cancelQueue )
141 {
142 cancelQueue.push( request );
143 }
144 }
145 selector.wakeup();
146
147 synchronized( request )
148 {
149 while( !request.done )
150 {
151 try
152 {
153 request.wait();
154 }
155 catch( InterruptedException e )
156 {
157 }
158 }
159 }
160
161 if( request.exception != null )
162 {
163 request.exception.fillInStackTrace();
164 throw request.exception;
165 }
166 }
167
168 private synchronized void startupWorker() throws IOException
169 {
170 if( worker == null )
171 {
172 selector = Selector.open();
173 worker = new Worker();
174 worker.start();
175 }
176 }
177
178 void flushSession( DatagramSession session )
179 {
180 scheduleFlush( session );
181 selector.wakeup();
182 }
183
184 void closeSession( DatagramSession session )
185 {
186 }
187
188 private void scheduleFlush( DatagramSession session )
189 {
190 synchronized( flushingSessions )
191 {
192 flushingSessions.push( session );
193 }
194 }
195
196 private class Worker extends Thread
197 {
198 public Worker()
199 {
200 super( "DatagramAcceptor-" + id );
201 }
202
203 public void run()
204 {
205 for( ;; )
206 {
207 try
208 {
209 int nKeys = selector.select();
210
211 registerNew();
212
213 if( nKeys > 0 )
214 {
215 processReadySessions( selector.selectedKeys() );
216 }
217
218 flushSessions();
219 cancelKeys();
220
221 if( selector.keys().isEmpty() )
222 {
223 synchronized( DatagramAcceptor.this )
224 {
225 if( selector.keys().isEmpty() &&
226 registerQueue.isEmpty() &&
227 cancelQueue.isEmpty() )
228 {
229 worker = null;
230 try
231 {
232 selector.close();
233 }
234 catch( IOException e )
235 {
236 exceptionMonitor.exceptionCaught( DatagramAcceptor.this, e );
237 }
238 finally
239 {
240 selector = null;
241 }
242 break;
243 }
244 }
245 }
246 }
247 catch( IOException e )
248 {
249 exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
250 e );
251
252 try
253 {
254 Thread.sleep( 1000 );
255 }
256 catch( InterruptedException e1 )
257 {
258 }
259 }
260 }
261 }
262 }
263
264 private void processReadySessions( Set keys )
265 {
266 Iterator it = keys.iterator();
267 while( it.hasNext() )
268 {
269 SelectionKey key = ( SelectionKey ) it.next();
270 it.remove();
271
272 DatagramChannel ch = ( DatagramChannel ) key.channel();
273
274 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
275 DatagramSession session = new DatagramSession(
276 filters, ch, req.handler );
277 session.setSelectionKey( key );
278
279 try
280 {
281 req.handler.sessionCreated( session );
282
283 if( key.isReadable() )
284 {
285 readSession( session );
286 }
287
288 if( key.isWritable() )
289 {
290 scheduleFlush( session );
291 }
292 }
293 catch( Throwable t )
294 {
295 exceptionMonitor.exceptionCaught( this, t );
296 }
297 }
298 }
299
300 private void readSession( DatagramSession session )
301 {
302
303 ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
304 try
305 {
306 SocketAddress remoteAddress = session.getChannel().receive(
307 readBuf.buf() );
308 if( remoteAddress != null )
309 {
310 readBuf.flip();
311 session.setRemoteAddress( remoteAddress );
312
313 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
314 newBuf.put( readBuf );
315 newBuf.flip();
316
317 session.increaseReadBytes( newBuf.remaining() );
318 filters.dataRead( session, newBuf );
319 }
320 }
321 catch( IOException e )
322 {
323 filters.exceptionCaught( session, e );
324 }
325 finally
326 {
327 readBuf.release();
328 }
329 }
330
331 private void flushSessions()
332 {
333 if( flushingSessions.size() == 0 )
334 return;
335
336 for( ;; )
337 {
338 DatagramSession session;
339
340 synchronized( flushingSessions )
341 {
342 session = ( DatagramSession ) flushingSessions.pop();
343 }
344
345 if( session == null )
346 break;
347
348 try
349 {
350 flush( session );
351 }
352 catch( IOException e )
353 {
354 session.getManagerFilterChain().exceptionCaught( session, e );
355 }
356 }
357 }
358
359 private void flush( DatagramSession session ) throws IOException
360 {
361 DatagramChannel ch = session.getChannel();
362
363 Queue writeBufferQueue = session.getWriteBufferQueue();
364 Queue writeMarkerQueue = session.getWriteMarkerQueue();
365
366 ByteBuffer buf;
367 Object marker;
368 for( ;; )
369 {
370 synchronized( writeBufferQueue )
371 {
372 buf = ( ByteBuffer ) writeBufferQueue.first();
373 marker = writeMarkerQueue.first();
374 }
375
376 if( buf == null )
377 break;
378
379 if( buf.remaining() == 0 )
380 {
381
382 synchronized( writeBufferQueue )
383 {
384 writeBufferQueue.pop();
385 writeMarkerQueue.pop();
386 }
387
388 try
389 {
390 buf.release();
391 }
392 catch( IllegalStateException e )
393 {
394 session.getManagerFilterChain().exceptionCaught( session, e );
395 }
396
397 session.getManagerFilterChain().dataWritten( session, marker );
398 continue;
399 }
400
401 int writtenBytes = ch
402 .send( buf.buf(), session.getRemoteAddress() );
403
404 SelectionKey key = session.getSelectionKey();
405 if( writtenBytes == 0 )
406 {
407
408 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
409 }
410 else if( writtenBytes > 0 )
411 {
412 key.interestOps( key.interestOps()
413 & ( ~SelectionKey.OP_WRITE ) );
414
415
416 synchronized( writeBufferQueue )
417 {
418 writeBufferQueue.pop();
419 writeMarkerQueue.pop();
420 }
421
422 session.increaseWrittenBytes( writtenBytes );
423 session.getManagerFilterChain().dataWritten( session, marker );
424 }
425 }
426 }
427
428 private void registerNew()
429 {
430 if( registerQueue.isEmpty() )
431 return;
432
433 for( ;; )
434 {
435 RegistrationRequest req;
436 synchronized( registerQueue )
437 {
438 req = ( RegistrationRequest ) registerQueue.pop();
439 }
440
441 if( req == null )
442 break;
443
444 DatagramChannel ch = null;
445 try
446 {
447 ch = DatagramChannel.open();
448 ch.configureBlocking( false );
449 ch.socket().bind( req.address );
450 ch.register( selector, SelectionKey.OP_READ, req );
451 channels.put( req.address, ch );
452 }
453 catch( Throwable t )
454 {
455 req.exception = t;
456 }
457 finally
458 {
459 synchronized( req )
460 {
461 req.done = true;
462 req.notify();
463 }
464
465 if( ch != null && req.exception != null )
466 {
467 try
468 {
469 ch.close();
470 }
471 catch( Throwable e )
472 {
473 exceptionMonitor.exceptionCaught( this, e );
474 }
475 }
476 }
477 }
478 }
479
480 private void cancelKeys()
481 {
482 if( cancelQueue.isEmpty() )
483 return;
484
485 for( ;; )
486 {
487 CancellationRequest request;
488 synchronized( cancelQueue )
489 {
490 request = ( CancellationRequest ) cancelQueue.pop();
491 }
492
493 if( request == null )
494 {
495 break;
496 }
497
498 DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
499
500 try
501 {
502 if( ch == null )
503 {
504 request.exception = new IllegalArgumentException(
505 "Address not bound: " + request.address );
506 }
507 else
508 {
509 SelectionKey key = ch.keyFor( selector );
510 key.cancel();
511 selector.wakeup();
512 ch.close();
513 }
514 }
515 catch( Throwable t )
516 {
517 exceptionMonitor.exceptionCaught( this, t );
518 }
519 finally
520 {
521 synchronized( request )
522 {
523 request.done = true;
524 request.notify();
525 }
526 }
527 }
528 }
529
530 public IoFilterChain getFilterChain()
531 {
532 return filters;
533 }
534
535 private static class RegistrationRequest
536 {
537 private final SocketAddress address;
538
539 private final IoHandler handler;
540
541 private Throwable exception;
542
543 private boolean done;
544
545 private RegistrationRequest( SocketAddress address, IoHandler handler )
546 {
547 this.address = address;
548 this.handler = handler;
549 }
550 }
551
552 private static class CancellationRequest
553 {
554 private final SocketAddress address;
555 private boolean done;
556 private RuntimeException exception;
557
558 private CancellationRequest( SocketAddress address )
559 {
560 this.address = address;
561 }
562 }
563 }