1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.datagram;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Set;
31
32 import org.apache.mina.common.ByteBuffer;
33 import org.apache.mina.io.IoAcceptor;
34 import org.apache.mina.io.IoFilterChain;
35 import org.apache.mina.io.IoHandler;
36 import org.apache.mina.io.IoSessionManagerFilterChain;
37 import org.apache.mina.util.ExceptionUtil;
38 import org.apache.mina.util.Queue;
39
40 /***
41 * {@link IoAcceptor} for datagram transport (UDP/IP).
42 *
43 * @author Trustin Lee (trustin@apache.org)
44 * @version $Rev: 165586 $, $Date: 2005-05-02 15:27:27 +0900 (?, 02 5? 2005) $
45 */
46 public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
47 {
48 private static volatile int nextId = 0;
49
50 private final IoSessionManagerFilterChain filters =
51 new DatagramSessionManagerFilterChain( this );
52
53 private final int id = nextId ++ ;
54
55 private final Selector selector;
56
57 private final Map channels = new HashMap();
58
59 private final Queue registerQueue = new Queue();
60
61 private final Queue cancelQueue = new Queue();
62
63 private final Queue flushingSessions = new Queue();
64
65 private Worker worker;
66
67 /***
68 * Creates a new instance.
69 *
70 * @throws IOException if failed to open a selector.
71 */
72 public DatagramAcceptor() throws IOException
73 {
74 selector = Selector.open();
75 }
76
77 public void bind( SocketAddress address, IoHandler handler )
78 throws IOException
79 {
80 if( address == null )
81 throw new NullPointerException( "address" );
82 if( handler == null )
83 throw new NullPointerException( "handler" );
84
85 if( !( address instanceof InetSocketAddress ) )
86 throw new IllegalArgumentException( "Unexpected address type: "
87 + address.getClass() );
88 if( ( ( InetSocketAddress ) address ).getPort() == 0 )
89 throw new IllegalArgumentException( "Unsupported port number: 0" );
90
91 RegistrationRequest request = new RegistrationRequest( address, handler );
92 synchronized( this )
93 {
94 synchronized( registerQueue )
95 {
96 registerQueue.push( request );
97 }
98 startupWorker();
99 }
100 selector.wakeup();
101
102 synchronized( request )
103 {
104 while( !request.done )
105 {
106 try
107 {
108 request.wait();
109 }
110 catch( InterruptedException e )
111 {
112 }
113 }
114 }
115
116 if( request.exception != null )
117 {
118 ExceptionUtil.throwException( request.exception );
119 }
120 }
121
122 public void unbind( SocketAddress address )
123 {
124 if( address == null )
125 throw new NullPointerException( "address" );
126
127 CancellationRequest request = new CancellationRequest( address );
128 synchronized( this )
129 {
130 synchronized( cancelQueue )
131 {
132 cancelQueue.push( request );
133 }
134 startupWorker();
135 }
136 selector.wakeup();
137
138 synchronized( request )
139 {
140 while( !request.done )
141 {
142 try
143 {
144 request.wait();
145 }
146 catch( InterruptedException e )
147 {
148 }
149 }
150 }
151
152 if( request.exception != null )
153 {
154 request.exception.fillInStackTrace();
155 throw request.exception;
156 }
157 }
158
159 private synchronized void startupWorker()
160 {
161 if( worker == null )
162 {
163 worker = new Worker();
164 worker.start();
165 }
166 }
167
168 void flushSession( DatagramSession session )
169 {
170 scheduleFlush( session );
171 selector.wakeup();
172 }
173
174 void closeSession( DatagramSession session )
175 {
176 }
177
178 private void scheduleFlush( DatagramSession session )
179 {
180 synchronized( flushingSessions )
181 {
182 flushingSessions.push( session );
183 }
184 }
185
186 private class Worker extends Thread
187 {
188 public Worker()
189 {
190 super( "DatagramAcceptor-" + id );
191 }
192
193 public void run()
194 {
195 for( ;; )
196 {
197 try
198 {
199 int nKeys = selector.select();
200
201 registerNew();
202
203 if( nKeys > 0 )
204 {
205 processReadySessions( selector.selectedKeys() );
206 }
207
208 flushSessions();
209 cancelKeys();
210
211 if( selector.keys().isEmpty() )
212 {
213 synchronized( DatagramAcceptor.this )
214 {
215 if( selector.keys().isEmpty() &&
216 registerQueue.isEmpty() &&
217 cancelQueue.isEmpty() )
218 {
219 worker = null;
220 break;
221 }
222 }
223 }
224 }
225 catch( IOException e )
226 {
227 exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
228 e );
229
230 try
231 {
232 Thread.sleep( 1000 );
233 }
234 catch( InterruptedException e1 )
235 {
236 }
237 }
238 }
239 }
240 }
241
242 private void processReadySessions( Set keys )
243 {
244 Iterator it = keys.iterator();
245 while( it.hasNext() )
246 {
247 SelectionKey key = ( SelectionKey ) it.next();
248 it.remove();
249
250 DatagramChannel ch = ( DatagramChannel ) key.channel();
251
252 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
253 DatagramSession session = new DatagramSession(
254 filters, ch, req.handler );
255 session.setSelectionKey( key );
256
257 try
258 {
259 req.handler.sessionCreated( session );
260
261 if( key.isReadable() )
262 {
263 readSession( session );
264 }
265
266 if( key.isWritable() )
267 {
268 scheduleFlush( session );
269 }
270 }
271 catch( Throwable t )
272 {
273 exceptionMonitor.exceptionCaught( this, t );
274 }
275 }
276 }
277
278 private void readSession( DatagramSession session )
279 {
280
281 ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
282 try
283 {
284 SocketAddress remoteAddress = session.getChannel().receive(
285 readBuf.buf() );
286 if( remoteAddress != null )
287 {
288 readBuf.flip();
289 session.setRemoteAddress( remoteAddress );
290
291 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
292 newBuf.put( readBuf );
293 newBuf.flip();
294
295 session.increaseReadBytes( newBuf.remaining() );
296 filters.dataRead( session, newBuf );
297 }
298 }
299 catch( IOException e )
300 {
301 filters.exceptionCaught( session, e );
302 }
303 finally
304 {
305 readBuf.release();
306 }
307 }
308
309 private void flushSessions()
310 {
311 if( flushingSessions.size() == 0 )
312 return;
313
314 for( ;; )
315 {
316 DatagramSession session;
317
318 synchronized( flushingSessions )
319 {
320 session = ( DatagramSession ) flushingSessions.pop();
321 }
322
323 if( session == null )
324 break;
325
326 try
327 {
328 flush( session );
329 }
330 catch( IOException e )
331 {
332 session.getManagerFilterChain().exceptionCaught( session, e );
333 }
334 }
335 }
336
337 private void flush( DatagramSession session ) throws IOException
338 {
339 DatagramChannel ch = session.getChannel();
340
341 Queue writeBufferQueue = session.getWriteBufferQueue();
342 Queue writeMarkerQueue = session.getWriteMarkerQueue();
343
344 ByteBuffer buf;
345 Object marker;
346 for( ;; )
347 {
348 synchronized( writeBufferQueue )
349 {
350 buf = ( ByteBuffer ) writeBufferQueue.first();
351 marker = writeMarkerQueue.first();
352 }
353
354 if( buf == null )
355 break;
356
357 if( buf.remaining() == 0 )
358 {
359
360 synchronized( writeBufferQueue )
361 {
362 writeBufferQueue.pop();
363 writeMarkerQueue.pop();
364 }
365
366 try
367 {
368 buf.release();
369 }
370 catch( IllegalStateException e )
371 {
372 session.getManagerFilterChain().exceptionCaught( session, e );
373 }
374
375 session.getManagerFilterChain().dataWritten( session, marker );
376 continue;
377 }
378
379 int writtenBytes = ch
380 .send( buf.buf(), session.getRemoteAddress() );
381
382 SelectionKey key = session.getSelectionKey();
383 if( writtenBytes == 0 )
384 {
385
386 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
387 }
388 else if( writtenBytes > 0 )
389 {
390 key.interestOps( key.interestOps()
391 & ( ~SelectionKey.OP_WRITE ) );
392
393
394 synchronized( writeBufferQueue )
395 {
396 writeBufferQueue.pop();
397 writeMarkerQueue.pop();
398 }
399
400 session.increaseWrittenBytes( writtenBytes );
401 session.getManagerFilterChain().dataWritten( session, marker );
402 }
403 }
404 }
405
406 private void registerNew()
407 {
408 if( registerQueue.isEmpty() )
409 return;
410
411 for( ;; )
412 {
413 RegistrationRequest req;
414 synchronized( registerQueue )
415 {
416 req = ( RegistrationRequest ) registerQueue.pop();
417 }
418
419 if( req == null )
420 break;
421
422 DatagramChannel ch = null;
423 try
424 {
425 ch = DatagramChannel.open();
426 ch.configureBlocking( false );
427 ch.socket().bind( req.address );
428 ch.register( selector, SelectionKey.OP_READ, req );
429 channels.put( req.address, ch );
430 }
431 catch( Throwable t )
432 {
433 req.exception = t;
434 }
435 finally
436 {
437 synchronized( req )
438 {
439 req.done = true;
440 req.notify();
441 }
442
443 if( ch != null && req.exception != null )
444 {
445 try
446 {
447 ch.close();
448 }
449 catch( Throwable e )
450 {
451 exceptionMonitor.exceptionCaught( this, e );
452 }
453 }
454 }
455 }
456 }
457
458 private void cancelKeys()
459 {
460 if( cancelQueue.isEmpty() )
461 return;
462
463 for( ;; )
464 {
465 CancellationRequest request;
466 synchronized( cancelQueue )
467 {
468 request = ( CancellationRequest ) cancelQueue.pop();
469 }
470
471 if( request == null )
472 {
473 break;
474 }
475
476 DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
477
478 try
479 {
480 if( ch == null )
481 {
482 request.exception = new IllegalArgumentException(
483 "Address not bound: " + request.address );
484 }
485 else
486 {
487 SelectionKey key = ch.keyFor( selector );
488 key.cancel();
489 selector.wakeup();
490 ch.close();
491 }
492 }
493 catch( Throwable t )
494 {
495 exceptionMonitor.exceptionCaught( this, t );
496 }
497 finally
498 {
499 synchronized( request )
500 {
501 request.done = true;
502 request.notify();
503 }
504 }
505 }
506 }
507
508 public IoFilterChain getFilterChain()
509 {
510 return filters;
511 }
512
513 private static class RegistrationRequest
514 {
515 private final SocketAddress address;
516
517 private final IoHandler handler;
518
519 private Throwable exception;
520
521 private boolean done;
522
523 private RegistrationRequest( SocketAddress address, IoHandler handler )
524 {
525 this.address = address;
526 this.handler = handler;
527 }
528 }
529
530 private static class CancellationRequest
531 {
532 private final SocketAddress address;
533 private boolean done;
534 private RuntimeException exception;
535
536 private CancellationRequest( SocketAddress address )
537 {
538 this.address = address;
539 }
540 }
541 }