View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.nio.support;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.Iterator;
29  import java.util.Set;
30  
31  import org.apache.mina.common.ByteBuffer;
32  import org.apache.mina.common.ConnectFuture;
33  import org.apache.mina.common.ExceptionMonitor;
34  import org.apache.mina.common.IoConnector;
35  import org.apache.mina.common.IoHandler;
36  import org.apache.mina.common.IoServiceConfig;
37  import org.apache.mina.common.IoSession;
38  import org.apache.mina.common.IoSessionRecycler;
39  import org.apache.mina.common.IoFilter.WriteRequest;
40  import org.apache.mina.common.support.AbstractIoFilterChain;
41  import org.apache.mina.common.support.BaseIoConnector;
42  import org.apache.mina.common.support.DefaultConnectFuture;
43  import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
44  import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
45  import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
46  import org.apache.mina.util.NamePreservingRunnable;
47  import org.apache.mina.util.Queue;
48  
49  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
50  
51  /**
52   * {@link IoConnector} for datagram transport (UDP/IP).
53   * 
54   * @author The Apache Directory Project (mina-dev@directory.apache.org)
55   * @version $Rev: 575603 $, $Date: 2007-09-14 19:04:45 +0900 (Fri, 14 Sep 2007) $
56   */
57  public class DatagramConnectorDelegate extends BaseIoConnector implements
58          DatagramService {
59      private static volatile int nextId = 0;
60  
61      private final IoConnector wrapper;
62  
63      private final Executor executor;
64  
65      private final int id = nextId++;
66  
67      private Selector selector;
68  
69      private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
70  
71      private final Queue registerQueue = new Queue();
72  
73      private final Queue cancelQueue = new Queue();
74  
75      private final Queue flushingSessions = new Queue();
76  
77      private final Queue trafficControllingSessions = new Queue();
78  
79      private Worker worker;
80  
81      /**
82       * Creates a new instance.
83       */
84      public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
85          this.wrapper = wrapper;
86          this.executor = executor;
87      }
88  
89      public ConnectFuture connect(SocketAddress address, IoHandler handler,
90              IoServiceConfig config) {
91          return connect(address, null, handler, config);
92      }
93  
94      public ConnectFuture connect(SocketAddress address,
95              SocketAddress localAddress, IoHandler handler,
96              IoServiceConfig config) {
97          if (address == null)
98              throw new NullPointerException("address");
99          if (handler == null)
100             throw new NullPointerException("handler");
101 
102         if (!(address instanceof InetSocketAddress))
103             throw new IllegalArgumentException("Unexpected address type: "
104                     + address.getClass());
105 
106         if (localAddress != null
107                 && !(localAddress instanceof InetSocketAddress)) {
108             throw new IllegalArgumentException(
109                     "Unexpected local address type: " + localAddress.getClass());
110         }
111 
112         if (config == null) {
113             config = getDefaultConfig();
114         }
115 
116         DatagramChannel ch = null;
117         boolean initialized = false;
118         try {
119             ch = DatagramChannel.open();
120             DatagramSessionConfig cfg;
121             if (config.getSessionConfig() instanceof DatagramSessionConfig) {
122                 cfg = (DatagramSessionConfig) config.getSessionConfig();
123             } else {
124                 cfg = (DatagramSessionConfig) getDefaultConfig()
125                         .getSessionConfig();
126             }
127 
128             ch.socket().setReuseAddress(cfg.isReuseAddress());
129             ch.socket().setBroadcast(cfg.isBroadcast());
130             ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
131             ch.socket().setSendBufferSize(cfg.getSendBufferSize());
132 
133             if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
134                 ch.socket().setTrafficClass(cfg.getTrafficClass());
135             }
136 
137             if (localAddress != null) {
138                 ch.socket().bind(localAddress);
139             }
140             ch.connect(address);
141             ch.configureBlocking(false);
142             initialized = true;
143         } catch (IOException e) {
144             return DefaultConnectFuture.newFailedFuture(e);
145         } finally {
146             if (!initialized && ch != null) {
147                 try {
148                     ch.disconnect();
149                     ch.close();
150                 } catch (IOException e) {
151                     ExceptionMonitor.getInstance().exceptionCaught(e);
152                 }
153             }
154         }
155 
156         RegistrationRequest request = new RegistrationRequest(ch, handler,
157                 config);
158         synchronized (this) {
159             try {
160                 startupWorker();
161             } catch (IOException e) {
162                 try {
163                     ch.disconnect();
164                     ch.close();
165                 } catch (IOException e2) {
166                     ExceptionMonitor.getInstance().exceptionCaught(e2);
167                 }
168 
169                 return DefaultConnectFuture.newFailedFuture(e);
170             }
171 
172             synchronized (registerQueue) {
173                 registerQueue.push(request);
174             }
175             selector.wakeup();
176         }
177 
178         return request;
179     }
180 
181     public IoServiceConfig getDefaultConfig() {
182         return defaultConfig;
183     }
184 
185     /**
186      * Sets the config this connector will use by default.
187      * 
188      * @param defaultConfig the default config.
189      * @throws NullPointerException if the specified value is <code>null</code>.
190      */
191     public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
192         if (defaultConfig == null) {
193             throw new NullPointerException("defaultConfig");
194         }
195         this.defaultConfig = defaultConfig;
196     }
197     
198     private synchronized Selector getSelector() {
199         return this.selector;
200     }
201 
202     private synchronized void startupWorker() throws IOException {
203         if (worker == null) {
204             selector = Selector.open();
205             worker = new Worker();
206             executor.execute(new NamePreservingRunnable(worker));
207         }
208     }
209 
210     public void closeSession(DatagramSessionImpl session) {
211         synchronized (this) {
212             try {
213                 startupWorker();
214             } catch (IOException e) {
215                 // IOException is thrown only when Worker thread is not
216                 // running and failed to open a selector.  We simply return
217                 // silently here because it we can simply conclude that
218                 // this session is not managed by this connector or
219                 // already closed.
220                 return;
221             }
222 
223             synchronized (cancelQueue) {
224                 cancelQueue.push(session);
225             }
226 
227             selector.wakeup();
228         }
229     }
230 
231     public void flushSession(DatagramSessionImpl session) {
232         if (scheduleFlush(session)) {
233             Selector selector = getSelector();
234             if (selector != null) {
235                 selector.wakeup();
236             }
237         }
238     }
239 
240     private boolean scheduleFlush(DatagramSessionImpl session) {
241         if (session.setScheduledForFlush(true)) {
242             synchronized (flushingSessions) {
243                 flushingSessions.push(session);
244             }
245             return true;
246         } else {
247             return false;
248         }
249     }
250 
251     public void updateTrafficMask(DatagramSessionImpl session) {
252         scheduleTrafficControl(session);
253         Selector selector = getSelector();
254         if (selector != null) {
255             selector.wakeup();
256         }
257     }
258 
259     private void scheduleTrafficControl(DatagramSessionImpl session) {
260         synchronized (trafficControllingSessions) {
261             trafficControllingSessions.push(session);
262         }
263     }
264 
265     private void doUpdateTrafficMask() {
266         if (trafficControllingSessions.isEmpty())
267             return;
268 
269         for (;;) {
270             DatagramSessionImpl session;
271 
272             synchronized (trafficControllingSessions) {
273                 session = (DatagramSessionImpl) trafficControllingSessions
274                         .pop();
275             }
276 
277             if (session == null)
278                 break;
279 
280             SelectionKey key = session.getSelectionKey();
281             // Retry later if session is not yet fully initialized.
282             // (In case that Session.suspend??() or session.resume??() is 
283             // called before addSession() is processed)
284             if (key == null) {
285                 scheduleTrafficControl(session);
286                 break;
287             }
288             // skip if channel is already closed
289             if (!key.isValid()) {
290                 continue;
291             }
292 
293             // The normal is OP_READ and, if there are write requests in the
294             // session's write queue, set OP_WRITE to trigger flushing.
295             int ops = SelectionKey.OP_READ;
296             Queue writeRequestQueue = session.getWriteRequestQueue();
297             synchronized (writeRequestQueue) {
298                 if (!writeRequestQueue.isEmpty()) {
299                     ops |= SelectionKey.OP_WRITE;
300                 }
301             }
302 
303             // Now mask the preferred ops with the mask of the current session
304             int mask = session.getTrafficMask().getInterestOps();
305             key.interestOps(ops & mask);
306         }
307     }
308 
309     private class Worker implements Runnable {
310         public void run() {
311             Thread.currentThread().setName("DatagramConnector-" + id);
312 
313             Selector selector = getSelector();
314             for (;;) {
315                 try {
316                     int nKeys = selector.select();
317 
318                     registerNew();
319                     doUpdateTrafficMask();
320 
321                     if (nKeys > 0) {
322                         processReadySessions(selector.selectedKeys());
323                     }
324 
325                     flushSessions();
326                     cancelKeys();
327 
328                     if (selector.keys().isEmpty()) {
329                         synchronized (DatagramConnectorDelegate.this) {
330                             if (selector.keys().isEmpty()
331                                     && registerQueue.isEmpty()
332                                     && cancelQueue.isEmpty()) {
333                                 worker = null;
334                                 try {
335                                     selector.close();
336                                 } catch (IOException e) {
337                                     ExceptionMonitor.getInstance()
338                                             .exceptionCaught(e);
339                                 } finally {
340                                     DatagramConnectorDelegate.this.selector = null;
341                                 }
342                                 break;
343                             }
344                         }
345                     }
346                 } catch (IOException e) {
347                     ExceptionMonitor.getInstance().exceptionCaught(e);
348 
349                     try {
350                         Thread.sleep(1000);
351                     } catch (InterruptedException e1) {
352                     }
353                 }
354             }
355         }
356     }
357 
358     private void processReadySessions(Set keys) {
359         Iterator it = keys.iterator();
360         while (it.hasNext()) {
361             SelectionKey key = (SelectionKey) it.next();
362             it.remove();
363 
364             DatagramSessionImpl session = (DatagramSessionImpl) key
365                     .attachment();
366 
367             // Let the recycler know that the session is still active. 
368             getSessionRecycler(session).recycle(session.getLocalAddress(),
369                     session.getRemoteAddress());
370 
371             if (key.isReadable() && session.getTrafficMask().isReadable()) {
372                 readSession(session);
373             }
374 
375             if (key.isWritable() && session.getTrafficMask().isWritable()) {
376                 scheduleFlush(session);
377             }
378         }
379     }
380 
381     private IoSessionRecycler getSessionRecycler(IoSession session) {
382         IoServiceConfig config = session.getServiceConfig();
383         IoSessionRecycler sessionRecycler;
384         if (config instanceof DatagramServiceConfig) {
385             sessionRecycler = ((DatagramServiceConfig) config)
386                     .getSessionRecycler();
387         } else {
388             sessionRecycler = defaultConfig.getSessionRecycler();
389         }
390         return sessionRecycler;
391     }
392 
393     private void readSession(DatagramSessionImpl session) {
394 
395         ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
396         try {
397             int readBytes = session.getChannel().read(readBuf.buf());
398             if (readBytes > 0) {
399                 readBuf.flip();
400                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
401                 newBuf.put(readBuf);
402                 newBuf.flip();
403 
404                 session.increaseReadBytes(readBytes);
405                 session.getFilterChain().fireMessageReceived(session, newBuf);
406             }
407         } catch (IOException e) {
408             session.getFilterChain().fireExceptionCaught(session, e);
409         } finally {
410             readBuf.release();
411         }
412     }
413 
414     private void flushSessions() {
415         if (flushingSessions.size() == 0)
416             return;
417 
418         for (;;) {
419             DatagramSessionImpl session;
420 
421             synchronized (flushingSessions) {
422                 session = (DatagramSessionImpl) flushingSessions.pop();
423             }
424 
425             if (session == null)
426                 break;
427             
428             session.setScheduledForFlush(false);
429 
430             try {
431                 boolean flushedAll = flush(session);
432                 if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
433                     scheduleFlush(session);
434                 }
435             } catch (IOException e) {
436                 session.getFilterChain().fireExceptionCaught(session, e);
437             }
438         }
439     }
440 
441     private boolean flush(DatagramSessionImpl session) throws IOException {
442         // Clear OP_WRITE
443         SelectionKey key = session.getSelectionKey();
444         if (key == null) {
445             scheduleFlush(session);
446             return false;
447         }
448         if (!key.isValid()) {
449             return false;
450         }
451         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
452 
453         DatagramChannel ch = session.getChannel();
454         Queue writeRequestQueue = session.getWriteRequestQueue();
455 
456         int writtenBytes = 0;
457         int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
458         try {
459             for (;;) {
460                 WriteRequest req;
461                 synchronized (writeRequestQueue) {
462                     req = (WriteRequest) writeRequestQueue.first();
463                 }
464     
465                 if (req == null)
466                     break;
467     
468                 ByteBuffer buf = (ByteBuffer) req.getMessage();
469                 if (buf.remaining() == 0) {
470                     // pop and fire event
471                     synchronized (writeRequestQueue) {
472                         writeRequestQueue.pop();
473                     }
474     
475                     session.increaseWrittenMessages();
476                     buf.reset();
477                     session.getFilterChain().fireMessageSent(session, req);
478                     continue;
479                 }
480     
481                 int localWrittenBytes = ch.write(buf.buf());
482                 writtenBytes += localWrittenBytes;
483     
484                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
485                     // Kernel buffer is full or wrote too much
486                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
487                     return false;
488                 } else {
489                     key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
490     
491                     // pop and fire event
492                     synchronized (writeRequestQueue) {
493                         writeRequestQueue.pop();
494                     }
495     
496                     session.increaseWrittenMessages();
497                     buf.reset();
498                     session.getFilterChain().fireMessageSent(session, req);
499                 }
500             }
501         } finally {
502             session.increaseWrittenBytes(writtenBytes);
503         }
504         
505         return true;
506     }
507 
508     private void registerNew() {
509         if (registerQueue.isEmpty())
510             return;
511 
512         Selector selector = getSelector();
513         for (;;) {
514             RegistrationRequest req;
515             synchronized (registerQueue) {
516                 req = (RegistrationRequest) registerQueue.pop();
517             }
518 
519             if (req == null)
520                 break;
521 
522             DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
523                     this, req.config, req.channel, req.handler, req.channel
524                             .socket().getRemoteSocketAddress(), req.channel
525                             .socket().getLocalSocketAddress());
526 
527             // AbstractIoFilterChain will notify the connect future.
528             session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
529 
530             boolean success = false;
531             try {
532                 SelectionKey key = req.channel.register(selector,
533                         SelectionKey.OP_READ, session);
534 
535                 session.setSelectionKey(key);
536                 buildFilterChain(req, session);
537                 getSessionRecycler(session).put(session);
538 
539                 // The CONNECT_FUTURE attribute is cleared and notified here.
540                 getListeners().fireSessionCreated(session);
541                 success = true;
542             } catch (Throwable t) {
543                 // The CONNECT_FUTURE attribute is cleared and notified here.
544                 session.getFilterChain().fireExceptionCaught(session, t);
545             } finally {
546                 if (!success) {
547                     try {
548                         req.channel.disconnect();
549                         req.channel.close();
550                     } catch (IOException e) {
551                         ExceptionMonitor.getInstance().exceptionCaught(e);
552                     }
553                 }
554             }
555         }
556     }
557 
558     private void buildFilterChain(RegistrationRequest req, IoSession session)
559             throws Exception {
560         getFilterChainBuilder().buildFilterChain(session.getFilterChain());
561         req.config.getFilterChainBuilder().buildFilterChain(
562                 session.getFilterChain());
563         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
564     }
565 
566     private void cancelKeys() {
567         if (cancelQueue.isEmpty())
568             return;
569 
570         Selector selector = getSelector();
571         for (;;) {
572             DatagramSessionImpl session;
573             synchronized (cancelQueue) {
574                 session = (DatagramSessionImpl) cancelQueue.pop();
575             }
576 
577             if (session == null)
578                 break;
579             else {
580                 SelectionKey key = session.getSelectionKey();
581                 DatagramChannel ch = (DatagramChannel) key.channel();
582                 try {
583                     ch.disconnect();
584                     ch.close();
585                 } catch (IOException e) {
586                     ExceptionMonitor.getInstance().exceptionCaught(e);
587                 }
588 
589                 getListeners().fireSessionDestroyed(session);
590                 session.getCloseFuture().setClosed();
591                 key.cancel();
592                 selector.wakeup(); // wake up again to trigger thread death
593             }
594         }
595     }
596 
597     private static class RegistrationRequest extends DefaultConnectFuture {
598         private final DatagramChannel channel;
599 
600         private final IoHandler handler;
601 
602         private final IoServiceConfig config;
603 
604         private RegistrationRequest(DatagramChannel channel, IoHandler handler,
605                 IoServiceConfig config) {
606             this.channel = channel;
607             this.handler = handler;
608             this.config = config;
609         }
610     }
611 }