View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.nio.support;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.ArrayList;
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  
35  import org.apache.mina.common.ByteBuffer;
36  import org.apache.mina.common.ExceptionMonitor;
37  import org.apache.mina.common.IoAcceptor;
38  import org.apache.mina.common.IoHandler;
39  import org.apache.mina.common.IoServiceConfig;
40  import org.apache.mina.common.IoSession;
41  import org.apache.mina.common.IoSessionRecycler;
42  import org.apache.mina.common.IoFilter.WriteRequest;
43  import org.apache.mina.common.support.BaseIoAcceptor;
44  import org.apache.mina.common.support.IoServiceListenerSupport;
45  import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
46  import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
47  import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
48  import org.apache.mina.util.NamePreservingRunnable;
49  import org.apache.mina.util.Queue;
50  
51  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
52  
53  /**
54   * {@link IoAcceptor} for datagram transport (UDP/IP).
55   * 
56   * @author The Apache Directory Project (mina-dev@directory.apache.org)
57   * @version $Rev: 588150 $, $Date: 2007-10-25 08:20:04 +0200 (Thu, 25 Oct 2007) $
58   */
59  public class DatagramAcceptorDelegate extends BaseIoAcceptor implements
60          IoAcceptor, DatagramService {
61      private static volatile int nextId = 0;
62  
63      private final IoAcceptor wrapper;
64  
65      private final Executor executor;
66  
67      private final int id = nextId++;
68  
69      private Selector selector;
70  
71      private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();
72  
73      private final Map channels = new HashMap();
74  
75      private final Queue registerQueue = new Queue();
76  
77      private final Queue cancelQueue = new Queue();
78  
79      private final Queue flushingSessions = new Queue();
80  
81      private Worker worker;
82  
83      /**
84       * Creates a new instance.
85       */
86      public DatagramAcceptorDelegate(IoAcceptor wrapper, Executor executor) {
87          this.wrapper = wrapper;
88          this.executor = executor;
89  
90          // The default reuseAddress of an accepted socket should be 'true'.
91          ((DatagramSessionConfig) defaultConfig.getSessionConfig())
92                  .setReuseAddress(true);
93      }
94  
95      public void bind(SocketAddress address, IoHandler handler,
96              IoServiceConfig config) throws IOException {
97          if (handler == null)
98              throw new NullPointerException("handler");
99          if (config == null) {
100             config = getDefaultConfig();
101         }
102 
103         if (address != null && !(address instanceof InetSocketAddress))
104             throw new IllegalArgumentException("Unexpected address type: "
105                     + address.getClass());
106 
107         RegistrationRequest request = new RegistrationRequest(address, handler,
108                 config);
109         synchronized (this) {
110             synchronized (registerQueue) {
111                 registerQueue.push(request);
112             }
113             startupWorker();
114             selector.wakeup();
115         }
116 
117         synchronized (request) {
118             while (!request.done) {
119                 try {
120                     request.wait();
121                 } catch (InterruptedException e) {
122                 }
123             }
124         }
125 
126         if (request.exception != null) {
127             throw (IOException) new IOException("Failed to bind")
128                     .initCause(request.exception);
129         }
130     }
131 
132     public void unbind(SocketAddress address) {
133         if (address == null)
134             throw new NullPointerException("address");
135 
136         CancellationRequest request = new CancellationRequest(address);
137         synchronized (this) {
138             try {
139                 startupWorker();
140             } catch (IOException e) {
141                 // IOException is thrown only when Worker thread is not
142                 // running and failed to open a selector.  We simply throw
143                 // IllegalArgumentException here because we can simply
144                 // conclude that nothing is bound to the selector.
145                 throw new IllegalArgumentException("Address not bound: "
146                         + address);
147             }
148 
149             synchronized (cancelQueue) {
150                 cancelQueue.push(request);
151             }
152             selector.wakeup();
153         }
154 
155         synchronized (request) {
156             while (!request.done) {
157                 try {
158                     request.wait();
159                 } catch (InterruptedException e) {
160                 }
161             }
162         }
163 
164         if (request.exception != null) {
165             throw new RuntimeException("Failed to unbind", request.exception);
166         }
167     }
168 
169     public void unbindAll() {
170         List addresses;
171         synchronized (channels) {
172             addresses = new ArrayList(channels.keySet());
173         }
174 
175         for (Iterator i = addresses.iterator(); i.hasNext();) {
176             unbind((SocketAddress) i.next());
177         }
178     }
179 
180     public IoSession newSession(SocketAddress remoteAddress,
181             SocketAddress localAddress) {
182         if (remoteAddress == null) {
183             throw new NullPointerException("remoteAddress");
184         }
185         if (localAddress == null) {
186             throw new NullPointerException("localAddress");
187         }
188 
189         Selector selector = getSelector();
190         DatagramChannel ch = (DatagramChannel) channels.get(localAddress);
191         if (selector == null || ch == null) {
192             throw new IllegalArgumentException("Unknown localAddress: "
193                     + localAddress);
194         }
195 
196         SelectionKey key = ch.keyFor(selector);
197         if (key == null) {
198             throw new IllegalArgumentException("Unknown localAddress: "
199                     + localAddress);
200         }
201 
202         RegistrationRequest req = (RegistrationRequest) key.attachment();
203         IoSession session;
204         IoSessionRecycler sessionRecycler = getSessionRecycler(req);
205         synchronized (sessionRecycler) {
206             session = sessionRecycler.recycle(localAddress, remoteAddress);
207             if (session != null) {
208                 return session;
209             }
210 
211             // If a new session needs to be created.
212             // Note that the local address is the service address in the
213             // acceptor side, and I didn't call getLocalSocketAddress().
214             // This avoids strange cases where getLocalSocketAddress() on the
215             // underlying socket would return an IPv6 address while the
216             // specified service address is an IPv4 address.
217             DatagramSessionImpl datagramSession = new DatagramSessionImpl(
218                     wrapper, this, req.config, ch, req.handler, req.address,
219                     req.address);
220             datagramSession.setRemoteAddress(remoteAddress);
221             datagramSession.setSelectionKey(key);
222 
223             getSessionRecycler(req).put(datagramSession);
224             session = datagramSession;
225         }
226 
227         try {
228             buildFilterChain(req, session);
229             getListeners().fireSessionCreated(session);
230         } catch (Throwable t) {
231             ExceptionMonitor.getInstance().exceptionCaught(t);
232         }
233 
234         return session;
235     }
236 
237     private IoSessionRecycler getSessionRecycler(RegistrationRequest req) {
238         IoSessionRecycler sessionRecycler;
239         if (req.config instanceof DatagramServiceConfig) {
240             sessionRecycler = ((DatagramServiceConfig) req.config)
241                     .getSessionRecycler();
242         } else {
243             sessionRecycler = defaultConfig.getSessionRecycler();
244         }
245         return sessionRecycler;
246     }
247 
248     public IoServiceListenerSupport getListeners() {
249         return super.getListeners();
250     }
251 
252     private void buildFilterChain(RegistrationRequest req, IoSession session)
253             throws Exception {
254         this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
255         req.config.getFilterChainBuilder().buildFilterChain(
256                 session.getFilterChain());
257         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
258     }
259 
260     public IoServiceConfig getDefaultConfig() {
261         return defaultConfig;
262     }
263 
264     /**
265      * Sets the config this acceptor will use by default.
266      * 
267      * @param defaultConfig the default config.
268      * @throws NullPointerException if the specified value is <code>null</code>.
269      */
270     public void setDefaultConfig(DatagramAcceptorConfig defaultConfig) {
271         if (defaultConfig == null) {
272             throw new NullPointerException("defaultConfig");
273         }
274         this.defaultConfig = defaultConfig;
275     }
276     
277     private synchronized Selector getSelector() {
278         return this.selector;
279     }
280 
281     private synchronized void startupWorker() throws IOException {
282         if (worker == null) {
283             selector = Selector.open();
284             worker = new Worker();
285             executor.execute(
286                     new NamePreservingRunnable(worker, "DatagramAcceptor-" + id));
287         }
288     }
289 
290     public void flushSession(DatagramSessionImpl session) {
291         if (scheduleFlush(session)) {
292             Selector selector = getSelector();
293             if (selector != null) {
294                 selector.wakeup();
295             }
296         }
297     }
298 
299     public void closeSession(DatagramSessionImpl session) {
300     }
301 
302     private boolean scheduleFlush(DatagramSessionImpl session) {
303         if (session.setScheduledForFlush(true)) {
304             synchronized (flushingSessions) {
305                 flushingSessions.push(session);
306             }
307             return true;
308         } else {
309             return false;
310         }
311     }
312 
313     private class Worker implements Runnable {
314         public void run() {
315             Selector selector = DatagramAcceptorDelegate.this.getSelector();
316             for (;;) {
317                 try {
318                     int nKeys = selector.select();
319 
320                     registerNew();
321 
322                     if (nKeys > 0) {
323                         processReadySessions(selector.selectedKeys());
324                     }
325 
326                     flushSessions();
327                     cancelKeys();
328 
329                     if (selector.keys().isEmpty()) {
330                         synchronized (DatagramAcceptorDelegate.this) {
331                             if (selector.keys().isEmpty()
332                                     && registerQueue.isEmpty()
333                                     && cancelQueue.isEmpty()) {
334                                 worker = null;
335                                 try {
336                                     selector.close();
337                                 } catch (IOException e) {
338                                     ExceptionMonitor.getInstance()
339                                             .exceptionCaught(e);
340                                 } finally {
341                                     DatagramAcceptorDelegate.this.selector = null;
342                                 }
343                                 break;
344                             }
345                         }
346                     }
347                 } catch (IOException e) {
348                     ExceptionMonitor.getInstance().exceptionCaught(e);
349 
350                     try {
351                         Thread.sleep(1000);
352                     } catch (InterruptedException e1) {
353                     }
354                 }
355             }
356         }
357     }
358 
359     private void processReadySessions(Set keys) {
360         Iterator it = keys.iterator();
361         while (it.hasNext()) {
362             SelectionKey key = (SelectionKey) it.next();
363             it.remove();
364 
365             DatagramChannel ch = (DatagramChannel) key.channel();
366 
367             RegistrationRequest req = (RegistrationRequest) key.attachment();
368             try {
369                 if (key.isReadable()) {
370                     readSession(ch, req);
371                 }
372 
373                 if (key.isWritable()) {
374                     for (Iterator i = getManagedSessions(req.address)
375                             .iterator(); i.hasNext();) {
376                         scheduleFlush((DatagramSessionImpl) i.next());
377                     }
378                 }
379             } catch (Throwable t) {
380                 ExceptionMonitor.getInstance().exceptionCaught(t);
381             }
382         }
383     }
384 
385     private void readSession(DatagramChannel channel, RegistrationRequest req)
386             throws Exception {
387         ByteBuffer readBuf = ByteBuffer
388                 .allocate(((DatagramSessionConfig) req.config
389                         .getSessionConfig()).getReceiveBufferSize());
390         try {
391             SocketAddress remoteAddress = channel.receive(readBuf.buf());
392             if (remoteAddress != null) {
393                 DatagramSessionImpl session = (DatagramSessionImpl) newSession(
394                         remoteAddress, req.address);
395 
396                 readBuf.flip();
397 
398                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
399                 newBuf.put(readBuf);
400                 newBuf.flip();
401 
402                 session.increaseReadBytes(newBuf.remaining());
403                 session.getFilterChain().fireMessageReceived(session, newBuf);
404             }
405         } finally {
406             readBuf.release();
407         }
408     }
409 
410     private void flushSessions() {
411         if (flushingSessions.size() == 0)
412             return;
413 
414         for (;;) {
415             DatagramSessionImpl session;
416 
417             synchronized (flushingSessions) {
418                 session = (DatagramSessionImpl) flushingSessions.pop();
419             }
420 
421             if (session == null)
422                 break;
423 
424             session.setScheduledForFlush(false);
425             
426             try {
427                 boolean flushedAll = flush(session);
428                 if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
429                     scheduleFlush(session);
430                 }
431             } catch (IOException e) {
432                 session.getFilterChain().fireExceptionCaught(session, e);
433             }
434         }
435     }
436 
437     private boolean flush(DatagramSessionImpl session) throws IOException {
438         // Clear OP_WRITE
439         SelectionKey key = session.getSelectionKey();
440         if (key == null) {
441             scheduleFlush(session);
442             return false;
443         }
444         if (!key.isValid()) {
445             return false;
446         }
447         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
448 
449         DatagramChannel ch = session.getChannel();
450         Queue writeRequestQueue = session.getWriteRequestQueue();
451 
452         int writtenBytes = 0;
453         int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
454         try {
455             for (;;) {
456                 WriteRequest req;
457                 synchronized (writeRequestQueue) {
458                     req = (WriteRequest) writeRequestQueue.first();
459                 }
460     
461                 if (req == null)
462                     break;
463     
464                 ByteBuffer buf = (ByteBuffer) req.getMessage();
465                 if (buf.remaining() == 0) {
466                     // pop and fire event
467                     synchronized (writeRequestQueue) {
468                         writeRequestQueue.pop();
469                     }
470 
471                     buf.reset();
472     
473                     if (!buf.hasRemaining()) {
474                         session.increaseWrittenMessages();
475                     }
476                     
477                     ((DatagramFilterChain) session.getFilterChain())
478                             .fireMessageSent(session, req);
479                     continue;
480                 }
481     
482                 SocketAddress destination = req.getDestination();
483                 if (destination == null) {
484                     destination = session.getRemoteAddress();
485                 }
486     
487                 int localWrittenBytes = ch.send(buf.buf(), destination);
488                 writtenBytes += localWrittenBytes;
489     
490                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
491                     // Kernel buffer is full or wrote too much
492                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
493                     return false;
494                 } else {
495                     // pop and fire event
496                     synchronized (writeRequestQueue) {
497                         writeRequestQueue.pop();
498                     }
499     
500                     buf.reset();
501                     
502                     if (!buf.hasRemaining()) {
503                         session.increaseWrittenMessages();
504                     }
505 
506                     session.getFilterChain().fireMessageSent(session, req);
507                 }
508             }
509         } finally {
510             session.increaseWrittenBytes(writtenBytes);
511         }
512         
513         return true;
514     }
515 
516     private void registerNew() {
517         if (registerQueue.isEmpty())
518             return;
519 
520         Selector selector = getSelector();
521         for (;;) {
522             RegistrationRequest req;
523             synchronized (registerQueue) {
524                 req = (RegistrationRequest) registerQueue.pop();
525             }
526 
527             if (req == null)
528                 break;
529 
530             DatagramChannel ch = null;
531             try {
532                 ch = DatagramChannel.open();
533                 DatagramSessionConfig cfg;
534                 if (req.config.getSessionConfig() instanceof DatagramSessionConfig) {
535                     cfg = (DatagramSessionConfig) req.config.getSessionConfig();
536                 } else {
537                     cfg = (DatagramSessionConfig) getDefaultConfig()
538                             .getSessionConfig();
539                 }
540 
541                 ch.socket().setReuseAddress(cfg.isReuseAddress());
542                 ch.socket().setBroadcast(cfg.isBroadcast());
543                 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
544                 ch.socket().setSendBufferSize(cfg.getSendBufferSize());
545 
546                 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
547                     ch.socket().setTrafficClass(cfg.getTrafficClass());
548                 }
549 
550                 ch.configureBlocking(false);
551                 ch.socket().bind(req.address);
552                 if (req.address == null || req.address.getPort() == 0) {
553                     req.address = (InetSocketAddress) ch.socket()
554                             .getLocalSocketAddress();
555                 }
556                 ch.register(selector, SelectionKey.OP_READ, req);
557                 synchronized (channels) {
558                     channels.put(req.address, ch);
559                 }
560 
561                 getListeners().fireServiceActivated(this, req.address,
562                         req.handler, req.config);
563             } catch (Throwable t) {
564                 req.exception = t;
565             } finally {
566                 synchronized (req) {
567                     req.done = true;
568                     req.notify();
569                 }
570 
571                 if (ch != null && req.exception != null) {
572                     try {
573                         ch.disconnect();
574                         ch.close();
575                     } catch (Throwable e) {
576                         ExceptionMonitor.getInstance().exceptionCaught(e);
577                     }
578                 }
579             }
580         }
581     }
582 
583     private void cancelKeys() {
584         if (cancelQueue.isEmpty())
585             return;
586 
587         Selector selector = getSelector();
588         for (;;) {
589             CancellationRequest request;
590             synchronized (cancelQueue) {
591                 request = (CancellationRequest) cancelQueue.pop();
592             }
593 
594             if (request == null) {
595                 break;
596             }
597 
598             DatagramChannel ch;
599             synchronized (channels) {
600                 ch = (DatagramChannel) channels.remove(request.address);
601             }
602 
603             // close the channel
604             try {
605                 if (ch == null) {
606                     request.exception = new IllegalArgumentException(
607                             "Address not bound: " + request.address);
608                 } else {
609                     SelectionKey key = ch.keyFor(selector);
610                     request.registrationRequest = (RegistrationRequest) key
611                             .attachment();
612                     key.cancel();
613                     selector.wakeup(); // wake up again to trigger thread death
614                     ch.disconnect();
615                     ch.close();
616                 }
617             } catch (Throwable t) {
618                 ExceptionMonitor.getInstance().exceptionCaught(t);
619             } finally {
620                 synchronized (request) {
621                     request.done = true;
622                     request.notify();
623                 }
624 
625                 if (request.exception == null) {
626                     getListeners().fireServiceDeactivated(this,
627                             request.address,
628                             request.registrationRequest.handler,
629                             request.registrationRequest.config);
630                 }
631             }
632         }
633     }
634 
635     public void updateTrafficMask(DatagramSessionImpl session) {
636         // There's no point in changing the traffic mask for sessions originating
637         // from this acceptor since new sessions are created every time data is
638         // received.
639     }
640 
641     private static class RegistrationRequest {
642         private InetSocketAddress address;
643 
644         private final IoHandler handler;
645 
646         private final IoServiceConfig config;
647 
648         private Throwable exception;
649 
650         private boolean done;
651 
652         private RegistrationRequest(SocketAddress address, IoHandler handler,
653                 IoServiceConfig config) {
654             this.address = (InetSocketAddress) address;
655             this.handler = handler;
656             this.config = config;
657         }
658     }
659 
660     private static class CancellationRequest {
661         private final SocketAddress address;
662 
663         private boolean done;
664 
665         private RegistrationRequest registrationRequest;
666 
667         private RuntimeException exception;
668 
669         private CancellationRequest(SocketAddress address) {
670             this.address = address;
671         }
672     }
673 }