View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.nio.support;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.Iterator;
29  import java.util.Set;
30  
31  import org.apache.mina.common.ByteBuffer;
32  import org.apache.mina.common.ConnectFuture;
33  import org.apache.mina.common.ExceptionMonitor;
34  import org.apache.mina.common.IoConnector;
35  import org.apache.mina.common.IoHandler;
36  import org.apache.mina.common.IoServiceConfig;
37  import org.apache.mina.common.IoSession;
38  import org.apache.mina.common.IoSessionRecycler;
39  import org.apache.mina.common.IoFilter.WriteRequest;
40  import org.apache.mina.common.support.AbstractIoFilterChain;
41  import org.apache.mina.common.support.BaseIoConnector;
42  import org.apache.mina.common.support.DefaultConnectFuture;
43  import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
44  import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
45  import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
46  import org.apache.mina.util.NamePreservingRunnable;
47  import org.apache.mina.util.Queue;
48  
49  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
50  
51  /**
52   * {@link IoConnector} for datagram transport (UDP/IP).
53   * 
54   * @author The Apache Directory Project (mina-dev@directory.apache.org)
55   * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13  7월 2007) $
56   */
57  public class DatagramConnectorDelegate extends BaseIoConnector implements
58          DatagramService {
59      private static volatile int nextId = 0;
60  
61      private final IoConnector wrapper;
62  
63      private final Executor executor;
64  
65      private final int id = nextId++;
66  
67      private Selector selector;
68  
69      private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
70  
71      private final Queue registerQueue = new Queue();
72  
73      private final Queue cancelQueue = new Queue();
74  
75      private final Queue flushingSessions = new Queue();
76  
77      private final Queue trafficControllingSessions = new Queue();
78  
79      private Worker worker;
80  
81      /**
82       * Creates a new instance.
83       */
84      public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
85          this.wrapper = wrapper;
86          this.executor = executor;
87      }
88  
89      public ConnectFuture connect(SocketAddress address, IoHandler handler,
90              IoServiceConfig config) {
91          return connect(address, null, handler, config);
92      }
93  
94      public ConnectFuture connect(SocketAddress address,
95              SocketAddress localAddress, IoHandler handler,
96              IoServiceConfig config) {
97          if (address == null)
98              throw new NullPointerException("address");
99          if (handler == null)
100             throw new NullPointerException("handler");
101 
102         if (!(address instanceof InetSocketAddress))
103             throw new IllegalArgumentException("Unexpected address type: "
104                     + address.getClass());
105 
106         if (localAddress != null
107                 && !(localAddress instanceof InetSocketAddress)) {
108             throw new IllegalArgumentException(
109                     "Unexpected local address type: " + localAddress.getClass());
110         }
111 
112         if (config == null) {
113             config = getDefaultConfig();
114         }
115 
116         DatagramChannel ch = null;
117         boolean initialized = false;
118         try {
119             ch = DatagramChannel.open();
120             DatagramSessionConfig cfg;
121             if (config.getSessionConfig() instanceof DatagramSessionConfig) {
122                 cfg = (DatagramSessionConfig) config.getSessionConfig();
123             } else {
124                 cfg = (DatagramSessionConfig) getDefaultConfig()
125                         .getSessionConfig();
126             }
127 
128             ch.socket().setReuseAddress(cfg.isReuseAddress());
129             ch.socket().setBroadcast(cfg.isBroadcast());
130             ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
131             ch.socket().setSendBufferSize(cfg.getSendBufferSize());
132 
133             if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
134                 ch.socket().setTrafficClass(cfg.getTrafficClass());
135             }
136 
137             if (localAddress != null) {
138                 ch.socket().bind(localAddress);
139             }
140             ch.connect(address);
141             ch.configureBlocking(false);
142             initialized = true;
143         } catch (IOException e) {
144             return DefaultConnectFuture.newFailedFuture(e);
145         } finally {
146             if (!initialized && ch != null) {
147                 try {
148                     ch.disconnect();
149                     ch.close();
150                 } catch (IOException e) {
151                     ExceptionMonitor.getInstance().exceptionCaught(e);
152                 }
153             }
154         }
155 
156         RegistrationRequest request = new RegistrationRequest(ch, handler,
157                 config);
158         synchronized (this) {
159             try {
160                 startupWorker();
161             } catch (IOException e) {
162                 try {
163                     ch.disconnect();
164                     ch.close();
165                 } catch (IOException e2) {
166                     ExceptionMonitor.getInstance().exceptionCaught(e2);
167                 }
168 
169                 return DefaultConnectFuture.newFailedFuture(e);
170             }
171 
172             synchronized (registerQueue) {
173                 registerQueue.push(request);
174             }
175         }
176 
177         selector.wakeup();
178         return request;
179     }
180 
181     public IoServiceConfig getDefaultConfig() {
182         return defaultConfig;
183     }
184 
185     /**
186      * Sets the config this connector will use by default.
187      * 
188      * @param defaultConfig the default config.
189      * @throws NullPointerException if the specified value is <code>null</code>.
190      */
191     public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
192         if (defaultConfig == null) {
193             throw new NullPointerException("defaultConfig");
194         }
195         this.defaultConfig = defaultConfig;
196     }
197 
198     private synchronized void startupWorker() throws IOException {
199         if (worker == null) {
200             selector = Selector.open();
201             worker = new Worker();
202             executor.execute(new NamePreservingRunnable(worker));
203         }
204     }
205 
206     public void closeSession(DatagramSessionImpl session) {
207         synchronized (this) {
208             try {
209                 startupWorker();
210             } catch (IOException e) {
211                 // IOException is thrown only when Worker thread is not
212                 // running and failed to open a selector.  We simply return
213                 // silently here because it we can simply conclude that
214                 // this session is not managed by this connector or
215                 // already closed.
216                 return;
217             }
218 
219             synchronized (cancelQueue) {
220                 cancelQueue.push(session);
221             }
222         }
223 
224         selector.wakeup();
225     }
226 
227     public void flushSession(DatagramSessionImpl session) {
228         scheduleFlush(session);
229         Selector selector = this.selector;
230         if (selector != null) {
231             selector.wakeup();
232         }
233     }
234 
235     private void scheduleFlush(DatagramSessionImpl session) {
236         synchronized (flushingSessions) {
237             flushingSessions.push(session);
238         }
239     }
240 
241     public void updateTrafficMask(DatagramSessionImpl session) {
242         scheduleTrafficControl(session);
243         Selector selector = this.selector;
244         if (selector != null) {
245             selector.wakeup();
246         }
247     }
248 
249     private void scheduleTrafficControl(DatagramSessionImpl session) {
250         synchronized (trafficControllingSessions) {
251             trafficControllingSessions.push(session);
252         }
253     }
254 
255     private void doUpdateTrafficMask() {
256         if (trafficControllingSessions.isEmpty())
257             return;
258 
259         for (;;) {
260             DatagramSessionImpl session;
261 
262             synchronized (trafficControllingSessions) {
263                 session = (DatagramSessionImpl) trafficControllingSessions
264                         .pop();
265             }
266 
267             if (session == null)
268                 break;
269 
270             SelectionKey key = session.getSelectionKey();
271             // Retry later if session is not yet fully initialized.
272             // (In case that Session.suspend??() or session.resume??() is 
273             // called before addSession() is processed)
274             if (key == null) {
275                 scheduleTrafficControl(session);
276                 break;
277             }
278             // skip if channel is already closed
279             if (!key.isValid()) {
280                 continue;
281             }
282 
283             // The normal is OP_READ and, if there are write requests in the
284             // session's write queue, set OP_WRITE to trigger flushing.
285             int ops = SelectionKey.OP_READ;
286             Queue writeRequestQueue = session.getWriteRequestQueue();
287             synchronized (writeRequestQueue) {
288                 if (!writeRequestQueue.isEmpty()) {
289                     ops |= SelectionKey.OP_WRITE;
290                 }
291             }
292 
293             // Now mask the preferred ops with the mask of the current session
294             int mask = session.getTrafficMask().getInterestOps();
295             key.interestOps(ops & mask);
296         }
297     }
298 
299     private class Worker implements Runnable {
300         public void run() {
301             Thread.currentThread().setName("DatagramConnector-" + id);
302 
303             for (;;) {
304                 try {
305                     int nKeys = selector.select();
306 
307                     registerNew();
308                     doUpdateTrafficMask();
309 
310                     if (nKeys > 0) {
311                         processReadySessions(selector.selectedKeys());
312                     }
313 
314                     flushSessions();
315                     cancelKeys();
316 
317                     if (selector.keys().isEmpty()) {
318                         synchronized (DatagramConnectorDelegate.this) {
319                             if (selector.keys().isEmpty()
320                                     && registerQueue.isEmpty()
321                                     && cancelQueue.isEmpty()) {
322                                 worker = null;
323                                 try {
324                                     selector.close();
325                                 } catch (IOException e) {
326                                     ExceptionMonitor.getInstance()
327                                             .exceptionCaught(e);
328                                 } finally {
329                                     selector = null;
330                                 }
331                                 break;
332                             }
333                         }
334                     }
335                 } catch (IOException e) {
336                     ExceptionMonitor.getInstance().exceptionCaught(e);
337 
338                     try {
339                         Thread.sleep(1000);
340                     } catch (InterruptedException e1) {
341                     }
342                 }
343             }
344         }
345     }
346 
347     private void processReadySessions(Set keys) {
348         Iterator it = keys.iterator();
349         while (it.hasNext()) {
350             SelectionKey key = (SelectionKey) it.next();
351             it.remove();
352 
353             DatagramSessionImpl session = (DatagramSessionImpl) key
354                     .attachment();
355 
356             // Let the recycler know that the session is still active. 
357             getSessionRecycler(session).recycle(session.getLocalAddress(),
358                     session.getRemoteAddress());
359 
360             if (key.isReadable() && session.getTrafficMask().isReadable()) {
361                 readSession(session);
362             }
363 
364             if (key.isWritable() && session.getTrafficMask().isWritable()) {
365                 scheduleFlush(session);
366             }
367         }
368     }
369 
370     private IoSessionRecycler getSessionRecycler(IoSession session) {
371         IoServiceConfig config = session.getServiceConfig();
372         IoSessionRecycler sessionRecycler;
373         if (config instanceof DatagramServiceConfig) {
374             sessionRecycler = ((DatagramServiceConfig) config)
375                     .getSessionRecycler();
376         } else {
377             sessionRecycler = defaultConfig.getSessionRecycler();
378         }
379         return sessionRecycler;
380     }
381 
382     private void readSession(DatagramSessionImpl session) {
383 
384         ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
385         try {
386             int readBytes = session.getChannel().read(readBuf.buf());
387             if (readBytes > 0) {
388                 readBuf.flip();
389                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
390                 newBuf.put(readBuf);
391                 newBuf.flip();
392 
393                 session.increaseReadBytes(readBytes);
394                 session.getFilterChain().fireMessageReceived(session, newBuf);
395             }
396         } catch (IOException e) {
397             session.getFilterChain().fireExceptionCaught(session, e);
398         } finally {
399             readBuf.release();
400         }
401     }
402 
403     private void flushSessions() {
404         if (flushingSessions.size() == 0)
405             return;
406 
407         for (;;) {
408             DatagramSessionImpl session;
409 
410             synchronized (flushingSessions) {
411                 session = (DatagramSessionImpl) flushingSessions.pop();
412             }
413 
414             if (session == null)
415                 break;
416 
417             try {
418                 flush(session);
419             } catch (IOException e) {
420                 session.getFilterChain().fireExceptionCaught(session, e);
421             }
422         }
423     }
424 
425     private void flush(DatagramSessionImpl session) throws IOException {
426         DatagramChannel ch = session.getChannel();
427 
428         Queue writeRequestQueue = session.getWriteRequestQueue();
429 
430         WriteRequest req;
431         for (;;) {
432             synchronized (writeRequestQueue) {
433                 req = (WriteRequest) writeRequestQueue.first();
434             }
435 
436             if (req == null)
437                 break;
438 
439             ByteBuffer buf = (ByteBuffer) req.getMessage();
440             if (buf.remaining() == 0) {
441                 // pop and fire event
442                 synchronized (writeRequestQueue) {
443                     writeRequestQueue.pop();
444                 }
445 
446                 session.increaseWrittenMessages();
447                 buf.reset();
448                 session.getFilterChain().fireMessageSent(session, req);
449                 continue;
450             }
451 
452             SelectionKey key = session.getSelectionKey();
453             if (key == null) {
454                 scheduleFlush(session);
455                 break;
456             }
457             if (!key.isValid()) {
458                 continue;
459             }
460 
461             int writtenBytes = ch.write(buf.buf());
462 
463             if (writtenBytes == 0) {
464                 // Kernel buffer is full
465                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
466             } else if (writtenBytes > 0) {
467                 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
468 
469                 // pop and fire event
470                 synchronized (writeRequestQueue) {
471                     writeRequestQueue.pop();
472                 }
473 
474                 session.increaseWrittenBytes(writtenBytes);
475                 session.increaseWrittenMessages();
476                 buf.reset();
477                 session.getFilterChain().fireMessageSent(session, req);
478             }
479         }
480     }
481 
482     private void registerNew() {
483         if (registerQueue.isEmpty())
484             return;
485 
486         for (;;) {
487             RegistrationRequest req;
488             synchronized (registerQueue) {
489                 req = (RegistrationRequest) registerQueue.pop();
490             }
491 
492             if (req == null)
493                 break;
494 
495             DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
496                     this, req.config, req.channel, req.handler, req.channel
497                             .socket().getRemoteSocketAddress(), req.channel
498                             .socket().getLocalSocketAddress());
499 
500             // AbstractIoFilterChain will notify the connect future.
501             session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
502 
503             boolean success = false;
504             try {
505                 SelectionKey key = req.channel.register(selector,
506                         SelectionKey.OP_READ, session);
507 
508                 session.setSelectionKey(key);
509                 buildFilterChain(req, session);
510                 getSessionRecycler(session).put(session);
511 
512                 // The CONNECT_FUTURE attribute is cleared and notified here.
513                 getListeners().fireSessionCreated(session);
514                 success = true;
515             } catch (Throwable t) {
516                 // The CONNECT_FUTURE attribute is cleared and notified here.
517                 session.getFilterChain().fireExceptionCaught(session, t);
518             } finally {
519                 if (!success) {
520                     try {
521                         req.channel.disconnect();
522                         req.channel.close();
523                     } catch (IOException e) {
524                         ExceptionMonitor.getInstance().exceptionCaught(e);
525                     }
526                 }
527             }
528         }
529     }
530 
531     private void buildFilterChain(RegistrationRequest req, IoSession session)
532             throws Exception {
533         getFilterChainBuilder().buildFilterChain(session.getFilterChain());
534         req.config.getFilterChainBuilder().buildFilterChain(
535                 session.getFilterChain());
536         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
537     }
538 
539     private void cancelKeys() {
540         if (cancelQueue.isEmpty())
541             return;
542 
543         for (;;) {
544             DatagramSessionImpl session;
545             synchronized (cancelQueue) {
546                 session = (DatagramSessionImpl) cancelQueue.pop();
547             }
548 
549             if (session == null)
550                 break;
551             else {
552                 SelectionKey key = session.getSelectionKey();
553                 DatagramChannel ch = (DatagramChannel) key.channel();
554                 try {
555                     ch.disconnect();
556                     ch.close();
557                 } catch (IOException e) {
558                     ExceptionMonitor.getInstance().exceptionCaught(e);
559                 }
560 
561                 getListeners().fireSessionDestroyed(session);
562                 session.getCloseFuture().setClosed();
563                 key.cancel();
564                 selector.wakeup(); // wake up again to trigger thread death
565             }
566         }
567     }
568 
569     private static class RegistrationRequest extends DefaultConnectFuture {
570         private final DatagramChannel channel;
571 
572         private final IoHandler handler;
573 
574         private final IoServiceConfig config;
575 
576         private RegistrationRequest(DatagramChannel channel, IoHandler handler,
577                 IoServiceConfig config) {
578             this.channel = channel;
579             this.handler = handler;
580             this.config = config;
581         }
582     }
583 }