View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio.support;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.Iterator;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  import org.apache.mina.common.ByteBuffer;
36  import org.apache.mina.common.ConnectFuture;
37  import org.apache.mina.common.ExceptionMonitor;
38  import org.apache.mina.common.IoConnector;
39  import org.apache.mina.common.IoHandler;
40  import org.apache.mina.common.IoServiceConfig;
41  import org.apache.mina.common.IoSession;
42  import org.apache.mina.common.IoSessionRecycler;
43  import org.apache.mina.common.IoFilter.WriteRequest;
44  import org.apache.mina.common.support.AbstractIoFilterChain;
45  import org.apache.mina.common.support.BaseIoConnector;
46  import org.apache.mina.common.support.DefaultConnectFuture;
47  import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
48  import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
49  import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
50  import org.apache.mina.util.NamePreservingRunnable;
51  
52  /**
53   * {@link IoConnector} for datagram transport (UDP/IP).
54   *
55   * @author The Apache Directory Project (mina-dev@directory.apache.org)
56   * @version $Rev: 588150 $, $Date: 2007-10-25 15:20:04 +0900 (Thu, 25 Oct 2007) $
57   */
58  public class DatagramConnectorDelegate extends BaseIoConnector implements
59          DatagramService {
60      private static final AtomicInteger nextId = new AtomicInteger();
61  
62      private final Object lock = new Object();
63  
64      private final IoConnector wrapper;
65  
66      private final Executor executor;
67  
68      private final int id = nextId.getAndIncrement();
69  
70      private volatile Selector selector;
71  
72      private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
73  
74      private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
75  
76      private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
77  
78      private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
79  
80      private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
81  
82      private Worker worker;
83  
84      /**
85       * Creates a new instance.
86       */
87      public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
88          this.wrapper = wrapper;
89          this.executor = executor;
90      }
91  
92      public ConnectFuture connect(SocketAddress address, IoHandler handler,
93              IoServiceConfig config) {
94          return connect(address, null, handler, config);
95      }
96  
97      public ConnectFuture connect(SocketAddress address,
98              SocketAddress localAddress, IoHandler handler,
99              IoServiceConfig config) {
100         if (address == null)
101             throw new NullPointerException("address");
102         if (handler == null)
103             throw new NullPointerException("handler");
104 
105         if (!(address instanceof InetSocketAddress))
106             throw new IllegalArgumentException("Unexpected address type: "
107                     + address.getClass());
108 
109         if (localAddress != null
110                 && !(localAddress instanceof InetSocketAddress)) {
111             throw new IllegalArgumentException(
112                     "Unexpected local address type: " + localAddress.getClass());
113         }
114 
115         if (config == null) {
116             config = getDefaultConfig();
117         }
118 
119         DatagramChannel ch = null;
120         boolean initialized = false;
121         try {
122             ch = DatagramChannel.open();
123             DatagramSessionConfig cfg;
124             if (config.getSessionConfig() instanceof DatagramSessionConfig) {
125                 cfg = (DatagramSessionConfig) config.getSessionConfig();
126             } else {
127                 cfg = getDefaultConfig().getSessionConfig();
128             }
129 
130             ch.socket().setReuseAddress(cfg.isReuseAddress());
131             ch.socket().setBroadcast(cfg.isBroadcast());
132             ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
133             ch.socket().setSendBufferSize(cfg.getSendBufferSize());
134 
135             if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
136                 ch.socket().setTrafficClass(cfg.getTrafficClass());
137             }
138 
139             if (localAddress != null) {
140                 ch.socket().bind(localAddress);
141             }
142             ch.connect(address);
143             ch.configureBlocking(false);
144             initialized = true;
145         } catch (IOException e) {
146             return DefaultConnectFuture.newFailedFuture(e);
147         } finally {
148             if (!initialized && ch != null) {
149                 try {
150                     ch.disconnect();
151                     ch.close();
152                 } catch (IOException e) {
153                     ExceptionMonitor.getInstance().exceptionCaught(e);
154                 }
155             }
156         }
157 
158         RegistrationRequest request = new RegistrationRequest(ch, handler,
159                 config);
160         synchronized (lock) {
161             try {
162                 startupWorker();
163             } catch (IOException e) {
164                 try {
165                     ch.disconnect();
166                     ch.close();
167                 } catch (IOException e2) {
168                     ExceptionMonitor.getInstance().exceptionCaught(e2);
169                 }
170     
171                 return DefaultConnectFuture.newFailedFuture(e);
172             }
173     
174             registerQueue.add(request);
175     
176             selector.wakeup();
177         }
178         return request;
179     }
180 
181     public DatagramConnectorConfig getDefaultConfig() {
182         return defaultConfig;
183     }
184 
185     /**
186      * Sets the config this connector will use by default.
187      *
188      * @param defaultConfig the default config.
189      * @throws NullPointerException if the specified value is <code>null</code>.
190      */
191     public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
192         if (defaultConfig == null) {
193             throw new NullPointerException("defaultConfig");
194         }
195         this.defaultConfig = defaultConfig;
196     }
197 
198     private void startupWorker() throws IOException {
199         synchronized (lock) {
200             if (worker == null) {
201                 selector = Selector.open();
202                 worker = new Worker();
203                 executor.execute(
204                         new NamePreservingRunnable(worker, "DatagramConnector-" + id));
205             }
206         }
207     }
208 
209     public void closeSession(DatagramSessionImpl session) {
210         synchronized (lock) {
211             try {
212                 startupWorker();
213             } catch (IOException e) {
214                 // IOException is thrown only when Worker thread is not
215                 // running and failed to open a selector.  We simply return
216                 // silently here because it we can simply conclude that
217                 // this session is not managed by this connector or
218                 // already closed.
219                 return;
220             }
221     
222             cancelQueue.add(session);
223     
224             selector.wakeup();
225         }
226     }
227 
228     public void flushSession(DatagramSessionImpl session) {
229         if (scheduleFlush(session)) {
230             Selector selector = this.selector;
231             if (selector != null) {
232                 selector.wakeup();
233             }
234         }
235     }
236 
237     private boolean scheduleFlush(DatagramSessionImpl session) {
238         if (session.setScheduledForFlush(true)) {
239             flushingSessions.add(session);
240             return true;
241         } else {
242             return false;
243         }
244     }
245 
246     public void updateTrafficMask(DatagramSessionImpl session) {
247         scheduleTrafficControl(session);
248         Selector selector = this.selector;
249         if (selector != null) {
250             selector.wakeup();
251         }
252     }
253 
254     private void scheduleTrafficControl(DatagramSessionImpl session) {
255         trafficControllingSessions.add(session);
256     }
257 
258     private void doUpdateTrafficMask() {
259         if (trafficControllingSessions.isEmpty())
260             return;
261 
262         for (;;) {
263             DatagramSessionImpl session = trafficControllingSessions.poll();
264 
265             if (session == null)
266                 break;
267 
268             SelectionKey key = session.getSelectionKey();
269             // Retry later if session is not yet fully initialized.
270             // (In case that Session.suspend??() or session.resume??() is
271             // called before addSession() is processed)
272             if (key == null) {
273                 scheduleTrafficControl(session);
274                 break;
275             }
276             // skip if channel is already closed
277             if (!key.isValid()) {
278                 continue;
279             }
280 
281             // The normal is OP_READ and, if there are write requests in the
282             // session's write queue, set OP_WRITE to trigger flushing.
283             int ops = SelectionKey.OP_READ;
284             if (!session.getWriteRequestQueue().isEmpty()) {
285                 ops |= SelectionKey.OP_WRITE;
286             }
287 
288             // Now mask the preferred ops with the mask of the current session
289             int mask = session.getTrafficMask().getInterestOps();
290             key.interestOps(ops & mask);
291         }
292     }
293 
294     private class Worker implements Runnable {
295         public void run() {
296             Selector selector = DatagramConnectorDelegate.this.selector;
297             for (;;) {
298                 try {
299                     int nKeys = selector.select();
300 
301                     registerNew();
302                     doUpdateTrafficMask();
303 
304                     if (nKeys > 0) {
305                         processReadySessions(selector.selectedKeys());
306                     }
307 
308                     flushSessions();
309                     cancelKeys();
310 
311                     if (selector.keys().isEmpty()) {
312                         synchronized (lock) {
313                             if (selector.keys().isEmpty()
314                                     && registerQueue.isEmpty()
315                                     && cancelQueue.isEmpty()) {
316                                 worker = null;
317                                 try {
318                                     selector.close();
319                                 } catch (IOException e) {
320                                     ExceptionMonitor.getInstance()
321                                             .exceptionCaught(e);
322                                 } finally {
323                                     DatagramConnectorDelegate.this.selector = null;
324                                 }
325                                 break;
326                             }
327                         }
328                     }
329                 } catch (IOException e) {
330                     ExceptionMonitor.getInstance().exceptionCaught(e);
331 
332                     try {
333                         Thread.sleep(1000);
334                     } catch (InterruptedException e1) {
335                         ExceptionMonitor.getInstance().exceptionCaught(e1);
336                     }
337                 }
338             }
339         }
340     }
341 
342     private void processReadySessions(Set<SelectionKey> keys) {
343         Iterator<SelectionKey> it = keys.iterator();
344         while (it.hasNext()) {
345             SelectionKey key = it.next();
346             it.remove();
347 
348             DatagramSessionImpl session = (DatagramSessionImpl) key
349                     .attachment();
350 
351             // Let the recycler know that the session is still active. 
352             getSessionRecycler(session).recycle(session.getLocalAddress(),
353                     session.getRemoteAddress());
354 
355             if (key.isReadable() && session.getTrafficMask().isReadable()) {
356                 readSession(session);
357             }
358 
359             if (key.isWritable() && session.getTrafficMask().isWritable()) {
360                 scheduleFlush(session);
361             }
362         }
363     }
364 
365     private IoSessionRecycler getSessionRecycler(IoSession session) {
366         IoServiceConfig config = session.getServiceConfig();
367         IoSessionRecycler sessionRecycler;
368         if (config instanceof DatagramServiceConfig) {
369             sessionRecycler = ((DatagramServiceConfig) config)
370                     .getSessionRecycler();
371         } else {
372             sessionRecycler = defaultConfig.getSessionRecycler();
373         }
374         return sessionRecycler;
375     }
376 
377     private void readSession(DatagramSessionImpl session) {
378 
379         ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
380         try {
381             int readBytes = session.getChannel().read(readBuf.buf());
382             if (readBytes > 0) {
383                 readBuf.flip();
384                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
385                 newBuf.put(readBuf);
386                 newBuf.flip();
387 
388                 session.increaseReadBytes(readBytes);
389                 session.getFilterChain().fireMessageReceived(session, newBuf);
390             }
391         } catch (IOException e) {
392             session.getFilterChain().fireExceptionCaught(session, e);
393         } finally {
394             readBuf.release();
395         }
396     }
397 
398     private void flushSessions() {
399         if (flushingSessions.size() == 0)
400             return;
401 
402         for (;;) {
403             DatagramSessionImpl session = flushingSessions.poll();
404 
405             if (session == null)
406                 break;
407 
408             session.setScheduledForFlush(false);
409 
410             try {
411                 boolean flushedAll = flush(session);
412                 if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
413                     scheduleFlush(session);
414                 }
415             } catch (IOException e) {
416                 session.getFilterChain().fireExceptionCaught(session, e);
417             }
418         }
419     }
420 
421     private boolean flush(DatagramSessionImpl session) throws IOException {
422         // Clear OP_WRITE
423         SelectionKey key = session.getSelectionKey();
424         if (key == null) {
425             scheduleFlush(session);
426             return false;
427         }
428         if (!key.isValid()) {
429             return false;
430         }
431         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
432 
433         DatagramChannel ch = session.getChannel();
434         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
435 
436         int writtenBytes = 0;
437         int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
438         try {
439             for (;;) {
440                 WriteRequest req = writeRequestQueue.peek();
441         
442                 if (req == null)
443                     break;
444         
445                 ByteBuffer buf = (ByteBuffer) req.getMessage();
446                 if (buf.remaining() == 0) {
447                     // pop and fire event
448                     writeRequestQueue.poll();
449         
450                     buf.reset();
451                     
452                     if (!buf.hasRemaining()) {
453                         session.increaseWrittenMessages();
454                     }
455 
456                     session.getFilterChain().fireMessageSent(session, req);
457                     continue;
458                 }
459         
460                 int localWrittenBytes = ch.write(buf.buf());
461                 writtenBytes += localWrittenBytes;
462     
463                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
464                     // Kernel buffer is full or wrote too much
465                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
466                     return false;
467                 } else {
468                     key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
469     
470                     // pop and fire event
471                     writeRequestQueue.poll();
472     
473                     buf.reset();
474                     
475                     if (!buf.hasRemaining()) {
476                         session.increaseWrittenMessages();
477                     }
478                     
479                     session.getFilterChain().fireMessageSent(session, req);
480                 }
481             }
482         } finally {
483             session.increaseWrittenBytes(writtenBytes);
484         }
485         
486         return true;
487     }
488 
489     private void registerNew() {
490         if (registerQueue.isEmpty())
491             return;
492 
493         Selector selector = this.selector;
494         for (;;) {
495             RegistrationRequest req = registerQueue.poll();
496 
497             if (req == null)
498                 break;
499 
500             DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
501                     this, req.config, req.channel, req.handler, req.channel
502                             .socket().getRemoteSocketAddress(), req.channel
503                             .socket().getLocalSocketAddress());
504 
505             // AbstractIoFilterChain will notify the connect future.
506             session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
507 
508             boolean success = false;
509             try {
510                 SelectionKey key = req.channel.register(selector,
511                         SelectionKey.OP_READ, session);
512 
513                 session.setSelectionKey(key);
514                 buildFilterChain(req, session);
515                 getSessionRecycler(session).put(session);
516 
517                 // The CONNECT_FUTURE attribute is cleared and notified here.
518                 getListeners().fireSessionCreated(session);
519                 success = true;
520             } catch (Throwable t) {
521                 // The CONNECT_FUTURE attribute is cleared and notified here.
522                 session.getFilterChain().fireExceptionCaught(session, t);
523             } finally {
524                 if (!success) {
525                     try {
526                         req.channel.disconnect();
527                         req.channel.close();
528                     } catch (IOException e) {
529                         ExceptionMonitor.getInstance().exceptionCaught(e);
530                     }
531                 }
532             }
533         }
534     }
535 
536     private void buildFilterChain(RegistrationRequest req, IoSession session)
537             throws Exception {
538         getFilterChainBuilder().buildFilterChain(session.getFilterChain());
539         req.config.getFilterChainBuilder().buildFilterChain(
540                 session.getFilterChain());
541         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
542     }
543 
544     private void cancelKeys() {
545         if (cancelQueue.isEmpty())
546             return;
547 
548         Selector selector = this.selector;
549         for (;;) {
550             DatagramSessionImpl session = cancelQueue.poll();
551 
552             if (session == null)
553                 break;
554             else {
555                 SelectionKey key = session.getSelectionKey();
556                 DatagramChannel ch = (DatagramChannel) key.channel();
557                 try {
558                     ch.disconnect();
559                     ch.close();
560                 } catch (IOException e) {
561                     ExceptionMonitor.getInstance().exceptionCaught(e);
562                 }
563 
564                 getListeners().fireSessionDestroyed(session);
565                 session.getCloseFuture().setClosed();
566                 key.cancel();
567                 selector.wakeup(); // wake up again to trigger thread death
568             }
569         }
570     }
571 
572     private static class RegistrationRequest extends DefaultConnectFuture {
573         private final DatagramChannel channel;
574 
575         private final IoHandler handler;
576 
577         private final IoServiceConfig config;
578 
579         private RegistrationRequest(DatagramChannel channel, IoHandler handler,
580                 IoServiceConfig config) {
581             this.channel = channel;
582             this.handler = handler;
583             this.config = config;
584         }
585     }
586 }