View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio.support;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.ArrayList;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Queue;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentLinkedQueue;
36  import java.util.concurrent.Executor;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import org.apache.mina.common.ByteBuffer;
40  import org.apache.mina.common.ExceptionMonitor;
41  import org.apache.mina.common.IoAcceptor;
42  import org.apache.mina.common.IoHandler;
43  import org.apache.mina.common.IoServiceConfig;
44  import org.apache.mina.common.IoSession;
45  import org.apache.mina.common.IoSessionRecycler;
46  import org.apache.mina.common.RuntimeIOException;
47  import org.apache.mina.common.IoFilter.WriteRequest;
48  import org.apache.mina.common.support.BaseIoAcceptor;
49  import org.apache.mina.common.support.IoServiceListenerSupport;
50  import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
51  import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
52  import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
53  import org.apache.mina.util.NamePreservingRunnable;
54  
55  /**
56   * {@link IoAcceptor} for datagram transport (UDP/IP).
57   *
58   * @author The Apache Directory Project (mina-dev@directory.apache.org)
59   * @version $Rev: 588150 $, $Date: 2007-10-25 15:20:04 +0900 (목, 25 10월 2007) $
60   */
61  public class DatagramAcceptorDelegate extends BaseIoAcceptor implements
62          IoAcceptor, DatagramService {
63      private static final AtomicInteger nextId = new AtomicInteger();
64  
65      private final Object lock = new Object();
66  
67      private final IoAcceptor wrapper;
68  
69      private final Executor executor;
70  
71      private final int id = nextId.getAndIncrement();
72  
73      private volatile Selector selector;
74  
75      private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();
76  
77      private final Map<SocketAddress, DatagramChannel> channels = new ConcurrentHashMap<SocketAddress, DatagramChannel>();
78  
79      private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
80  
81      private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
82  
83      private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
84  
85      private Worker worker;
86  
87      /**
88       * Creates a new instance.
89       */
90      public DatagramAcceptorDelegate(IoAcceptor wrapper, Executor executor) {
91          this.wrapper = wrapper;
92          this.executor = executor;
93  
94          // The default reuseAddress of an accepted socket should be 'true'.
95          defaultConfig.getSessionConfig().setReuseAddress(true);
96      }
97  
98      public void bind(SocketAddress address, IoHandler handler,
99              IoServiceConfig config) throws IOException {
100         if (handler == null)
101             throw new NullPointerException("handler");
102         if (config == null) {
103             config = getDefaultConfig();
104         }
105 
106         if (address != null && !(address instanceof InetSocketAddress))
107             throw new IllegalArgumentException("Unexpected address type: "
108                     + address.getClass());
109 
110         RegistrationRequest request = new RegistrationRequest(address, handler,
111                 config);
112         
113         synchronized (lock) {
114             startupWorker();
115             registerQueue.add(request);
116             selector.wakeup();
117         }
118 
119         synchronized (request) {
120             while (!request.done) {
121                 try {
122                     request.wait();
123                 } catch (InterruptedException e) {
124                     throw new RuntimeIOException(e);
125                 }
126             }
127         }
128 
129         if (request.exception != null) {
130             throw (IOException) new IOException("Failed to bind")
131                     .initCause(request.exception);
132         }
133     }
134 
135     public void unbind(SocketAddress address) {
136         if (address == null)
137             throw new NullPointerException("address");
138 
139         CancellationRequest request = new CancellationRequest(address);
140         synchronized (lock) {
141             try {
142                 startupWorker();
143             } catch (IOException e) {
144                 // IOException is thrown only when Worker thread is not
145                 // running and failed to open a selector.  We simply throw
146                 // IllegalArgumentException here because we can simply
147                 // conclude that nothing is bound to the selector.
148                 throw new IllegalArgumentException("Address not bound: " + address);
149             }
150     
151             cancelQueue.add(request);
152             selector.wakeup();
153         }
154 
155         synchronized (request) {
156             while (!request.done) {
157                 try {
158                     request.wait();
159                 } catch (InterruptedException e) {
160                     throw new RuntimeIOException(e);
161                 }
162             }
163         }
164 
165         if (request.exception != null) {
166             throw new RuntimeException("Failed to unbind", request.exception);
167         }
168     }
169 
170     public void unbindAll() {
171         List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels
172                 .keySet());
173 
174         for (SocketAddress address : addresses) {
175             unbind(address);
176         }
177     }
178 
179     @Override
180     public IoSession newSession(SocketAddress remoteAddress,
181             SocketAddress localAddress) {
182         if (remoteAddress == null) {
183             throw new NullPointerException("remoteAddress");
184         }
185         if (localAddress == null) {
186             throw new NullPointerException("localAddress");
187         }
188 
189         Selector selector = this.selector;
190         DatagramChannel ch = channels.get(localAddress);
191         if (selector == null || ch == null) {
192             throw new IllegalArgumentException("Unknown localAddress: "
193                     + localAddress);
194         }
195 
196         SelectionKey key = ch.keyFor(selector);
197         if (key == null) {
198             throw new IllegalArgumentException("Unknown localAddress: "
199                     + localAddress);
200         }
201 
202         RegistrationRequest req = (RegistrationRequest) key.attachment();
203         IoSession session;
204         IoSessionRecycler sessionRecycler = getSessionRecycler(req);
205         synchronized (sessionRecycler) {
206             session = sessionRecycler.recycle(localAddress, remoteAddress);
207             if (session != null) {
208                 return session;
209             }
210 
211             // If a new session needs to be created.
212             // Note that the local address is the service address in the
213             // acceptor side, and I didn't call getLocalSocketAddress().
214             // This avoids strange cases where getLocalSocketAddress() on the
215             // underlying socket would return an IPv6 address while the
216             // specified service address is an IPv4 address.
217             DatagramSessionImpl datagramSession = new DatagramSessionImpl(
218                     wrapper, this, req.config, ch, req.handler, req.address,
219                     req.address);
220             datagramSession.setRemoteAddress(remoteAddress);
221             datagramSession.setSelectionKey(key);
222 
223             getSessionRecycler(req).put(datagramSession);
224             session = datagramSession;
225         }
226 
227         try {
228             buildFilterChain(req, session);
229             getListeners().fireSessionCreated(session);
230         } catch (Throwable t) {
231             ExceptionMonitor.getInstance().exceptionCaught(t);
232         }
233 
234         return session;
235     }
236 
237     private IoSessionRecycler getSessionRecycler(RegistrationRequest req) {
238         IoSessionRecycler sessionRecycler;
239         if (req.config instanceof DatagramServiceConfig) {
240             sessionRecycler = ((DatagramServiceConfig) req.config)
241                     .getSessionRecycler();
242         } else {
243             sessionRecycler = defaultConfig.getSessionRecycler();
244         }
245         return sessionRecycler;
246     }
247 
248     @Override
249     public IoServiceListenerSupport getListeners() {
250         return super.getListeners();
251     }
252 
253     private void buildFilterChain(RegistrationRequest req, IoSession session)
254             throws Exception {
255         this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
256         req.config.getFilterChainBuilder().buildFilterChain(
257                 session.getFilterChain());
258         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
259     }
260 
261     public DatagramAcceptorConfig getDefaultConfig() {
262         return defaultConfig;
263     }
264 
265     /**
266      * Sets the config this acceptor will use by default.
267      *
268      * @param defaultConfig the default config.
269      * @throws NullPointerException if the specified value is <code>null</code>.
270      */
271     public void setDefaultConfig(DatagramAcceptorConfig defaultConfig) {
272         if (defaultConfig == null) {
273             throw new NullPointerException("defaultConfig");
274         }
275         this.defaultConfig = defaultConfig;
276     }
277 
278     private void startupWorker() throws IOException {
279         synchronized (lock) {
280             if (worker == null) {
281                 selector = Selector.open();
282                 worker = new Worker();
283                 executor.execute(
284                         new NamePreservingRunnable(worker, "DatagramAcceptor-" + id));
285             }
286         }
287     }
288 
289     public void flushSession(DatagramSessionImpl session) {
290         if (scheduleFlush(session)) {
291             Selector selector = this.selector;
292             if (selector != null) {
293                 selector.wakeup();
294             }
295         }
296     }
297 
298     public void closeSession(DatagramSessionImpl session) {
299     }
300 
301     private boolean scheduleFlush(DatagramSessionImpl session) {
302         if (session.setScheduledForFlush(true)) {
303             flushingSessions.add(session);
304             return true;
305         } else {
306             return false;
307         }
308     }
309 
310     private class Worker implements Runnable {
311         public void run() {
312             Selector selector = DatagramAcceptorDelegate.this.selector;
313             for (;;) {
314                 try {
315                     int nKeys = selector.select();
316 
317                     registerNew();
318 
319                     if (nKeys > 0) {
320                         processReadySessions(selector.selectedKeys());
321                     }
322 
323                     flushSessions();
324                     cancelKeys();
325 
326                     if (selector.keys().isEmpty()) {
327                         synchronized (lock) {
328                             if (selector.keys().isEmpty()
329                                     && registerQueue.isEmpty()
330                                     && cancelQueue.isEmpty()) {
331                                 worker = null;
332                                 try {
333                                     selector.close();
334                                 } catch (IOException e) {
335                                     ExceptionMonitor.getInstance()
336                                             .exceptionCaught(e);
337                                 } finally {
338                                     DatagramAcceptorDelegate.this.selector = null;
339                                 }
340                                 break;
341                             }
342                         }
343                     }
344                 } catch (IOException e) {
345                     ExceptionMonitor.getInstance().exceptionCaught(e);
346 
347                     try {
348                         Thread.sleep(1000);
349                     } catch (InterruptedException e1) {
350                         ExceptionMonitor.getInstance().exceptionCaught(e1);
351                     }
352                 }
353             }
354         }
355     }
356 
357     private void processReadySessions(Set<SelectionKey> keys) {
358         Iterator<SelectionKey> it = keys.iterator();
359         while (it.hasNext()) {
360             SelectionKey key = it.next();
361             it.remove();
362 
363             DatagramChannel ch = (DatagramChannel) key.channel();
364 
365             RegistrationRequest req = (RegistrationRequest) key.attachment();
366             try {
367                 if (key.isReadable()) {
368                     readSession(ch, req);
369                 }
370 
371                 if (key.isWritable()) {
372                     for (Object o : getManagedSessions(req.address)) {
373                         scheduleFlush((DatagramSessionImpl) o);
374                     }
375                 }
376             } catch (Throwable t) {
377                 ExceptionMonitor.getInstance().exceptionCaught(t);
378             }
379         }
380     }
381 
382     private void readSession(DatagramChannel channel, RegistrationRequest req)
383             throws Exception {
384         ByteBuffer readBuf = ByteBuffer
385                 .allocate(((DatagramSessionConfig) req.config
386                         .getSessionConfig()).getReceiveBufferSize());
387         try {
388             SocketAddress remoteAddress = channel.receive(readBuf.buf());
389             if (remoteAddress != null) {
390                 DatagramSessionImpl session = (DatagramSessionImpl) newSession(
391                         remoteAddress, req.address);
392 
393                 readBuf.flip();
394 
395                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
396                 newBuf.put(readBuf);
397                 newBuf.flip();
398 
399                 session.increaseReadBytes(newBuf.remaining());
400                 session.getFilterChain().fireMessageReceived(session, newBuf);
401             }
402         } finally {
403             readBuf.release();
404         }
405     }
406 
407     private void flushSessions() {
408         if (flushingSessions.size() == 0)
409             return;
410 
411         for (;;) {
412             DatagramSessionImpl session = flushingSessions.poll();
413 
414             if (session == null)
415                 break;
416 
417             session.setScheduledForFlush(false);
418 
419             try {
420                 boolean flushedAll = flush(session);
421                 if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
422                     scheduleFlush(session);
423                 }
424             } catch (IOException e) {
425                 session.getFilterChain().fireExceptionCaught(session, e);
426             }
427         }
428     }
429 
430     private boolean flush(DatagramSessionImpl session) throws IOException {
431         // Clear OP_WRITE
432         SelectionKey key = session.getSelectionKey();
433         if (key == null) {
434             scheduleFlush(session);
435             return false;
436         }
437         if (!key.isValid()) {
438             return false;
439         }
440         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
441 
442         DatagramChannel ch = session.getChannel();
443         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
444 
445         int writtenBytes = 0;
446         int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
447         try {
448             for (;;) {
449                 WriteRequest req = writeRequestQueue.peek();
450     
451                 if (req == null)
452                     break;
453     
454                 ByteBuffer buf = (ByteBuffer) req.getMessage();
455                 if (buf.remaining() == 0) {
456                     // pop and fire event
457                     writeRequestQueue.poll();
458     
459                     buf.reset();
460                     
461                     if (!buf.hasRemaining()) {
462                         session.increaseWrittenMessages();
463                     }
464                     session.getFilterChain().fireMessageSent(session, req);
465                     continue;
466                 }
467     
468                 SocketAddress destination = req.getDestination();
469                 if (destination == null) {
470                     destination = session.getRemoteAddress();
471                 }
472     
473                 int localWrittenBytes = ch.send(buf.buf(), destination);
474                 writtenBytes += localWrittenBytes;
475     
476                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
477                     // Kernel buffer is full or wrote too much
478                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
479                     return false;
480                 } else {
481                     // pop and fire event
482                     writeRequestQueue.poll();
483     
484                     buf.reset();
485                     
486                     if (!buf.hasRemaining()) {
487                         session.increaseWrittenMessages();
488                     }
489 
490                     session.getFilterChain().fireMessageSent(session, req);
491                 }
492             }
493         } finally {
494             session.increaseWrittenBytes(writtenBytes);
495         }
496         
497         return true;
498     }
499 
500     private void registerNew() {
501         if (registerQueue.isEmpty())
502             return;
503 
504         Selector selector = this.selector;
505         for (;;) {
506             RegistrationRequest req = registerQueue.poll();
507 
508             if (req == null)
509                 break;
510 
511             DatagramChannel ch = null;
512             try {
513                 ch = DatagramChannel.open();
514                 DatagramSessionConfig cfg;
515                 if (req.config.getSessionConfig() instanceof DatagramSessionConfig) {
516                     cfg = (DatagramSessionConfig) req.config.getSessionConfig();
517                 } else {
518                     cfg = getDefaultConfig().getSessionConfig();
519                 }
520 
521                 ch.socket().setReuseAddress(cfg.isReuseAddress());
522                 ch.socket().setBroadcast(cfg.isBroadcast());
523                 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
524                 ch.socket().setSendBufferSize(cfg.getSendBufferSize());
525 
526                 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
527                     ch.socket().setTrafficClass(cfg.getTrafficClass());
528                 }
529 
530                 ch.configureBlocking(false);
531                 ch.socket().bind(req.address);
532                 if (req.address == null || req.address.getPort() == 0) {
533                     req.address = (InetSocketAddress) ch.socket()
534                             .getLocalSocketAddress();
535                 }
536                 ch.register(selector, SelectionKey.OP_READ, req);
537                 channels.put(req.address, ch);
538 
539                 getListeners().fireServiceActivated(this, req.address,
540                         req.handler, req.config);
541             } catch (Throwable t) {
542                 req.exception = t;
543             } finally {
544                 synchronized (req) {
545                     req.done = true;
546                     req.notify();
547                 }
548 
549                 if (ch != null && req.exception != null) {
550                     try {
551                         ch.disconnect();
552                         ch.close();
553                     } catch (Throwable e) {
554                         ExceptionMonitor.getInstance().exceptionCaught(e);
555                     }
556                 }
557             }
558         }
559     }
560 
561     private void cancelKeys() {
562         if (cancelQueue.isEmpty())
563             return;
564 
565         Selector selector = this.selector;
566         for (;;) {
567             CancellationRequest request = cancelQueue.poll();
568 
569             if (request == null) {
570                 break;
571             }
572 
573             DatagramChannel ch = channels.remove(request.address);
574 
575             // close the channel
576             try {
577                 if (ch == null) {
578                     request.exception = new IllegalArgumentException(
579                             "Address not bound: " + request.address);
580                 } else {
581                     SelectionKey key = ch.keyFor(selector);
582                     request.registrationRequest = (RegistrationRequest) key
583                             .attachment();
584                     key.cancel();
585                     selector.wakeup(); // wake up again to trigger thread death
586                     ch.disconnect();
587                     ch.close();
588                 }
589             } catch (Throwable t) {
590                 ExceptionMonitor.getInstance().exceptionCaught(t);
591             } finally {
592                 synchronized (request) {
593                     request.done = true;
594                     request.notify();
595                 }
596 
597                 if (request.exception == null) {
598                     getListeners().fireServiceDeactivated(this,
599                             request.address,
600                             request.registrationRequest.handler,
601                             request.registrationRequest.config);
602                 }
603             }
604         }
605     }
606 
607     public void updateTrafficMask(DatagramSessionImpl session) {
608         // There's no point in changing the traffic mask for sessions originating
609         // from this acceptor since new sessions are created every time data is
610         // received.
611     }
612 
613     private static class RegistrationRequest {
614         private InetSocketAddress address;
615 
616         private final IoHandler handler;
617 
618         private final IoServiceConfig config;
619 
620         private Throwable exception;
621 
622         private boolean done;
623 
624         private RegistrationRequest(SocketAddress address, IoHandler handler,
625                 IoServiceConfig config) {
626             this.address = (InetSocketAddress) address;
627             this.handler = handler;
628             this.config = config;
629         }
630     }
631 
632     private static class CancellationRequest {
633         private final SocketAddress address;
634 
635         private boolean done;
636 
637         private RegistrationRequest registrationRequest;
638 
639         private RuntimeException exception;
640 
641         private CancellationRequest(SocketAddress address) {
642             this.address = address;
643         }
644     }
645 }