View Javadoc

1   /*
2    *   @(#) $Id: DatagramAcceptor.java 357871 2005-12-20 01:56:40Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.datagram;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.DatagramChannel;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.util.HashMap;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.Set;
31  
32  import org.apache.mina.common.ByteBuffer;
33  import org.apache.mina.io.IoAcceptor;
34  import org.apache.mina.io.IoFilterChain;
35  import org.apache.mina.io.IoHandler;
36  import org.apache.mina.io.IoSession;
37  import org.apache.mina.io.IoSessionManagerFilterChain;
38  import org.apache.mina.util.ExceptionUtil;
39  import org.apache.mina.util.Queue;
40  
41  /***
42   * {@link IoAcceptor} for datagram transport (UDP/IP).
43   * 
44   * @author The Apache Directory Project (dev@directory.apache.org)
45   * @version $Rev: 357871 $, $Date: 2005-12-20 10:56:40 +0900 (Tue, 20 Dec 2005) $
46   */
47  public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
48  {
49      private static volatile int nextId = 0;
50  
51      private final IoSessionManagerFilterChain filters =
52          new DatagramSessionManagerFilterChain( this );
53  
54      private final int id = nextId ++ ;
55  
56      private Selector selector;
57  
58      private final Map channels = new HashMap();
59  
60      private final Queue registerQueue = new Queue();
61  
62      private final Queue cancelQueue = new Queue();
63  
64      private final Queue flushingSessions = new Queue();
65  
66      private Worker worker;
67  
68      /***
69       * Creates a new instance.
70       */
71      public DatagramAcceptor()
72      {
73      }
74  
75      public void bind( SocketAddress address, IoHandler handler )
76              throws IOException
77      {
78          if( address == null )
79              throw new NullPointerException( "address" );
80          if( handler == null )
81              throw new NullPointerException( "handler" );
82  
83          if( !( address instanceof InetSocketAddress ) )
84              throw new IllegalArgumentException( "Unexpected address type: "
85                                                  + address.getClass() );
86          if( ( ( InetSocketAddress ) address ).getPort() == 0 )
87              throw new IllegalArgumentException( "Unsupported port number: 0" );
88          
89          RegistrationRequest request = new RegistrationRequest( address, handler );
90          synchronized( this )
91          {
92              synchronized( registerQueue )
93              {
94                  registerQueue.push( request );
95              }
96              startupWorker();
97          }
98          selector.wakeup();
99          
100         synchronized( request )
101         {
102             while( !request.done )
103             {
104                 try
105                 {
106                     request.wait();
107                 }
108                 catch( InterruptedException e )
109                 {
110                 }
111             }
112         }
113         
114         if( request.exception != null )
115         {
116             ExceptionUtil.throwException( request.exception );
117         }
118     }
119 
120     public void unbind( SocketAddress address )
121     {
122         if( address == null )
123             throw new NullPointerException( "address" );
124 
125         CancellationRequest request = new CancellationRequest( address );
126         synchronized( this )
127         {
128             try
129             {
130                 startupWorker();
131             }
132             catch( IOException e )
133             {
134                 // IOException is thrown only when Worker thread is not
135                 // running and failed to open a selector.  We simply throw
136                 // IllegalArgumentException here because we can simply
137                 // conclude that nothing is bound to the selector.
138                 throw new IllegalArgumentException( "Address not bound: " + address );
139             }
140 
141             synchronized( cancelQueue )
142             {
143                 cancelQueue.push( request );
144             }
145         }
146         selector.wakeup();
147         
148         synchronized( request )
149         {
150             while( !request.done )
151             {
152                 try
153                 {
154                     request.wait();
155                 }
156                 catch( InterruptedException e )
157                 {
158                 }
159             }
160         }
161         
162         if( request.exception != null )
163         {
164             request.exception.fillInStackTrace();
165             throw request.exception;
166         }
167     }
168     
169     public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
170     {
171         if( remoteAddress == null )
172         {
173             throw new NullPointerException( "remoteAddress" );
174         }
175         if( localAddress == null )
176         {
177             throw new NullPointerException( "localAddress" );
178         }
179         
180         Selector selector = this.selector;
181         DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
182         if( selector == null || ch == null )
183         {
184             throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
185         }
186             
187         SelectionKey key = ch.keyFor( selector );
188         if( key == null )
189         {
190             throw new IllegalArgumentException( "Unknown lodalAddress: " + localAddress );
191         }
192 
193         RegistrationRequest req = ( RegistrationRequest ) key.attachment();
194         DatagramSession s = new DatagramSession( filters, ch, req.handler );
195         s.setRemoteAddress( remoteAddress );
196         s.setSelectionKey( key );
197         
198         try
199         {
200             req.handler.sessionCreated( s );
201         }
202         catch( Throwable t )
203         {
204             exceptionMonitor.exceptionCaught( this, t );
205         }
206         
207         return s;
208     }
209 
210     private synchronized void startupWorker() throws IOException
211     {
212         if( worker == null )
213         {
214             selector = Selector.open();
215             worker = new Worker();
216             worker.start();
217         }
218     }
219 
220     void flushSession( DatagramSession session )
221     {
222         scheduleFlush( session );
223         selector.wakeup();
224     }
225 
226     void closeSession( DatagramSession session )
227     {
228     }
229 
230     private void scheduleFlush( DatagramSession session )
231     {
232         synchronized( flushingSessions )
233         {
234             flushingSessions.push( session );
235         }
236     }
237 
238     private class Worker extends Thread
239     {
240         public Worker()
241         {
242             super( "DatagramAcceptor-" + id );
243         }
244 
245         public void run()
246         {
247             for( ;; )
248             {
249                 try
250                 {
251                     int nKeys = selector.select();
252 
253                     registerNew();
254 
255                     if( nKeys > 0 )
256                     {
257                         processReadySessions( selector.selectedKeys() );
258                     }
259 
260                     flushSessions();
261                     cancelKeys();
262 
263                     if( selector.keys().isEmpty() )
264                     {
265                         synchronized( DatagramAcceptor.this )
266                         {
267                             if( selector.keys().isEmpty() &&
268                                 registerQueue.isEmpty() &&
269                                 cancelQueue.isEmpty() )
270                             {
271                                 worker = null;
272                                 try
273                                 {
274                                     selector.close();
275                                 }
276                                 catch( IOException e )
277                                 {
278                                     exceptionMonitor.exceptionCaught( DatagramAcceptor.this, e );
279                                 }
280                                 finally
281                                 {
282                                     selector = null;
283                                 }
284                                 break;
285                             }
286                         }
287                     }
288                 }
289                 catch( IOException e )
290                 {
291                     exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
292                             e );
293 
294                     try
295                     {
296                         Thread.sleep( 1000 );
297                     }
298                     catch( InterruptedException e1 )
299                     {
300                     }
301                 }
302             }
303         }
304     }
305 
306     private void processReadySessions( Set keys )
307     {
308         Iterator it = keys.iterator();
309         while( it.hasNext() )
310         {
311             SelectionKey key = ( SelectionKey ) it.next();
312             it.remove();
313 
314             DatagramChannel ch = ( DatagramChannel ) key.channel();
315 
316             RegistrationRequest req = ( RegistrationRequest ) key.attachment();
317             DatagramSession session = new DatagramSession(
318                     filters, ch, req.handler );
319             session.setSelectionKey( key );
320             
321             try
322             {
323                 req.handler.sessionCreated( session );
324 
325                 if( key.isReadable() )
326                 {
327                     readSession( session );
328                 }
329 
330                 if( key.isWritable() )
331                 {
332                     scheduleFlush( session );
333                 }
334             }
335             catch( Throwable t )
336             {
337                 exceptionMonitor.exceptionCaught( this, t );
338             }
339         }
340     }
341 
342     private void readSession( DatagramSession session )
343     {
344 
345         ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
346         try
347         {
348             SocketAddress remoteAddress = session.getChannel().receive(
349                     readBuf.buf() );
350             if( remoteAddress != null )
351             {
352                 readBuf.flip();
353                 session.setRemoteAddress( remoteAddress );
354 
355                 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
356                 newBuf.put( readBuf );
357                 newBuf.flip();
358 
359                 session.increaseReadBytes( newBuf.remaining() );
360                 filters.dataRead( session, newBuf );
361             }
362         }
363         catch( IOException e )
364         {
365             filters.exceptionCaught( session, e );
366         }
367         finally
368         {
369             readBuf.release();
370         }
371     }
372 
373     private void flushSessions()
374     {
375         if( flushingSessions.size() == 0 )
376             return;
377 
378         for( ;; )
379         {
380             DatagramSession session;
381 
382             synchronized( flushingSessions )
383             {
384                 session = ( DatagramSession ) flushingSessions.pop();
385             }
386 
387             if( session == null )
388                 break;
389 
390             try
391             {
392                 flush( session );
393             }
394             catch( IOException e )
395             {
396                 session.getManagerFilterChain().exceptionCaught( session, e );
397             }
398         }
399     }
400 
401     private void flush( DatagramSession session ) throws IOException
402     {
403         DatagramChannel ch = session.getChannel();
404 
405         Queue writeBufferQueue = session.getWriteBufferQueue();
406         Queue writeMarkerQueue = session.getWriteMarkerQueue();
407 
408         ByteBuffer buf;
409         Object marker;
410         for( ;; )
411         {
412             synchronized( writeBufferQueue )
413             {
414                 buf = ( ByteBuffer ) writeBufferQueue.first();
415                 marker = writeMarkerQueue.first();
416             }
417 
418             if( buf == null )
419                 break;
420 
421             if( buf.remaining() == 0 )
422             {
423                 // pop and fire event
424                 synchronized( writeBufferQueue )
425                 {
426                     writeBufferQueue.pop();
427                     writeMarkerQueue.pop();
428                 }
429 
430                 try
431                 {
432                     buf.release();
433                 }
434                 catch( IllegalStateException e )
435                 {
436                     session.getManagerFilterChain().exceptionCaught( session, e );
437                 }
438 
439                 session.increaseWrittenWriteRequests();
440                 session.getManagerFilterChain().dataWritten( session, marker );
441                 continue;
442             }
443 
444             SelectionKey key = session.getSelectionKey();
445             if( key == null )
446             {
447                 scheduleFlush( session );
448                 break;
449             }
450             if( !key.isValid() )
451             {
452                 continue;
453             }
454 
455             int writtenBytes = ch
456                     .send( buf.buf(), session.getRemoteAddress() );
457 
458             if( writtenBytes == 0 )
459             {
460                 // Kernel buffer is full
461                 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
462             }
463             else if( writtenBytes > 0 )
464             {
465                 key.interestOps( key.interestOps()
466                                  & ( ~SelectionKey.OP_WRITE ) );
467 
468                 // pop and fire event
469                 synchronized( writeBufferQueue )
470                 {
471                     writeBufferQueue.pop();
472                     writeMarkerQueue.pop();
473                 }
474 
475                 session.increaseWrittenBytes( writtenBytes );
476                 session.increaseWrittenWriteRequests();
477                 session.getManagerFilterChain().dataWritten( session, marker );
478             }
479         }
480     }
481 
482     private void registerNew()
483     {
484         if( registerQueue.isEmpty() )
485             return;
486 
487         for( ;; )
488         {
489             RegistrationRequest req;
490             synchronized( registerQueue )
491             {
492                 req = ( RegistrationRequest ) registerQueue.pop();
493             }
494 
495             if( req == null )
496                 break;
497 
498             DatagramChannel ch = null;
499             try
500             {
501                 ch = DatagramChannel.open();
502                 ch.configureBlocking( false );
503                 ch.socket().bind( req.address );
504                 ch.register( selector, SelectionKey.OP_READ, req );
505                 channels.put( req.address, ch );
506             }
507             catch( Throwable t )
508             {
509                 req.exception = t;
510             }
511             finally
512             {
513                 synchronized( req )
514                 {
515                     req.done = true;
516                     req.notify();
517                 }
518 
519                 if( ch != null && req.exception != null )
520                 {
521                     try
522                     {
523                         ch.close();
524                     }
525                     catch( Throwable e )
526                     {
527                         exceptionMonitor.exceptionCaught( this, e );
528                     }
529                 }
530             }
531         }
532     }
533 
534     private void cancelKeys()
535     {
536         if( cancelQueue.isEmpty() )
537             return;
538 
539         for( ;; )
540         {
541             CancellationRequest request;
542             synchronized( cancelQueue )
543             {
544                 request = ( CancellationRequest ) cancelQueue.pop();
545             }
546             
547             if( request == null )
548             {
549                 break;
550             }
551 
552             DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
553             // close the channel
554             try
555             {
556                 if( ch == null )
557                 {
558                     request.exception = new IllegalArgumentException(
559                             "Address not bound: " + request.address );
560                 }
561                 else
562                 {
563                     SelectionKey key = ch.keyFor( selector );
564                     key.cancel();
565                     selector.wakeup(); // wake up again to trigger thread death
566                     ch.close();
567                 }
568             }
569             catch( Throwable t )
570             {
571                 exceptionMonitor.exceptionCaught( this, t );
572             }
573             finally
574             {
575                 synchronized( request )
576                 {
577                     request.done = true;
578                     request.notify();
579                 }
580             }
581         }
582     }
583     
584     public IoFilterChain getFilterChain()
585     {
586         return filters;
587     }
588 
589     private static class RegistrationRequest
590     {
591         private final SocketAddress address;
592         
593         private final IoHandler handler;
594         
595         private Throwable exception; 
596         
597         private boolean done;
598         
599         private RegistrationRequest( SocketAddress address, IoHandler handler )
600         {
601             this.address = address;
602             this.handler = handler;
603         }
604     }
605 
606     private static class CancellationRequest
607     {
608         private final SocketAddress address;
609         private boolean done;
610         private RuntimeException exception;
611         
612         private CancellationRequest( SocketAddress address )
613         {
614             this.address = address;
615         }
616     }
617 }