View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio.support;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.Iterator;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  import org.apache.mina.common.ByteBuffer;
36  import org.apache.mina.common.ConnectFuture;
37  import org.apache.mina.common.ExceptionMonitor;
38  import org.apache.mina.common.IoConnector;
39  import org.apache.mina.common.IoHandler;
40  import org.apache.mina.common.IoServiceConfig;
41  import org.apache.mina.common.IoSession;
42  import org.apache.mina.common.IoSessionRecycler;
43  import org.apache.mina.common.IoFilter.WriteRequest;
44  import org.apache.mina.common.support.AbstractIoFilterChain;
45  import org.apache.mina.common.support.BaseIoConnector;
46  import org.apache.mina.common.support.DefaultConnectFuture;
47  import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
48  import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
49  import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
50  import org.apache.mina.util.NamePreservingRunnable;
51  
52  /**
53   * {@link IoConnector} for datagram transport (UDP/IP).
54   *
55   * @author The Apache Directory Project (mina-dev@directory.apache.org)
56   * @version $Rev: 563373 $, $Date: 2007-08-07 11:49:24 +0900 (화, 07  8월 2007) $
57   */
58  public class DatagramConnectorDelegate extends BaseIoConnector implements
59          DatagramService {
60      private static final AtomicInteger nextId = new AtomicInteger();
61  
62      private final Object lock = new Object();
63  
64      private final IoConnector wrapper;
65  
66      private final Executor executor;
67  
68      private final int id = nextId.getAndIncrement();
69  
70      private volatile Selector selector;
71  
72      private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
73  
74      private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
75  
76      private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
77  
78      private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
79  
80      private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
81  
82      private Worker worker;
83  
84      /**
85       * Creates a new instance.
86       */
87      public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
88          this.wrapper = wrapper;
89          this.executor = executor;
90      }
91  
92      public ConnectFuture connect(SocketAddress address, IoHandler handler,
93              IoServiceConfig config) {
94          return connect(address, null, handler, config);
95      }
96  
97      public ConnectFuture connect(SocketAddress address,
98              SocketAddress localAddress, IoHandler handler,
99              IoServiceConfig config) {
100         if (address == null)
101             throw new NullPointerException("address");
102         if (handler == null)
103             throw new NullPointerException("handler");
104 
105         if (!(address instanceof InetSocketAddress))
106             throw new IllegalArgumentException("Unexpected address type: "
107                     + address.getClass());
108 
109         if (localAddress != null
110                 && !(localAddress instanceof InetSocketAddress)) {
111             throw new IllegalArgumentException(
112                     "Unexpected local address type: " + localAddress.getClass());
113         }
114 
115         if (config == null) {
116             config = getDefaultConfig();
117         }
118 
119         DatagramChannel ch = null;
120         boolean initialized = false;
121         try {
122             ch = DatagramChannel.open();
123             DatagramSessionConfig cfg;
124             if (config.getSessionConfig() instanceof DatagramSessionConfig) {
125                 cfg = (DatagramSessionConfig) config.getSessionConfig();
126             } else {
127                 cfg = getDefaultConfig().getSessionConfig();
128             }
129 
130             ch.socket().setReuseAddress(cfg.isReuseAddress());
131             ch.socket().setBroadcast(cfg.isBroadcast());
132             ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
133             ch.socket().setSendBufferSize(cfg.getSendBufferSize());
134 
135             if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
136                 ch.socket().setTrafficClass(cfg.getTrafficClass());
137             }
138 
139             if (localAddress != null) {
140                 ch.socket().bind(localAddress);
141             }
142             ch.connect(address);
143             ch.configureBlocking(false);
144             initialized = true;
145         } catch (IOException e) {
146             return DefaultConnectFuture.newFailedFuture(e);
147         } finally {
148             if (!initialized && ch != null) {
149                 try {
150                     ch.disconnect();
151                     ch.close();
152                 } catch (IOException e) {
153                     ExceptionMonitor.getInstance().exceptionCaught(e);
154                 }
155             }
156         }
157 
158         RegistrationRequest request = new RegistrationRequest(ch, handler,
159                 config);
160         synchronized (lock) {
161             try {
162                 startupWorker();
163             } catch (IOException e) {
164                 try {
165                     ch.disconnect();
166                     ch.close();
167                 } catch (IOException e2) {
168                     ExceptionMonitor.getInstance().exceptionCaught(e2);
169                 }
170     
171                 return DefaultConnectFuture.newFailedFuture(e);
172             }
173     
174             registerQueue.add(request);
175     
176             selector.wakeup();
177         }
178         return request;
179     }
180 
181     public DatagramConnectorConfig getDefaultConfig() {
182         return defaultConfig;
183     }
184 
185     /**
186      * Sets the config this connector will use by default.
187      *
188      * @param defaultConfig the default config.
189      * @throws NullPointerException if the specified value is <code>null</code>.
190      */
191     public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
192         if (defaultConfig == null) {
193             throw new NullPointerException("defaultConfig");
194         }
195         this.defaultConfig = defaultConfig;
196     }
197 
198     private void startupWorker() throws IOException {
199         synchronized (lock) {
200             if (worker == null) {
201                 selector = Selector.open();
202                 worker = new Worker();
203                 executor.execute(new NamePreservingRunnable(worker));
204             }
205         }
206     }
207 
208     public void closeSession(DatagramSessionImpl session) {
209         synchronized (lock) {
210             try {
211                 startupWorker();
212             } catch (IOException e) {
213                 // IOException is thrown only when Worker thread is not
214                 // running and failed to open a selector.  We simply return
215                 // silently here because it we can simply conclude that
216                 // this session is not managed by this connector or
217                 // already closed.
218                 return;
219             }
220     
221             cancelQueue.add(session);
222     
223             selector.wakeup();
224         }
225     }
226 
227     public void flushSession(DatagramSessionImpl session) {
228         scheduleFlush(session);
229         Selector selector = this.selector;
230         if (selector != null) {
231             selector.wakeup();
232         }
233     }
234 
235     private void scheduleFlush(DatagramSessionImpl session) {
236         flushingSessions.add(session);
237     }
238 
239     public void updateTrafficMask(DatagramSessionImpl session) {
240         scheduleTrafficControl(session);
241         Selector selector = this.selector;
242         if (selector != null) {
243             selector.wakeup();
244         }
245     }
246 
247     private void scheduleTrafficControl(DatagramSessionImpl session) {
248         trafficControllingSessions.add(session);
249     }
250 
251     private void doUpdateTrafficMask() {
252         if (trafficControllingSessions.isEmpty())
253             return;
254 
255         for (;;) {
256             DatagramSessionImpl session = trafficControllingSessions.poll();
257 
258             if (session == null)
259                 break;
260 
261             SelectionKey key = session.getSelectionKey();
262             // Retry later if session is not yet fully initialized.
263             // (In case that Session.suspend??() or session.resume??() is
264             // called before addSession() is processed)
265             if (key == null) {
266                 scheduleTrafficControl(session);
267                 break;
268             }
269             // skip if channel is already closed
270             if (!key.isValid()) {
271                 continue;
272             }
273 
274             // The normal is OP_READ and, if there are write requests in the
275             // session's write queue, set OP_WRITE to trigger flushing.
276             int ops = SelectionKey.OP_READ;
277             if (!session.getWriteRequestQueue().isEmpty()) {
278                 ops |= SelectionKey.OP_WRITE;
279             }
280 
281             // Now mask the preferred ops with the mask of the current session
282             int mask = session.getTrafficMask().getInterestOps();
283             key.interestOps(ops & mask);
284         }
285     }
286 
287     private class Worker implements Runnable {
288         public void run() {
289             Thread.currentThread().setName("DatagramConnector-" + id);
290 
291             Selector selector = DatagramConnectorDelegate.this.selector;
292             for (;;) {
293                 try {
294                     int nKeys = selector.select();
295 
296                     registerNew();
297                     doUpdateTrafficMask();
298 
299                     if (nKeys > 0) {
300                         processReadySessions(selector.selectedKeys());
301                     }
302 
303                     flushSessions();
304                     cancelKeys();
305 
306                     if (selector.keys().isEmpty()) {
307                         synchronized (lock) {
308                             if (selector.keys().isEmpty()
309                                     && registerQueue.isEmpty()
310                                     && cancelQueue.isEmpty()) {
311                                 worker = null;
312                                 try {
313                                     selector.close();
314                                 } catch (IOException e) {
315                                     ExceptionMonitor.getInstance()
316                                             .exceptionCaught(e);
317                                 } finally {
318                                     DatagramConnectorDelegate.this.selector = null;
319                                 }
320                                 break;
321                             }
322                         }
323                     }
324                 } catch (IOException e) {
325                     ExceptionMonitor.getInstance().exceptionCaught(e);
326 
327                     try {
328                         Thread.sleep(1000);
329                     } catch (InterruptedException e1) {
330                         ExceptionMonitor.getInstance().exceptionCaught(e1);
331                     }
332                 }
333             }
334         }
335     }
336 
337     private void processReadySessions(Set<SelectionKey> keys) {
338         Iterator<SelectionKey> it = keys.iterator();
339         while (it.hasNext()) {
340             SelectionKey key = it.next();
341             it.remove();
342 
343             DatagramSessionImpl session = (DatagramSessionImpl) key
344                     .attachment();
345 
346             // Let the recycler know that the session is still active. 
347             getSessionRecycler(session).recycle(session.getLocalAddress(),
348                     session.getRemoteAddress());
349 
350             if (key.isReadable() && session.getTrafficMask().isReadable()) {
351                 readSession(session);
352             }
353 
354             if (key.isWritable() && session.getTrafficMask().isWritable()) {
355                 scheduleFlush(session);
356             }
357         }
358     }
359 
360     private IoSessionRecycler getSessionRecycler(IoSession session) {
361         IoServiceConfig config = session.getServiceConfig();
362         IoSessionRecycler sessionRecycler;
363         if (config instanceof DatagramServiceConfig) {
364             sessionRecycler = ((DatagramServiceConfig) config)
365                     .getSessionRecycler();
366         } else {
367             sessionRecycler = defaultConfig.getSessionRecycler();
368         }
369         return sessionRecycler;
370     }
371 
372     private void readSession(DatagramSessionImpl session) {
373 
374         ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
375         try {
376             int readBytes = session.getChannel().read(readBuf.buf());
377             if (readBytes > 0) {
378                 readBuf.flip();
379                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
380                 newBuf.put(readBuf);
381                 newBuf.flip();
382 
383                 session.increaseReadBytes(readBytes);
384                 session.getFilterChain().fireMessageReceived(session, newBuf);
385             }
386         } catch (IOException e) {
387             session.getFilterChain().fireExceptionCaught(session, e);
388         } finally {
389             readBuf.release();
390         }
391     }
392 
393     private void flushSessions() {
394         if (flushingSessions.size() == 0)
395             return;
396 
397         for (;;) {
398             DatagramSessionImpl session = flushingSessions.poll();
399 
400             if (session == null)
401                 break;
402 
403             try {
404                 flush(session);
405             } catch (IOException e) {
406                 session.getFilterChain().fireExceptionCaught(session, e);
407             }
408         }
409     }
410 
411     private void flush(DatagramSessionImpl session) throws IOException {
412         DatagramChannel ch = session.getChannel();
413 
414         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
415 
416         for (;;) {
417             WriteRequest req = writeRequestQueue.peek();
418 
419             if (req == null)
420                 break;
421 
422             ByteBuffer buf = (ByteBuffer) req.getMessage();
423             if (buf.remaining() == 0) {
424                 // pop and fire event
425                 writeRequestQueue.poll();
426 
427                 session.increaseWrittenMessages();
428                 buf.reset();
429                 session.getFilterChain().fireMessageSent(session, req);
430                 continue;
431             }
432 
433             SelectionKey key = session.getSelectionKey();
434             if (key == null) {
435                 scheduleFlush(session);
436                 break;
437             }
438             if (!key.isValid()) {
439                 continue;
440             }
441 
442             int writtenBytes = ch.write(buf.buf());
443 
444             if (writtenBytes == 0) {
445                 // Kernel buffer is full
446                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
447             } else if (writtenBytes > 0) {
448                 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
449 
450                 // pop and fire event
451                 writeRequestQueue.poll();
452 
453                 session.increaseWrittenBytes(writtenBytes);
454                 session.increaseWrittenMessages();
455                 buf.reset();
456                 session.getFilterChain().fireMessageSent(session, req);
457             }
458         }
459     }
460 
461     private void registerNew() {
462         if (registerQueue.isEmpty())
463             return;
464 
465         Selector selector = this.selector;
466         for (;;) {
467             RegistrationRequest req = registerQueue.poll();
468 
469             if (req == null)
470                 break;
471 
472             DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
473                     this, req.config, req.channel, req.handler, req.channel
474                             .socket().getRemoteSocketAddress(), req.channel
475                             .socket().getLocalSocketAddress());
476 
477             // AbstractIoFilterChain will notify the connect future.
478             session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
479 
480             boolean success = false;
481             try {
482                 SelectionKey key = req.channel.register(selector,
483                         SelectionKey.OP_READ, session);
484 
485                 session.setSelectionKey(key);
486                 buildFilterChain(req, session);
487                 getSessionRecycler(session).put(session);
488 
489                 // The CONNECT_FUTURE attribute is cleared and notified here.
490                 getListeners().fireSessionCreated(session);
491                 success = true;
492             } catch (Throwable t) {
493                 // The CONNECT_FUTURE attribute is cleared and notified here.
494                 session.getFilterChain().fireExceptionCaught(session, t);
495             } finally {
496                 if (!success) {
497                     try {
498                         req.channel.disconnect();
499                         req.channel.close();
500                     } catch (IOException e) {
501                         ExceptionMonitor.getInstance().exceptionCaught(e);
502                     }
503                 }
504             }
505         }
506     }
507 
508     private void buildFilterChain(RegistrationRequest req, IoSession session)
509             throws Exception {
510         getFilterChainBuilder().buildFilterChain(session.getFilterChain());
511         req.config.getFilterChainBuilder().buildFilterChain(
512                 session.getFilterChain());
513         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
514     }
515 
516     private void cancelKeys() {
517         if (cancelQueue.isEmpty())
518             return;
519 
520         Selector selector = this.selector;
521         for (;;) {
522             DatagramSessionImpl session = cancelQueue.poll();
523 
524             if (session == null)
525                 break;
526             else {
527                 SelectionKey key = session.getSelectionKey();
528                 DatagramChannel ch = (DatagramChannel) key.channel();
529                 try {
530                     ch.disconnect();
531                     ch.close();
532                 } catch (IOException e) {
533                     ExceptionMonitor.getInstance().exceptionCaught(e);
534                 }
535 
536                 getListeners().fireSessionDestroyed(session);
537                 session.getCloseFuture().setClosed();
538                 key.cancel();
539                 selector.wakeup(); // wake up again to trigger thread death
540             }
541         }
542     }
543 
544     private static class RegistrationRequest extends DefaultConnectFuture {
545         private final DatagramChannel channel;
546 
547         private final IoHandler handler;
548 
549         private final IoServiceConfig config;
550 
551         private RegistrationRequest(DatagramChannel channel, IoHandler handler,
552                 IoServiceConfig config) {
553             this.channel = channel;
554             this.handler = handler;
555             this.config = config;
556         }
557     }
558 }