View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.nio.support;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.Iterator;
29  import java.util.Set;
30  
31  import org.apache.mina.common.ByteBuffer;
32  import org.apache.mina.common.ConnectFuture;
33  import org.apache.mina.common.ExceptionMonitor;
34  import org.apache.mina.common.IoConnector;
35  import org.apache.mina.common.IoHandler;
36  import org.apache.mina.common.IoServiceConfig;
37  import org.apache.mina.common.IoSession;
38  import org.apache.mina.common.IoSessionRecycler;
39  import org.apache.mina.common.IoFilter.WriteRequest;
40  import org.apache.mina.common.support.AbstractIoFilterChain;
41  import org.apache.mina.common.support.BaseIoConnector;
42  import org.apache.mina.common.support.DefaultConnectFuture;
43  import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
44  import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
45  import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
46  import org.apache.mina.util.NamePreservingRunnable;
47  import org.apache.mina.util.Queue;
48  
49  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
50  
51  /**
52   * {@link IoConnector} for datagram transport (UDP/IP).
53   * 
54   * @author The Apache Directory Project (mina-dev@directory.apache.org)
55   * @version $Rev: 588150 $, $Date: 2007-10-25 08:20:04 +0200 (Thu, 25 Oct 2007) $
56   */
57  public class DatagramConnectorDelegate extends BaseIoConnector implements
58          DatagramService {
59      private static volatile int nextId = 0;
60  
61      private final IoConnector wrapper;
62  
63      private final Executor executor;
64  
65      private final int id = nextId++;
66  
67      private Selector selector;
68  
69      private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
70  
71      private final Queue registerQueue = new Queue();
72  
73      private final Queue cancelQueue = new Queue();
74  
75      private final Queue flushingSessions = new Queue();
76  
77      private final Queue trafficControllingSessions = new Queue();
78  
79      private Worker worker;
80  
81      /**
82       * Creates a new instance.
83       */
84      public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
85          this.wrapper = wrapper;
86          this.executor = executor;
87      }
88  
89      public ConnectFuture connect(SocketAddress address, IoHandler handler,
90              IoServiceConfig config) {
91          return connect(address, null, handler, config);
92      }
93  
94      public ConnectFuture connect(SocketAddress address,
95              SocketAddress localAddress, IoHandler handler,
96              IoServiceConfig config) {
97          if (address == null)
98              throw new NullPointerException("address");
99          if (handler == null)
100             throw new NullPointerException("handler");
101 
102         if (!(address instanceof InetSocketAddress))
103             throw new IllegalArgumentException("Unexpected address type: "
104                     + address.getClass());
105 
106         if (localAddress != null
107                 && !(localAddress instanceof InetSocketAddress)) {
108             throw new IllegalArgumentException(
109                     "Unexpected local address type: " + localAddress.getClass());
110         }
111 
112         if (config == null) {
113             config = getDefaultConfig();
114         }
115 
116         DatagramChannel ch = null;
117         boolean initialized = false;
118         try {
119             ch = DatagramChannel.open();
120             DatagramSessionConfig cfg;
121             if (config.getSessionConfig() instanceof DatagramSessionConfig) {
122                 cfg = (DatagramSessionConfig) config.getSessionConfig();
123             } else {
124                 cfg = (DatagramSessionConfig) getDefaultConfig()
125                         .getSessionConfig();
126             }
127 
128             ch.socket().setReuseAddress(cfg.isReuseAddress());
129             ch.socket().setBroadcast(cfg.isBroadcast());
130             ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
131             ch.socket().setSendBufferSize(cfg.getSendBufferSize());
132 
133             if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
134                 ch.socket().setTrafficClass(cfg.getTrafficClass());
135             }
136 
137             if (localAddress != null) {
138                 ch.socket().bind(localAddress);
139             }
140             ch.connect(address);
141             ch.configureBlocking(false);
142             initialized = true;
143         } catch (IOException e) {
144             return DefaultConnectFuture.newFailedFuture(e);
145         } finally {
146             if (!initialized && ch != null) {
147                 try {
148                     ch.disconnect();
149                     ch.close();
150                 } catch (IOException e) {
151                     ExceptionMonitor.getInstance().exceptionCaught(e);
152                 }
153             }
154         }
155 
156         RegistrationRequest request = new RegistrationRequest(ch, handler,
157                 config);
158         synchronized (this) {
159             try {
160                 startupWorker();
161             } catch (IOException e) {
162                 try {
163                     ch.disconnect();
164                     ch.close();
165                 } catch (IOException e2) {
166                     ExceptionMonitor.getInstance().exceptionCaught(e2);
167                 }
168 
169                 return DefaultConnectFuture.newFailedFuture(e);
170             }
171 
172             synchronized (registerQueue) {
173                 registerQueue.push(request);
174             }
175             selector.wakeup();
176         }
177 
178         return request;
179     }
180 
181     public IoServiceConfig getDefaultConfig() {
182         return defaultConfig;
183     }
184 
185     /**
186      * Sets the config this connector will use by default.
187      * 
188      * @param defaultConfig the default config.
189      * @throws NullPointerException if the specified value is <code>null</code>.
190      */
191     public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
192         if (defaultConfig == null) {
193             throw new NullPointerException("defaultConfig");
194         }
195         this.defaultConfig = defaultConfig;
196     }
197     
198     private synchronized Selector getSelector() {
199         return this.selector;
200     }
201 
202     private synchronized void startupWorker() throws IOException {
203         if (worker == null) {
204             selector = Selector.open();
205             worker = new Worker();
206             executor.execute(
207                     new NamePreservingRunnable(worker, "DatagramConnector-" + id));
208         }
209     }
210 
211     public void closeSession(DatagramSessionImpl session) {
212         synchronized (this) {
213             try {
214                 startupWorker();
215             } catch (IOException e) {
216                 // IOException is thrown only when Worker thread is not
217                 // running and failed to open a selector.  We simply return
218                 // silently here because it we can simply conclude that
219                 // this session is not managed by this connector or
220                 // already closed.
221                 return;
222             }
223 
224             synchronized (cancelQueue) {
225                 cancelQueue.push(session);
226             }
227 
228             selector.wakeup();
229         }
230     }
231 
232     public void flushSession(DatagramSessionImpl session) {
233         if (scheduleFlush(session)) {
234             Selector selector = getSelector();
235             if (selector != null) {
236                 selector.wakeup();
237             }
238         }
239     }
240 
241     private boolean scheduleFlush(DatagramSessionImpl session) {
242         if (session.setScheduledForFlush(true)) {
243             synchronized (flushingSessions) {
244                 flushingSessions.push(session);
245             }
246             return true;
247         } else {
248             return false;
249         }
250     }
251 
252     public void updateTrafficMask(DatagramSessionImpl session) {
253         scheduleTrafficControl(session);
254         Selector selector = getSelector();
255         if (selector != null) {
256             selector.wakeup();
257         }
258     }
259 
260     private void scheduleTrafficControl(DatagramSessionImpl session) {
261         synchronized (trafficControllingSessions) {
262             trafficControllingSessions.push(session);
263         }
264     }
265 
266     private void doUpdateTrafficMask() {
267         if (trafficControllingSessions.isEmpty())
268             return;
269 
270         for (;;) {
271             DatagramSessionImpl session;
272 
273             synchronized (trafficControllingSessions) {
274                 session = (DatagramSessionImpl) trafficControllingSessions
275                         .pop();
276             }
277 
278             if (session == null)
279                 break;
280 
281             SelectionKey key = session.getSelectionKey();
282             // Retry later if session is not yet fully initialized.
283             // (In case that Session.suspend??() or session.resume??() is 
284             // called before addSession() is processed)
285             if (key == null) {
286                 scheduleTrafficControl(session);
287                 break;
288             }
289             // skip if channel is already closed
290             if (!key.isValid()) {
291                 continue;
292             }
293 
294             // The normal is OP_READ and, if there are write requests in the
295             // session's write queue, set OP_WRITE to trigger flushing.
296             int ops = SelectionKey.OP_READ;
297             Queue writeRequestQueue = session.getWriteRequestQueue();
298             synchronized (writeRequestQueue) {
299                 if (!writeRequestQueue.isEmpty()) {
300                     ops |= SelectionKey.OP_WRITE;
301                 }
302             }
303 
304             // Now mask the preferred ops with the mask of the current session
305             int mask = session.getTrafficMask().getInterestOps();
306             key.interestOps(ops & mask);
307         }
308     }
309 
310     private class Worker implements Runnable {
311         public void run() {
312             Selector selector = getSelector();
313             for (;;) {
314                 try {
315                     int nKeys = selector.select();
316 
317                     registerNew();
318                     doUpdateTrafficMask();
319 
320                     if (nKeys > 0) {
321                         processReadySessions(selector.selectedKeys());
322                     }
323 
324                     flushSessions();
325                     cancelKeys();
326 
327                     if (selector.keys().isEmpty()) {
328                         synchronized (DatagramConnectorDelegate.this) {
329                             if (selector.keys().isEmpty()
330                                     && registerQueue.isEmpty()
331                                     && cancelQueue.isEmpty()) {
332                                 worker = null;
333                                 try {
334                                     selector.close();
335                                 } catch (IOException e) {
336                                     ExceptionMonitor.getInstance()
337                                             .exceptionCaught(e);
338                                 } finally {
339                                     DatagramConnectorDelegate.this.selector = null;
340                                 }
341                                 break;
342                             }
343                         }
344                     }
345                 } catch (IOException e) {
346                     ExceptionMonitor.getInstance().exceptionCaught(e);
347 
348                     try {
349                         Thread.sleep(1000);
350                     } catch (InterruptedException e1) {
351                     }
352                 }
353             }
354         }
355     }
356 
357     private void processReadySessions(Set keys) {
358         Iterator it = keys.iterator();
359         while (it.hasNext()) {
360             SelectionKey key = (SelectionKey) it.next();
361             it.remove();
362 
363             DatagramSessionImpl session = (DatagramSessionImpl) key
364                     .attachment();
365 
366             // Let the recycler know that the session is still active. 
367             getSessionRecycler(session).recycle(session.getLocalAddress(),
368                     session.getRemoteAddress());
369 
370             if (key.isReadable() && session.getTrafficMask().isReadable()) {
371                 readSession(session);
372             }
373 
374             if (key.isWritable() && session.getTrafficMask().isWritable()) {
375                 scheduleFlush(session);
376             }
377         }
378     }
379 
380     private IoSessionRecycler getSessionRecycler(IoSession session) {
381         IoServiceConfig config = session.getServiceConfig();
382         IoSessionRecycler sessionRecycler;
383         if (config instanceof DatagramServiceConfig) {
384             sessionRecycler = ((DatagramServiceConfig) config)
385                     .getSessionRecycler();
386         } else {
387             sessionRecycler = defaultConfig.getSessionRecycler();
388         }
389         return sessionRecycler;
390     }
391 
392     private void readSession(DatagramSessionImpl session) {
393 
394         ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
395         try {
396             int readBytes = session.getChannel().read(readBuf.buf());
397             if (readBytes > 0) {
398                 readBuf.flip();
399                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
400                 newBuf.put(readBuf);
401                 newBuf.flip();
402 
403                 session.increaseReadBytes(readBytes);
404                 session.getFilterChain().fireMessageReceived(session, newBuf);
405             }
406         } catch (IOException e) {
407             session.getFilterChain().fireExceptionCaught(session, e);
408         } finally {
409             readBuf.release();
410         }
411     }
412 
413     private void flushSessions() {
414         if (flushingSessions.size() == 0)
415             return;
416 
417         for (;;) {
418             DatagramSessionImpl session;
419 
420             synchronized (flushingSessions) {
421                 session = (DatagramSessionImpl) flushingSessions.pop();
422             }
423 
424             if (session == null)
425                 break;
426             
427             session.setScheduledForFlush(false);
428 
429             try {
430                 boolean flushedAll = flush(session);
431                 if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
432                     scheduleFlush(session);
433                 }
434             } catch (IOException e) {
435                 session.getFilterChain().fireExceptionCaught(session, e);
436             }
437         }
438     }
439 
440     private boolean flush(DatagramSessionImpl session) throws IOException {
441         // Clear OP_WRITE
442         SelectionKey key = session.getSelectionKey();
443         if (key == null) {
444             scheduleFlush(session);
445             return false;
446         }
447         if (!key.isValid()) {
448             return false;
449         }
450         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
451 
452         DatagramChannel ch = session.getChannel();
453         Queue writeRequestQueue = session.getWriteRequestQueue();
454 
455         int writtenBytes = 0;
456         int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
457         try {
458             for (;;) {
459                 WriteRequest req;
460                 synchronized (writeRequestQueue) {
461                     req = (WriteRequest) writeRequestQueue.first();
462                 }
463     
464                 if (req == null)
465                     break;
466     
467                 ByteBuffer buf = (ByteBuffer) req.getMessage();
468                 if (buf.remaining() == 0) {
469                     // pop and fire event
470                     synchronized (writeRequestQueue) {
471                         writeRequestQueue.pop();
472                     }
473     
474                     buf.reset();
475                     
476                     if (!buf.hasRemaining()) {
477                         session.increaseWrittenMessages();
478                     }
479                     
480                     session.getFilterChain().fireMessageSent(session, req);
481                     continue;
482                 }
483     
484                 int localWrittenBytes = ch.write(buf.buf());
485                 writtenBytes += localWrittenBytes;
486     
487                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
488                     // Kernel buffer is full or wrote too much
489                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
490                     return false;
491                 } else {
492                     key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
493     
494                     // pop and fire event
495                     synchronized (writeRequestQueue) {
496                         writeRequestQueue.pop();
497                     }
498     
499                     buf.reset();
500                     
501                     if (!buf.hasRemaining()) {
502                         session.increaseWrittenMessages();
503                     }
504 
505                     session.getFilterChain().fireMessageSent(session, req);
506                 }
507             }
508         } finally {
509             session.increaseWrittenBytes(writtenBytes);
510         }
511         
512         return true;
513     }
514 
515     private void registerNew() {
516         if (registerQueue.isEmpty())
517             return;
518 
519         Selector selector = getSelector();
520         for (;;) {
521             RegistrationRequest req;
522             synchronized (registerQueue) {
523                 req = (RegistrationRequest) registerQueue.pop();
524             }
525 
526             if (req == null)
527                 break;
528 
529             DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
530                     this, req.config, req.channel, req.handler, req.channel
531                             .socket().getRemoteSocketAddress(), req.channel
532                             .socket().getLocalSocketAddress());
533 
534             // AbstractIoFilterChain will notify the connect future.
535             session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
536 
537             boolean success = false;
538             try {
539                 SelectionKey key = req.channel.register(selector,
540                         SelectionKey.OP_READ, session);
541 
542                 session.setSelectionKey(key);
543                 buildFilterChain(req, session);
544                 getSessionRecycler(session).put(session);
545 
546                 // The CONNECT_FUTURE attribute is cleared and notified here.
547                 getListeners().fireSessionCreated(session);
548                 success = true;
549             } catch (Throwable t) {
550                 // The CONNECT_FUTURE attribute is cleared and notified here.
551                 session.getFilterChain().fireExceptionCaught(session, t);
552             } finally {
553                 if (!success) {
554                     try {
555                         req.channel.disconnect();
556                         req.channel.close();
557                     } catch (IOException e) {
558                         ExceptionMonitor.getInstance().exceptionCaught(e);
559                     }
560                 }
561             }
562         }
563     }
564 
565     private void buildFilterChain(RegistrationRequest req, IoSession session)
566             throws Exception {
567         getFilterChainBuilder().buildFilterChain(session.getFilterChain());
568         req.config.getFilterChainBuilder().buildFilterChain(
569                 session.getFilterChain());
570         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
571     }
572 
573     private void cancelKeys() {
574         if (cancelQueue.isEmpty())
575             return;
576 
577         Selector selector = getSelector();
578         for (;;) {
579             DatagramSessionImpl session;
580             synchronized (cancelQueue) {
581                 session = (DatagramSessionImpl) cancelQueue.pop();
582             }
583 
584             if (session == null)
585                 break;
586             else {
587                 SelectionKey key = session.getSelectionKey();
588                 DatagramChannel ch = (DatagramChannel) key.channel();
589                 try {
590                     ch.disconnect();
591                     ch.close();
592                 } catch (IOException e) {
593                     ExceptionMonitor.getInstance().exceptionCaught(e);
594                 }
595 
596                 getListeners().fireSessionDestroyed(session);
597                 session.getCloseFuture().setClosed();
598                 key.cancel();
599                 selector.wakeup(); // wake up again to trigger thread death
600             }
601         }
602     }
603 
604     private static class RegistrationRequest extends DefaultConnectFuture {
605         private final DatagramChannel channel;
606 
607         private final IoHandler handler;
608 
609         private final IoServiceConfig config;
610 
611         private RegistrationRequest(DatagramChannel channel, IoHandler handler,
612                 IoServiceConfig config) {
613             this.channel = channel;
614             this.handler = handler;
615             this.config = config;
616         }
617     }
618 }