View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio.support;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.Iterator;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  import org.apache.mina.common.ByteBuffer;
36  import org.apache.mina.common.ConnectFuture;
37  import org.apache.mina.common.ExceptionMonitor;
38  import org.apache.mina.common.IoConnector;
39  import org.apache.mina.common.IoHandler;
40  import org.apache.mina.common.IoServiceConfig;
41  import org.apache.mina.common.IoSession;
42  import org.apache.mina.common.IoSessionRecycler;
43  import org.apache.mina.common.IoFilter.WriteRequest;
44  import org.apache.mina.common.support.AbstractIoFilterChain;
45  import org.apache.mina.common.support.BaseIoConnector;
46  import org.apache.mina.common.support.DefaultConnectFuture;
47  import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
48  import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
49  import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
50  import org.apache.mina.util.NamePreservingRunnable;
51  
52  /**
53   * {@link IoConnector} for datagram transport (UDP/IP).
54   *
55   * @author The Apache Directory Project (mina-dev@directory.apache.org)
56   * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13  7월 2007) $
57   */
58  public class DatagramConnectorDelegate extends BaseIoConnector implements
59          DatagramService {
60      private static final AtomicInteger nextId = new AtomicInteger();
61  
62      private final Object lock = new Object();
63  
64      private final IoConnector wrapper;
65  
66      private final Executor executor;
67  
68      private final int id = nextId.getAndIncrement();
69  
70      private Selector selector;
71  
72      private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
73  
74      private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
75  
76      private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
77  
78      private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
79  
80      private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
81  
82      private Worker worker;
83  
84      /**
85       * Creates a new instance.
86       */
87      public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
88          this.wrapper = wrapper;
89          this.executor = executor;
90      }
91  
92      public ConnectFuture connect(SocketAddress address, IoHandler handler,
93              IoServiceConfig config) {
94          return connect(address, null, handler, config);
95      }
96  
97      public ConnectFuture connect(SocketAddress address,
98              SocketAddress localAddress, IoHandler handler,
99              IoServiceConfig config) {
100         if (address == null)
101             throw new NullPointerException("address");
102         if (handler == null)
103             throw new NullPointerException("handler");
104 
105         if (!(address instanceof InetSocketAddress))
106             throw new IllegalArgumentException("Unexpected address type: "
107                     + address.getClass());
108 
109         if (localAddress != null
110                 && !(localAddress instanceof InetSocketAddress)) {
111             throw new IllegalArgumentException(
112                     "Unexpected local address type: " + localAddress.getClass());
113         }
114 
115         if (config == null) {
116             config = getDefaultConfig();
117         }
118 
119         DatagramChannel ch = null;
120         boolean initialized = false;
121         try {
122             ch = DatagramChannel.open();
123             DatagramSessionConfig cfg;
124             if (config.getSessionConfig() instanceof DatagramSessionConfig) {
125                 cfg = (DatagramSessionConfig) config.getSessionConfig();
126             } else {
127                 cfg = getDefaultConfig().getSessionConfig();
128             }
129 
130             ch.socket().setReuseAddress(cfg.isReuseAddress());
131             ch.socket().setBroadcast(cfg.isBroadcast());
132             ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
133             ch.socket().setSendBufferSize(cfg.getSendBufferSize());
134 
135             if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
136                 ch.socket().setTrafficClass(cfg.getTrafficClass());
137             }
138 
139             if (localAddress != null) {
140                 ch.socket().bind(localAddress);
141             }
142             ch.connect(address);
143             ch.configureBlocking(false);
144             initialized = true;
145         } catch (IOException e) {
146             return DefaultConnectFuture.newFailedFuture(e);
147         } finally {
148             if (!initialized && ch != null) {
149                 try {
150                     ch.disconnect();
151                     ch.close();
152                 } catch (IOException e) {
153                     ExceptionMonitor.getInstance().exceptionCaught(e);
154                 }
155             }
156         }
157 
158         RegistrationRequest request = new RegistrationRequest(ch, handler,
159                 config);
160         try {
161             startupWorker();
162         } catch (IOException e) {
163             try {
164                 ch.disconnect();
165                 ch.close();
166             } catch (IOException e2) {
167                 ExceptionMonitor.getInstance().exceptionCaught(e2);
168             }
169 
170             return DefaultConnectFuture.newFailedFuture(e);
171         }
172 
173         registerQueue.add(request);
174 
175         selector.wakeup();
176         return request;
177     }
178 
179     public DatagramConnectorConfig getDefaultConfig() {
180         return defaultConfig;
181     }
182 
183     /**
184      * Sets the config this connector will use by default.
185      *
186      * @param defaultConfig the default config.
187      * @throws NullPointerException if the specified value is <code>null</code>.
188      */
189     public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
190         if (defaultConfig == null) {
191             throw new NullPointerException("defaultConfig");
192         }
193         this.defaultConfig = defaultConfig;
194     }
195 
196     private void startupWorker() throws IOException {
197         synchronized (lock) {
198             if (worker == null) {
199                 selector = Selector.open();
200                 worker = new Worker();
201                 executor.execute(new NamePreservingRunnable(worker));
202             }
203         }
204     }
205 
206     public void closeSession(DatagramSessionImpl session) {
207         try {
208             startupWorker();
209         } catch (IOException e) {
210             // IOException is thrown only when Worker thread is not
211             // running and failed to open a selector.  We simply return
212             // silently here because it we can simply conclude that
213             // this session is not managed by this connector or
214             // already closed.
215             return;
216         }
217 
218         cancelQueue.add(session);
219 
220         selector.wakeup();
221     }
222 
223     public void flushSession(DatagramSessionImpl session) {
224         scheduleFlush(session);
225         Selector selector = this.selector;
226         if (selector != null) {
227             selector.wakeup();
228         }
229     }
230 
231     private void scheduleFlush(DatagramSessionImpl session) {
232         flushingSessions.add(session);
233     }
234 
235     public void updateTrafficMask(DatagramSessionImpl session) {
236         scheduleTrafficControl(session);
237         Selector selector = this.selector;
238         if (selector != null) {
239             selector.wakeup();
240         }
241     }
242 
243     private void scheduleTrafficControl(DatagramSessionImpl session) {
244         trafficControllingSessions.add(session);
245     }
246 
247     private void doUpdateTrafficMask() {
248         if (trafficControllingSessions.isEmpty())
249             return;
250 
251         for (;;) {
252             DatagramSessionImpl session = trafficControllingSessions.poll();
253 
254             if (session == null)
255                 break;
256 
257             SelectionKey key = session.getSelectionKey();
258             // Retry later if session is not yet fully initialized.
259             // (In case that Session.suspend??() or session.resume??() is
260             // called before addSession() is processed)
261             if (key == null) {
262                 scheduleTrafficControl(session);
263                 break;
264             }
265             // skip if channel is already closed
266             if (!key.isValid()) {
267                 continue;
268             }
269 
270             // The normal is OP_READ and, if there are write requests in the
271             // session's write queue, set OP_WRITE to trigger flushing.
272             int ops = SelectionKey.OP_READ;
273             if (!session.getWriteRequestQueue().isEmpty()) {
274                 ops |= SelectionKey.OP_WRITE;
275             }
276 
277             // Now mask the preferred ops with the mask of the current session
278             int mask = session.getTrafficMask().getInterestOps();
279             key.interestOps(ops & mask);
280         }
281     }
282 
283     private class Worker implements Runnable {
284         public void run() {
285             Thread.currentThread().setName("DatagramConnector-" + id);
286 
287             for (;;) {
288                 try {
289                     int nKeys = selector.select();
290 
291                     registerNew();
292                     doUpdateTrafficMask();
293 
294                     if (nKeys > 0) {
295                         processReadySessions(selector.selectedKeys());
296                     }
297 
298                     flushSessions();
299                     cancelKeys();
300 
301                     if (selector.keys().isEmpty()) {
302                         synchronized (lock) {
303                             if (selector.keys().isEmpty()
304                                     && registerQueue.isEmpty()
305                                     && cancelQueue.isEmpty()) {
306                                 worker = null;
307                                 try {
308                                     selector.close();
309                                 } catch (IOException e) {
310                                     ExceptionMonitor.getInstance()
311                                             .exceptionCaught(e);
312                                 } finally {
313                                     selector = null;
314                                 }
315                                 break;
316                             }
317                         }
318                     }
319                 } catch (IOException e) {
320                     ExceptionMonitor.getInstance().exceptionCaught(e);
321 
322                     try {
323                         Thread.sleep(1000);
324                     } catch (InterruptedException e1) {
325                         ExceptionMonitor.getInstance().exceptionCaught(e1);
326                     }
327                 }
328             }
329         }
330     }
331 
332     private void processReadySessions(Set<SelectionKey> keys) {
333         Iterator<SelectionKey> it = keys.iterator();
334         while (it.hasNext()) {
335             SelectionKey key = it.next();
336             it.remove();
337 
338             DatagramSessionImpl session = (DatagramSessionImpl) key
339                     .attachment();
340 
341             // Let the recycler know that the session is still active. 
342             getSessionRecycler(session).recycle(session.getLocalAddress(),
343                     session.getRemoteAddress());
344 
345             if (key.isReadable() && session.getTrafficMask().isReadable()) {
346                 readSession(session);
347             }
348 
349             if (key.isWritable() && session.getTrafficMask().isWritable()) {
350                 scheduleFlush(session);
351             }
352         }
353     }
354 
355     private IoSessionRecycler getSessionRecycler(IoSession session) {
356         IoServiceConfig config = session.getServiceConfig();
357         IoSessionRecycler sessionRecycler;
358         if (config instanceof DatagramServiceConfig) {
359             sessionRecycler = ((DatagramServiceConfig) config)
360                     .getSessionRecycler();
361         } else {
362             sessionRecycler = defaultConfig.getSessionRecycler();
363         }
364         return sessionRecycler;
365     }
366 
367     private void readSession(DatagramSessionImpl session) {
368 
369         ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
370         try {
371             int readBytes = session.getChannel().read(readBuf.buf());
372             if (readBytes > 0) {
373                 readBuf.flip();
374                 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
375                 newBuf.put(readBuf);
376                 newBuf.flip();
377 
378                 session.increaseReadBytes(readBytes);
379                 session.getFilterChain().fireMessageReceived(session, newBuf);
380             }
381         } catch (IOException e) {
382             session.getFilterChain().fireExceptionCaught(session, e);
383         } finally {
384             readBuf.release();
385         }
386     }
387 
388     private void flushSessions() {
389         if (flushingSessions.size() == 0)
390             return;
391 
392         for (;;) {
393             DatagramSessionImpl session = flushingSessions.poll();
394 
395             if (session == null)
396                 break;
397 
398             try {
399                 flush(session);
400             } catch (IOException e) {
401                 session.getFilterChain().fireExceptionCaught(session, e);
402             }
403         }
404     }
405 
406     private void flush(DatagramSessionImpl session) throws IOException {
407         DatagramChannel ch = session.getChannel();
408 
409         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
410 
411         for (;;) {
412             WriteRequest req = writeRequestQueue.peek();
413 
414             if (req == null)
415                 break;
416 
417             ByteBuffer buf = (ByteBuffer) req.getMessage();
418             if (buf.remaining() == 0) {
419                 // pop and fire event
420                 writeRequestQueue.poll();
421 
422                 session.increaseWrittenMessages();
423                 buf.reset();
424                 session.getFilterChain().fireMessageSent(session, req);
425                 continue;
426             }
427 
428             SelectionKey key = session.getSelectionKey();
429             if (key == null) {
430                 scheduleFlush(session);
431                 break;
432             }
433             if (!key.isValid()) {
434                 continue;
435             }
436 
437             int writtenBytes = ch.write(buf.buf());
438 
439             if (writtenBytes == 0) {
440                 // Kernel buffer is full
441                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
442             } else if (writtenBytes > 0) {
443                 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
444 
445                 // pop and fire event
446                 writeRequestQueue.poll();
447 
448                 session.increaseWrittenBytes(writtenBytes);
449                 session.increaseWrittenMessages();
450                 buf.reset();
451                 session.getFilterChain().fireMessageSent(session, req);
452             }
453         }
454     }
455 
456     private void registerNew() {
457         if (registerQueue.isEmpty())
458             return;
459 
460         for (;;) {
461             RegistrationRequest req = registerQueue.poll();
462 
463             if (req == null)
464                 break;
465 
466             DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
467                     this, req.config, req.channel, req.handler, req.channel
468                             .socket().getRemoteSocketAddress(), req.channel
469                             .socket().getLocalSocketAddress());
470 
471             // AbstractIoFilterChain will notify the connect future.
472             session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
473 
474             boolean success = false;
475             try {
476                 SelectionKey key = req.channel.register(selector,
477                         SelectionKey.OP_READ, session);
478 
479                 session.setSelectionKey(key);
480                 buildFilterChain(req, session);
481                 getSessionRecycler(session).put(session);
482 
483                 // The CONNECT_FUTURE attribute is cleared and notified here.
484                 getListeners().fireSessionCreated(session);
485                 success = true;
486             } catch (Throwable t) {
487                 // The CONNECT_FUTURE attribute is cleared and notified here.
488                 session.getFilterChain().fireExceptionCaught(session, t);
489             } finally {
490                 if (!success) {
491                     try {
492                         req.channel.disconnect();
493                         req.channel.close();
494                     } catch (IOException e) {
495                         ExceptionMonitor.getInstance().exceptionCaught(e);
496                     }
497                 }
498             }
499         }
500     }
501 
502     private void buildFilterChain(RegistrationRequest req, IoSession session)
503             throws Exception {
504         getFilterChainBuilder().buildFilterChain(session.getFilterChain());
505         req.config.getFilterChainBuilder().buildFilterChain(
506                 session.getFilterChain());
507         req.config.getThreadModel().buildFilterChain(session.getFilterChain());
508     }
509 
510     private void cancelKeys() {
511         if (cancelQueue.isEmpty())
512             return;
513 
514         for (;;) {
515             DatagramSessionImpl session = cancelQueue.poll();
516 
517             if (session == null)
518                 break;
519             else {
520                 SelectionKey key = session.getSelectionKey();
521                 DatagramChannel ch = (DatagramChannel) key.channel();
522                 try {
523                     ch.disconnect();
524                     ch.close();
525                 } catch (IOException e) {
526                     ExceptionMonitor.getInstance().exceptionCaught(e);
527                 }
528 
529                 getListeners().fireSessionDestroyed(session);
530                 session.getCloseFuture().setClosed();
531                 key.cancel();
532                 selector.wakeup(); // wake up again to trigger thread death
533             }
534         }
535     }
536 
537     private static class RegistrationRequest extends DefaultConnectFuture {
538         private final DatagramChannel channel;
539 
540         private final IoHandler handler;
541 
542         private final IoServiceConfig config;
543 
544         private RegistrationRequest(DatagramChannel channel, IoHandler handler,
545                 IoServiceConfig config) {
546             this.channel = channel;
547             this.handler = handler;
548             this.config = config;
549         }
550     }
551 }