1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.DatagramChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.util.Iterator;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.apache.mina.common.ByteBuffer;
36 import org.apache.mina.common.ConnectFuture;
37 import org.apache.mina.common.ExceptionMonitor;
38 import org.apache.mina.common.IoConnector;
39 import org.apache.mina.common.IoHandler;
40 import org.apache.mina.common.IoServiceConfig;
41 import org.apache.mina.common.IoSession;
42 import org.apache.mina.common.IoSessionRecycler;
43 import org.apache.mina.common.IoFilter.WriteRequest;
44 import org.apache.mina.common.support.AbstractIoFilterChain;
45 import org.apache.mina.common.support.BaseIoConnector;
46 import org.apache.mina.common.support.DefaultConnectFuture;
47 import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
48 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
49 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
50 import org.apache.mina.util.NamePreservingRunnable;
51
52
53
54
55
56
57
58 public class DatagramConnectorDelegate extends BaseIoConnector implements
59 DatagramService {
60 private static final AtomicInteger nextId = new AtomicInteger();
61
62 private final Object lock = new Object();
63
64 private final IoConnector wrapper;
65
66 private final Executor executor;
67
68 private final int id = nextId.getAndIncrement();
69
70 private Selector selector;
71
72 private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
73
74 private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
75
76 private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
77
78 private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
79
80 private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
81
82 private Worker worker;
83
84
85
86
/**
 * Creates a new delegate.
 *
 * @param wrapper  the connector passed to the sessions this delegate creates
 * @param executor the executor used to run the worker loop
 */
public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
    this.executor = executor;
    this.wrapper = wrapper;
}
91
92 public ConnectFuture connect(SocketAddress address, IoHandler handler,
93 IoServiceConfig config) {
94 return connect(address, null, handler, config);
95 }
96
97 public ConnectFuture connect(SocketAddress address,
98 SocketAddress localAddress, IoHandler handler,
99 IoServiceConfig config) {
100 if (address == null)
101 throw new NullPointerException("address");
102 if (handler == null)
103 throw new NullPointerException("handler");
104
105 if (!(address instanceof InetSocketAddress))
106 throw new IllegalArgumentException("Unexpected address type: "
107 + address.getClass());
108
109 if (localAddress != null
110 && !(localAddress instanceof InetSocketAddress)) {
111 throw new IllegalArgumentException(
112 "Unexpected local address type: " + localAddress.getClass());
113 }
114
115 if (config == null) {
116 config = getDefaultConfig();
117 }
118
119 DatagramChannel ch = null;
120 boolean initialized = false;
121 try {
122 ch = DatagramChannel.open();
123 DatagramSessionConfig cfg;
124 if (config.getSessionConfig() instanceof DatagramSessionConfig) {
125 cfg = (DatagramSessionConfig) config.getSessionConfig();
126 } else {
127 cfg = getDefaultConfig().getSessionConfig();
128 }
129
130 ch.socket().setReuseAddress(cfg.isReuseAddress());
131 ch.socket().setBroadcast(cfg.isBroadcast());
132 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
133 ch.socket().setSendBufferSize(cfg.getSendBufferSize());
134
135 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
136 ch.socket().setTrafficClass(cfg.getTrafficClass());
137 }
138
139 if (localAddress != null) {
140 ch.socket().bind(localAddress);
141 }
142 ch.connect(address);
143 ch.configureBlocking(false);
144 initialized = true;
145 } catch (IOException e) {
146 return DefaultConnectFuture.newFailedFuture(e);
147 } finally {
148 if (!initialized && ch != null) {
149 try {
150 ch.disconnect();
151 ch.close();
152 } catch (IOException e) {
153 ExceptionMonitor.getInstance().exceptionCaught(e);
154 }
155 }
156 }
157
158 RegistrationRequest request = new RegistrationRequest(ch, handler,
159 config);
160 try {
161 startupWorker();
162 } catch (IOException e) {
163 try {
164 ch.disconnect();
165 ch.close();
166 } catch (IOException e2) {
167 ExceptionMonitor.getInstance().exceptionCaught(e2);
168 }
169
170 return DefaultConnectFuture.newFailedFuture(e);
171 }
172
173 registerQueue.add(request);
174
175 selector.wakeup();
176 return request;
177 }
178
179 public DatagramConnectorConfig getDefaultConfig() {
180 return defaultConfig;
181 }
182
183
184
185
186
187
188
189 public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
190 if (defaultConfig == null) {
191 throw new NullPointerException("defaultConfig");
192 }
193 this.defaultConfig = defaultConfig;
194 }
195
196 private void startupWorker() throws IOException {
197 synchronized (lock) {
198 if (worker == null) {
199 selector = Selector.open();
200 worker = new Worker();
201 executor.execute(new NamePreservingRunnable(worker));
202 }
203 }
204 }
205
206 public void closeSession(DatagramSessionImpl session) {
207 try {
208 startupWorker();
209 } catch (IOException e) {
210
211
212
213
214
215 return;
216 }
217
218 cancelQueue.add(session);
219
220 selector.wakeup();
221 }
222
223 public void flushSession(DatagramSessionImpl session) {
224 scheduleFlush(session);
225 Selector selector = this.selector;
226 if (selector != null) {
227 selector.wakeup();
228 }
229 }
230
231 private void scheduleFlush(DatagramSessionImpl session) {
232 flushingSessions.add(session);
233 }
234
235 public void updateTrafficMask(DatagramSessionImpl session) {
236 scheduleTrafficControl(session);
237 Selector selector = this.selector;
238 if (selector != null) {
239 selector.wakeup();
240 }
241 }
242
243 private void scheduleTrafficControl(DatagramSessionImpl session) {
244 trafficControllingSessions.add(session);
245 }
246
247 private void doUpdateTrafficMask() {
248 if (trafficControllingSessions.isEmpty())
249 return;
250
251 for (;;) {
252 DatagramSessionImpl session = trafficControllingSessions.poll();
253
254 if (session == null)
255 break;
256
257 SelectionKey key = session.getSelectionKey();
258
259
260
261 if (key == null) {
262 scheduleTrafficControl(session);
263 break;
264 }
265
266 if (!key.isValid()) {
267 continue;
268 }
269
270
271
272 int ops = SelectionKey.OP_READ;
273 if (!session.getWriteRequestQueue().isEmpty()) {
274 ops |= SelectionKey.OP_WRITE;
275 }
276
277
278 int mask = session.getTrafficMask().getInterestOps();
279 key.interestOps(ops & mask);
280 }
281 }
282
283 private class Worker implements Runnable {
284 public void run() {
285 Thread.currentThread().setName("DatagramConnector-" + id);
286
287 for (;;) {
288 try {
289 int nKeys = selector.select();
290
291 registerNew();
292 doUpdateTrafficMask();
293
294 if (nKeys > 0) {
295 processReadySessions(selector.selectedKeys());
296 }
297
298 flushSessions();
299 cancelKeys();
300
301 if (selector.keys().isEmpty()) {
302 synchronized (lock) {
303 if (selector.keys().isEmpty()
304 && registerQueue.isEmpty()
305 && cancelQueue.isEmpty()) {
306 worker = null;
307 try {
308 selector.close();
309 } catch (IOException e) {
310 ExceptionMonitor.getInstance()
311 .exceptionCaught(e);
312 } finally {
313 selector = null;
314 }
315 break;
316 }
317 }
318 }
319 } catch (IOException e) {
320 ExceptionMonitor.getInstance().exceptionCaught(e);
321
322 try {
323 Thread.sleep(1000);
324 } catch (InterruptedException e1) {
325 ExceptionMonitor.getInstance().exceptionCaught(e1);
326 }
327 }
328 }
329 }
330 }
331
332 private void processReadySessions(Set<SelectionKey> keys) {
333 Iterator<SelectionKey> it = keys.iterator();
334 while (it.hasNext()) {
335 SelectionKey key = it.next();
336 it.remove();
337
338 DatagramSessionImpl session = (DatagramSessionImpl) key
339 .attachment();
340
341
342 getSessionRecycler(session).recycle(session.getLocalAddress(),
343 session.getRemoteAddress());
344
345 if (key.isReadable() && session.getTrafficMask().isReadable()) {
346 readSession(session);
347 }
348
349 if (key.isWritable() && session.getTrafficMask().isWritable()) {
350 scheduleFlush(session);
351 }
352 }
353 }
354
355 private IoSessionRecycler getSessionRecycler(IoSession session) {
356 IoServiceConfig config = session.getServiceConfig();
357 IoSessionRecycler sessionRecycler;
358 if (config instanceof DatagramServiceConfig) {
359 sessionRecycler = ((DatagramServiceConfig) config)
360 .getSessionRecycler();
361 } else {
362 sessionRecycler = defaultConfig.getSessionRecycler();
363 }
364 return sessionRecycler;
365 }
366
367 private void readSession(DatagramSessionImpl session) {
368
369 ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
370 try {
371 int readBytes = session.getChannel().read(readBuf.buf());
372 if (readBytes > 0) {
373 readBuf.flip();
374 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
375 newBuf.put(readBuf);
376 newBuf.flip();
377
378 session.increaseReadBytes(readBytes);
379 session.getFilterChain().fireMessageReceived(session, newBuf);
380 }
381 } catch (IOException e) {
382 session.getFilterChain().fireExceptionCaught(session, e);
383 } finally {
384 readBuf.release();
385 }
386 }
387
388 private void flushSessions() {
389 if (flushingSessions.size() == 0)
390 return;
391
392 for (;;) {
393 DatagramSessionImpl session = flushingSessions.poll();
394
395 if (session == null)
396 break;
397
398 try {
399 flush(session);
400 } catch (IOException e) {
401 session.getFilterChain().fireExceptionCaught(session, e);
402 }
403 }
404 }
405
406 private void flush(DatagramSessionImpl session) throws IOException {
407 DatagramChannel ch = session.getChannel();
408
409 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
410
411 for (;;) {
412 WriteRequest req = writeRequestQueue.peek();
413
414 if (req == null)
415 break;
416
417 ByteBuffer buf = (ByteBuffer) req.getMessage();
418 if (buf.remaining() == 0) {
419
420 writeRequestQueue.poll();
421
422 session.increaseWrittenMessages();
423 buf.reset();
424 session.getFilterChain().fireMessageSent(session, req);
425 continue;
426 }
427
428 SelectionKey key = session.getSelectionKey();
429 if (key == null) {
430 scheduleFlush(session);
431 break;
432 }
433 if (!key.isValid()) {
434 continue;
435 }
436
437 int writtenBytes = ch.write(buf.buf());
438
439 if (writtenBytes == 0) {
440
441 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
442 } else if (writtenBytes > 0) {
443 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
444
445
446 writeRequestQueue.poll();
447
448 session.increaseWrittenBytes(writtenBytes);
449 session.increaseWrittenMessages();
450 buf.reset();
451 session.getFilterChain().fireMessageSent(session, req);
452 }
453 }
454 }
455
456 private void registerNew() {
457 if (registerQueue.isEmpty())
458 return;
459
460 for (;;) {
461 RegistrationRequest req = registerQueue.poll();
462
463 if (req == null)
464 break;
465
466 DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
467 this, req.config, req.channel, req.handler, req.channel
468 .socket().getRemoteSocketAddress(), req.channel
469 .socket().getLocalSocketAddress());
470
471
472 session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
473
474 boolean success = false;
475 try {
476 SelectionKey key = req.channel.register(selector,
477 SelectionKey.OP_READ, session);
478
479 session.setSelectionKey(key);
480 buildFilterChain(req, session);
481 getSessionRecycler(session).put(session);
482
483
484 getListeners().fireSessionCreated(session);
485 success = true;
486 } catch (Throwable t) {
487
488 session.getFilterChain().fireExceptionCaught(session, t);
489 } finally {
490 if (!success) {
491 try {
492 req.channel.disconnect();
493 req.channel.close();
494 } catch (IOException e) {
495 ExceptionMonitor.getInstance().exceptionCaught(e);
496 }
497 }
498 }
499 }
500 }
501
502 private void buildFilterChain(RegistrationRequest req, IoSession session)
503 throws Exception {
504 getFilterChainBuilder().buildFilterChain(session.getFilterChain());
505 req.config.getFilterChainBuilder().buildFilterChain(
506 session.getFilterChain());
507 req.config.getThreadModel().buildFilterChain(session.getFilterChain());
508 }
509
510 private void cancelKeys() {
511 if (cancelQueue.isEmpty())
512 return;
513
514 for (;;) {
515 DatagramSessionImpl session = cancelQueue.poll();
516
517 if (session == null)
518 break;
519 else {
520 SelectionKey key = session.getSelectionKey();
521 DatagramChannel ch = (DatagramChannel) key.channel();
522 try {
523 ch.disconnect();
524 ch.close();
525 } catch (IOException e) {
526 ExceptionMonitor.getInstance().exceptionCaught(e);
527 }
528
529 getListeners().fireSessionDestroyed(session);
530 session.getCloseFuture().setClosed();
531 key.cancel();
532 selector.wakeup();
533 }
534 }
535 }
536
537 private static class RegistrationRequest extends DefaultConnectFuture {
538 private final DatagramChannel channel;
539
540 private final IoHandler handler;
541
542 private final IoServiceConfig config;
543
544 private RegistrationRequest(DatagramChannel channel, IoHandler handler,
545 IoServiceConfig config) {
546 this.channel = channel;
547 this.handler = handler;
548 this.config = config;
549 }
550 }
551 }