1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.datagram;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Iterator;
28 import java.util.Set;
29
30 import org.apache.mina.common.ByteBuffer;
31 import org.apache.mina.io.IoConnector;
32 import org.apache.mina.io.IoFilterChain;
33 import org.apache.mina.io.IoHandler;
34 import org.apache.mina.io.IoSession;
35 import org.apache.mina.io.IoSessionManagerFilterChain;
36 import org.apache.mina.util.ExceptionUtil;
37 import org.apache.mina.util.Queue;
38
39 /***
40 * {@link IoConnector} for datagram transport (UDP/IP).
41 *
42 * @author The Apache Directory Project (dev@directory.apache.org)
43 * @version $Rev: 332218 $, $Date: 2005-11-10 12:52:42 +0900 $
44 */
45 public class DatagramConnector extends DatagramSessionManager implements IoConnector
46 {
47 private static volatile int nextId = 0;
48
49 private final IoSessionManagerFilterChain filters =
50 new DatagramSessionManagerFilterChain( this );
51
52 private final int id = nextId ++ ;
53
54 private Selector selector;
55
56 private final Queue registerQueue = new Queue();
57
58 private final Queue cancelQueue = new Queue();
59
60 private final Queue flushingSessions = new Queue();
61
62 private Worker worker;
63
64 /***
65 * Creates a new instance.
66 */
67 public DatagramConnector()
68 {
69 }
70
71 public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
72 {
73 return connect( address, null, handler);
74 }
75
76 public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
77 {
78 return connect( address, null, handler );
79 }
80
81 public IoSession connect( SocketAddress address, SocketAddress localAddress, int timeout, IoHandler handler ) throws IOException
82 {
83 return connect( address, localAddress, handler );
84 }
85
86 public IoSession connect( SocketAddress address, SocketAddress localAddress,
87 IoHandler handler ) throws IOException
88 {
89 if( address == null )
90 throw new NullPointerException( "address" );
91 if( handler == null )
92 throw new NullPointerException( "handler" );
93
94 if( !( address instanceof InetSocketAddress ) )
95 throw new IllegalArgumentException( "Unexpected address type: "
96 + address.getClass() );
97
98 if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
99 {
100 throw new IllegalArgumentException( "Unexpected local address type: "
101 + localAddress.getClass() );
102 }
103
104 DatagramChannel ch = DatagramChannel.open();
105 boolean initialized = false;
106 try
107 {
108 ch.socket().setReuseAddress( true );
109 if( localAddress != null )
110 {
111 ch.socket().bind( localAddress );
112 }
113 ch.connect( address );
114 ch.configureBlocking( false );
115 initialized = true;
116 }
117 finally
118 {
119 if( !initialized )
120 {
121 ch.close();
122 }
123 }
124
125 RegistrationRequest request = new RegistrationRequest( ch, handler );
126 synchronized( this )
127 {
128 synchronized( registerQueue )
129 {
130 registerQueue.push( request );
131 }
132 startupWorker();
133 }
134
135 selector.wakeup();
136
137 synchronized( request )
138 {
139 while( !request.done )
140 {
141 try
142 {
143 request.wait();
144 }
145 catch( InterruptedException e )
146 {
147 }
148 }
149 }
150
151 if( request.exception != null )
152 {
153 ExceptionUtil.throwException( request.exception );
154 }
155
156 return request.session;
157 }
158
159 private synchronized void startupWorker() throws IOException
160 {
161 if( worker == null )
162 {
163 selector = Selector.open();
164 worker = new Worker();
165 worker.start();
166 }
167 }
168
169 void closeSession( DatagramSession session )
170 {
171 synchronized( this )
172 {
173 try
174 {
175 startupWorker();
176 }
177 catch( IOException e )
178 {
179
180
181
182
183
184 return;
185 }
186
187 synchronized( cancelQueue )
188 {
189 cancelQueue.push( session );
190 }
191 }
192
193 selector.wakeup();
194 }
195
196 void flushSession( DatagramSession session )
197 {
198 scheduleFlush( session );
199 selector.wakeup();
200 }
201
202 private void scheduleFlush( DatagramSession session )
203 {
204 synchronized( flushingSessions )
205 {
206 flushingSessions.push( session );
207 }
208 }
209
210 private class Worker extends Thread
211 {
212 public Worker()
213 {
214 super( "DatagramAcceptor-" + id );
215 }
216
217 public void run()
218 {
219 for( ;; )
220 {
221 try
222 {
223 int nKeys = selector.select();
224
225 registerNew();
226
227 if( nKeys > 0 )
228 {
229 processReadySessions( selector.selectedKeys() );
230 }
231
232 flushSessions();
233 cancelKeys();
234
235 if( selector.keys().isEmpty() )
236 {
237 synchronized( DatagramConnector.this )
238 {
239 if( selector.keys().isEmpty() &&
240 registerQueue.isEmpty() &&
241 cancelQueue.isEmpty() )
242 {
243 worker = null;
244 try
245 {
246 selector.close();
247 }
248 catch( IOException e )
249 {
250 exceptionMonitor.exceptionCaught( DatagramConnector.this, e );
251 }
252 finally
253 {
254 selector = null;
255 }
256 break;
257 }
258 }
259 }
260 }
261 catch( IOException e )
262 {
263 exceptionMonitor.exceptionCaught( DatagramConnector.this,
264 e );
265
266 try
267 {
268 Thread.sleep( 1000 );
269 }
270 catch( InterruptedException e1 )
271 {
272 }
273 }
274 }
275 }
276 }
277
278 private void processReadySessions( Set keys )
279 {
280 Iterator it = keys.iterator();
281 while( it.hasNext() )
282 {
283 SelectionKey key = ( SelectionKey ) it.next();
284 it.remove();
285
286 DatagramSession session = ( DatagramSession ) key.attachment();
287
288 if( key.isReadable() )
289 {
290 readSession( session );
291 }
292
293 if( key.isWritable() )
294 {
295 scheduleFlush( session );
296 }
297 }
298 }
299
300 private void readSession( DatagramSession session )
301 {
302
303 ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
304 try
305 {
306 int readBytes = session.getChannel().read( readBuf.buf() );
307 if( readBytes > 0 )
308 {
309 readBuf.flip();
310 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
311 newBuf.put( readBuf );
312 newBuf.flip();
313
314 session.increaseReadBytes( readBytes );
315 filters.dataRead( session, newBuf );
316 }
317 }
318 catch( IOException e )
319 {
320 filters.exceptionCaught( session, e );
321 }
322 finally
323 {
324 readBuf.release();
325 }
326 }
327
328 private void flushSessions()
329 {
330 if( flushingSessions.size() == 0 )
331 return;
332
333 for( ;; )
334 {
335 DatagramSession session;
336
337 synchronized( flushingSessions )
338 {
339 session = ( DatagramSession ) flushingSessions.pop();
340 }
341
342 if( session == null )
343 break;
344
345 try
346 {
347 flush( session );
348 }
349 catch( IOException e )
350 {
351 session.getManagerFilterChain().exceptionCaught( session, e );
352 }
353 }
354 }
355
356 private void flush( DatagramSession session ) throws IOException
357 {
358 DatagramChannel ch = session.getChannel();
359
360 Queue writeBufferQueue = session.getWriteBufferQueue();
361 Queue writeMarkerQueue = session.getWriteMarkerQueue();
362
363 ByteBuffer buf;
364 Object marker;
365 for( ;; )
366 {
367 synchronized( writeBufferQueue )
368 {
369 buf = ( ByteBuffer ) writeBufferQueue.first();
370 marker = writeMarkerQueue.first();
371 }
372
373 if( buf == null )
374 break;
375
376 if( buf.remaining() == 0 )
377 {
378
379 synchronized( writeBufferQueue )
380 {
381 writeBufferQueue.pop();
382 writeMarkerQueue.pop();
383 }
384
385 try
386 {
387 buf.release();
388 }
389 catch( IllegalStateException e )
390 {
391 session.getManagerFilterChain().exceptionCaught( session, e );
392 }
393
394 session.increaseWrittenWriteRequests();
395 session.getManagerFilterChain().dataWritten( session, marker );
396 continue;
397 }
398
399 SelectionKey key = session.getSelectionKey();
400 if( key == null )
401 {
402 scheduleFlush( session );
403 break;
404 }
405 if( !key.isValid() )
406 {
407 continue;
408 }
409
410 int writtenBytes = ch.write( buf.buf() );
411
412 if( writtenBytes == 0 )
413 {
414
415 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
416 }
417 else if( writtenBytes > 0 )
418 {
419 key.interestOps( key.interestOps()
420 & ( ~SelectionKey.OP_WRITE ) );
421
422
423 synchronized( writeBufferQueue )
424 {
425 writeBufferQueue.pop();
426 writeMarkerQueue.pop();
427 }
428
429 session.increaseWrittenBytes( writtenBytes );
430 session.increaseWrittenWriteRequests();
431 session.getManagerFilterChain().dataWritten( session, marker );
432 }
433 }
434 }
435
436 private void registerNew()
437 {
438 if( registerQueue.isEmpty() )
439 return;
440
441 for( ;; )
442 {
443 RegistrationRequest req;
444 synchronized( registerQueue )
445 {
446 req = ( RegistrationRequest ) registerQueue.pop();
447 }
448
449 if( req == null )
450 break;
451
452 DatagramSession session = new DatagramSession(
453 filters, req.channel, req.handler );
454
455 try
456 {
457 req.handler.sessionCreated( session );
458
459 SelectionKey key = req.channel.register( selector,
460 SelectionKey.OP_READ, session );
461
462 session.setSelectionKey( key );
463 }
464 catch( Throwable t )
465 {
466 req.exception = t;
467 }
468 finally
469 {
470 synchronized( req )
471 {
472 req.done = true;
473 req.session = session;
474 req.notify();
475 }
476
477 if( req.exception != null )
478 {
479 try
480 {
481 req.channel.close();
482 }
483 catch (IOException e)
484 {
485 exceptionMonitor.exceptionCaught( this, e );
486 }
487 }
488 }
489 }
490 }
491
492 private void cancelKeys()
493 {
494 if( cancelQueue.isEmpty() )
495 return;
496
497 for( ;; )
498 {
499 DatagramSession session;
500 synchronized( cancelQueue )
501 {
502 session = ( DatagramSession ) cancelQueue.pop();
503 }
504 if( session == null )
505 break;
506 else
507 {
508 SelectionKey key = session.getSelectionKey();
509 DatagramChannel ch = ( DatagramChannel ) key.channel();
510 try
511 {
512 ch.close();
513 }
514 catch( IOException e )
515 {
516 exceptionMonitor.exceptionCaught( this, e );
517 }
518 session.notifyClose();
519 key.cancel();
520 selector.wakeup();
521 }
522 }
523 }
524
525 public IoFilterChain getFilterChain()
526 {
527 return filters;
528 }
529
530 private static class RegistrationRequest
531 {
532 private final DatagramChannel channel;
533
534 private final IoHandler handler;
535
536 private boolean done;
537
538 private DatagramSession session;
539
540 private Throwable exception;
541
542 private RegistrationRequest( DatagramChannel channel,
543 IoHandler handler )
544 {
545 this.channel = channel;
546 this.handler = handler;
547 }
548 }
549 }