1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.DatagramChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34
35 import org.apache.mina.common.ByteBuffer;
36 import org.apache.mina.common.ExceptionMonitor;
37 import org.apache.mina.common.IoAcceptor;
38 import org.apache.mina.common.IoHandler;
39 import org.apache.mina.common.IoServiceConfig;
40 import org.apache.mina.common.IoSession;
41 import org.apache.mina.common.IoSessionRecycler;
42 import org.apache.mina.common.IoFilter.WriteRequest;
43 import org.apache.mina.common.support.BaseIoAcceptor;
44 import org.apache.mina.common.support.IoServiceListenerSupport;
45 import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
46 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
47 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
48 import org.apache.mina.util.NamePreservingRunnable;
49 import org.apache.mina.util.Queue;
50
51 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
52
53
54
55
56
57
58
59 public class DatagramAcceptorDelegate extends BaseIoAcceptor implements
60 IoAcceptor, DatagramService {
61 private static volatile int nextId = 0;
62
63 private final IoAcceptor wrapper;
64
65 private final Executor executor;
66
67 private final int id = nextId++;
68
69 private Selector selector;
70
71 private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();
72
73 private final Map channels = new HashMap();
74
75 private final Queue registerQueue = new Queue();
76
77 private final Queue cancelQueue = new Queue();
78
79 private final Queue flushingSessions = new Queue();
80
81 private Worker worker;
82
83
84
85
86 public DatagramAcceptorDelegate(IoAcceptor wrapper, Executor executor) {
87 this.wrapper = wrapper;
88 this.executor = executor;
89
90
91 ((DatagramSessionConfig) defaultConfig.getSessionConfig())
92 .setReuseAddress(true);
93 }
94
95 public void bind(SocketAddress address, IoHandler handler,
96 IoServiceConfig config) throws IOException {
97 if (handler == null)
98 throw new NullPointerException("handler");
99 if (config == null) {
100 config = getDefaultConfig();
101 }
102
103 if (address != null && !(address instanceof InetSocketAddress))
104 throw new IllegalArgumentException("Unexpected address type: "
105 + address.getClass());
106
107 RegistrationRequest request = new RegistrationRequest(address, handler,
108 config);
109 synchronized (this) {
110 synchronized (registerQueue) {
111 registerQueue.push(request);
112 }
113 startupWorker();
114 selector.wakeup();
115 }
116
117 synchronized (request) {
118 while (!request.done) {
119 try {
120 request.wait();
121 } catch (InterruptedException e) {
122 }
123 }
124 }
125
126 if (request.exception != null) {
127 throw (IOException) new IOException("Failed to bind")
128 .initCause(request.exception);
129 }
130 }
131
132 public void unbind(SocketAddress address) {
133 if (address == null)
134 throw new NullPointerException("address");
135
136 CancellationRequest request = new CancellationRequest(address);
137 synchronized (this) {
138 try {
139 startupWorker();
140 } catch (IOException e) {
141
142
143
144
145 throw new IllegalArgumentException("Address not bound: "
146 + address);
147 }
148
149 synchronized (cancelQueue) {
150 cancelQueue.push(request);
151 }
152 selector.wakeup();
153 }
154
155 synchronized (request) {
156 while (!request.done) {
157 try {
158 request.wait();
159 } catch (InterruptedException e) {
160 }
161 }
162 }
163
164 if (request.exception != null) {
165 throw new RuntimeException("Failed to unbind", request.exception);
166 }
167 }
168
169 public void unbindAll() {
170 List addresses;
171 synchronized (channels) {
172 addresses = new ArrayList(channels.keySet());
173 }
174
175 for (Iterator i = addresses.iterator(); i.hasNext();) {
176 unbind((SocketAddress) i.next());
177 }
178 }
179
180 public IoSession newSession(SocketAddress remoteAddress,
181 SocketAddress localAddress) {
182 if (remoteAddress == null) {
183 throw new NullPointerException("remoteAddress");
184 }
185 if (localAddress == null) {
186 throw new NullPointerException("localAddress");
187 }
188
189 Selector selector = getSelector();
190 DatagramChannel ch = (DatagramChannel) channels.get(localAddress);
191 if (selector == null || ch == null) {
192 throw new IllegalArgumentException("Unknown localAddress: "
193 + localAddress);
194 }
195
196 SelectionKey key = ch.keyFor(selector);
197 if (key == null) {
198 throw new IllegalArgumentException("Unknown localAddress: "
199 + localAddress);
200 }
201
202 RegistrationRequest req = (RegistrationRequest) key.attachment();
203 IoSession session;
204 IoSessionRecycler sessionRecycler = getSessionRecycler(req);
205 synchronized (sessionRecycler) {
206 session = sessionRecycler.recycle(localAddress, remoteAddress);
207 if (session != null) {
208 return session;
209 }
210
211
212
213
214
215
216
217 DatagramSessionImpl datagramSession = new DatagramSessionImpl(
218 wrapper, this, req.config, ch, req.handler, req.address,
219 req.address);
220 datagramSession.setRemoteAddress(remoteAddress);
221 datagramSession.setSelectionKey(key);
222
223 getSessionRecycler(req).put(datagramSession);
224 session = datagramSession;
225 }
226
227 try {
228 buildFilterChain(req, session);
229 getListeners().fireSessionCreated(session);
230 } catch (Throwable t) {
231 ExceptionMonitor.getInstance().exceptionCaught(t);
232 }
233
234 return session;
235 }
236
237 private IoSessionRecycler getSessionRecycler(RegistrationRequest req) {
238 IoSessionRecycler sessionRecycler;
239 if (req.config instanceof DatagramServiceConfig) {
240 sessionRecycler = ((DatagramServiceConfig) req.config)
241 .getSessionRecycler();
242 } else {
243 sessionRecycler = defaultConfig.getSessionRecycler();
244 }
245 return sessionRecycler;
246 }
247
248 public IoServiceListenerSupport getListeners() {
249 return super.getListeners();
250 }
251
252 private void buildFilterChain(RegistrationRequest req, IoSession session)
253 throws Exception {
254 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
255 req.config.getFilterChainBuilder().buildFilterChain(
256 session.getFilterChain());
257 req.config.getThreadModel().buildFilterChain(session.getFilterChain());
258 }
259
260 public IoServiceConfig getDefaultConfig() {
261 return defaultConfig;
262 }
263
264
265
266
267
268
269
270 public void setDefaultConfig(DatagramAcceptorConfig defaultConfig) {
271 if (defaultConfig == null) {
272 throw new NullPointerException("defaultConfig");
273 }
274 this.defaultConfig = defaultConfig;
275 }
276
277 private synchronized Selector getSelector() {
278 return this.selector;
279 }
280
281 private synchronized void startupWorker() throws IOException {
282 if (worker == null) {
283 selector = Selector.open();
284 worker = new Worker();
285 executor.execute(new NamePreservingRunnable(worker));
286 }
287 }
288
289 public void flushSession(DatagramSessionImpl session) {
290 if (scheduleFlush(session)) {
291 Selector selector = getSelector();
292 if (selector != null) {
293 selector.wakeup();
294 }
295 }
296 }
297
298 public void closeSession(DatagramSessionImpl session) {
299 }
300
301 private boolean scheduleFlush(DatagramSessionImpl session) {
302 if (session.setScheduledForFlush(true)) {
303 synchronized (flushingSessions) {
304 flushingSessions.push(session);
305 }
306 return true;
307 } else {
308 return false;
309 }
310 }
311
312 private class Worker implements Runnable {
313 public void run() {
314 Thread.currentThread().setName("DatagramAcceptor-" + id);
315
316 Selector selector = DatagramAcceptorDelegate.this.getSelector();
317 for (;;) {
318 try {
319 int nKeys = selector.select();
320
321 registerNew();
322
323 if (nKeys > 0) {
324 processReadySessions(selector.selectedKeys());
325 }
326
327 flushSessions();
328 cancelKeys();
329
330 if (selector.keys().isEmpty()) {
331 synchronized (DatagramAcceptorDelegate.this) {
332 if (selector.keys().isEmpty()
333 && registerQueue.isEmpty()
334 && cancelQueue.isEmpty()) {
335 worker = null;
336 try {
337 selector.close();
338 } catch (IOException e) {
339 ExceptionMonitor.getInstance()
340 .exceptionCaught(e);
341 } finally {
342 DatagramAcceptorDelegate.this.selector = null;
343 }
344 break;
345 }
346 }
347 }
348 } catch (IOException e) {
349 ExceptionMonitor.getInstance().exceptionCaught(e);
350
351 try {
352 Thread.sleep(1000);
353 } catch (InterruptedException e1) {
354 }
355 }
356 }
357 }
358 }
359
360 private void processReadySessions(Set keys) {
361 Iterator it = keys.iterator();
362 while (it.hasNext()) {
363 SelectionKey key = (SelectionKey) it.next();
364 it.remove();
365
366 DatagramChannel ch = (DatagramChannel) key.channel();
367
368 RegistrationRequest req = (RegistrationRequest) key.attachment();
369 try {
370 if (key.isReadable()) {
371 readSession(ch, req);
372 }
373
374 if (key.isWritable()) {
375 for (Iterator i = getManagedSessions(req.address)
376 .iterator(); i.hasNext();) {
377 scheduleFlush((DatagramSessionImpl) i.next());
378 }
379 }
380 } catch (Throwable t) {
381 ExceptionMonitor.getInstance().exceptionCaught(t);
382 }
383 }
384 }
385
386 private void readSession(DatagramChannel channel, RegistrationRequest req)
387 throws Exception {
388 ByteBuffer readBuf = ByteBuffer
389 .allocate(((DatagramSessionConfig) req.config
390 .getSessionConfig()).getReceiveBufferSize());
391 try {
392 SocketAddress remoteAddress = channel.receive(readBuf.buf());
393 if (remoteAddress != null) {
394 DatagramSessionImpl session = (DatagramSessionImpl) newSession(
395 remoteAddress, req.address);
396
397 readBuf.flip();
398
399 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
400 newBuf.put(readBuf);
401 newBuf.flip();
402
403 session.increaseReadBytes(newBuf.remaining());
404 session.getFilterChain().fireMessageReceived(session, newBuf);
405 }
406 } finally {
407 readBuf.release();
408 }
409 }
410
411 private void flushSessions() {
412 if (flushingSessions.size() == 0)
413 return;
414
415 for (;;) {
416 DatagramSessionImpl session;
417
418 synchronized (flushingSessions) {
419 session = (DatagramSessionImpl) flushingSessions.pop();
420 }
421
422 if (session == null)
423 break;
424
425 session.setScheduledForFlush(false);
426
427 try {
428 boolean flushedAll = flush(session);
429 if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
430 scheduleFlush(session);
431 }
432 } catch (IOException e) {
433 session.getFilterChain().fireExceptionCaught(session, e);
434 }
435 }
436 }
437
438 private boolean flush(DatagramSessionImpl session) throws IOException {
439
440 SelectionKey key = session.getSelectionKey();
441 if (key == null) {
442 scheduleFlush(session);
443 return false;
444 }
445 if (!key.isValid()) {
446 return false;
447 }
448 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
449
450 DatagramChannel ch = session.getChannel();
451 Queue writeRequestQueue = session.getWriteRequestQueue();
452
453 int writtenBytes = 0;
454 int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
455 try {
456 for (;;) {
457 WriteRequest req;
458 synchronized (writeRequestQueue) {
459 req = (WriteRequest) writeRequestQueue.first();
460 }
461
462 if (req == null)
463 break;
464
465 ByteBuffer buf = (ByteBuffer) req.getMessage();
466 if (buf.remaining() == 0) {
467
468 synchronized (writeRequestQueue) {
469 writeRequestQueue.pop();
470 }
471
472 session.increaseWrittenMessages();
473 buf.reset();
474 ((DatagramFilterChain) session.getFilterChain())
475 .fireMessageSent(session, req);
476 continue;
477 }
478
479 SocketAddress destination = req.getDestination();
480 if (destination == null) {
481 destination = session.getRemoteAddress();
482 }
483
484 int localWrittenBytes = ch.send(buf.buf(), destination);
485 writtenBytes += localWrittenBytes;
486
487 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
488
489 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
490 return false;
491 } else {
492
493 synchronized (writeRequestQueue) {
494 writeRequestQueue.pop();
495 }
496
497 session.increaseWrittenMessages();
498 buf.reset();
499 session.getFilterChain().fireMessageSent(session, req);
500 }
501 }
502 } finally {
503 session.increaseWrittenBytes(writtenBytes);
504 }
505
506 return true;
507 }
508
509 private void registerNew() {
510 if (registerQueue.isEmpty())
511 return;
512
513 Selector selector = getSelector();
514 for (;;) {
515 RegistrationRequest req;
516 synchronized (registerQueue) {
517 req = (RegistrationRequest) registerQueue.pop();
518 }
519
520 if (req == null)
521 break;
522
523 DatagramChannel ch = null;
524 try {
525 ch = DatagramChannel.open();
526 DatagramSessionConfig cfg;
527 if (req.config.getSessionConfig() instanceof DatagramSessionConfig) {
528 cfg = (DatagramSessionConfig) req.config.getSessionConfig();
529 } else {
530 cfg = (DatagramSessionConfig) getDefaultConfig()
531 .getSessionConfig();
532 }
533
534 ch.socket().setReuseAddress(cfg.isReuseAddress());
535 ch.socket().setBroadcast(cfg.isBroadcast());
536 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
537 ch.socket().setSendBufferSize(cfg.getSendBufferSize());
538
539 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
540 ch.socket().setTrafficClass(cfg.getTrafficClass());
541 }
542
543 ch.configureBlocking(false);
544 ch.socket().bind(req.address);
545 if (req.address == null || req.address.getPort() == 0) {
546 req.address = (InetSocketAddress) ch.socket()
547 .getLocalSocketAddress();
548 }
549 ch.register(selector, SelectionKey.OP_READ, req);
550 synchronized (channels) {
551 channels.put(req.address, ch);
552 }
553
554 getListeners().fireServiceActivated(this, req.address,
555 req.handler, req.config);
556 } catch (Throwable t) {
557 req.exception = t;
558 } finally {
559 synchronized (req) {
560 req.done = true;
561 req.notify();
562 }
563
564 if (ch != null && req.exception != null) {
565 try {
566 ch.disconnect();
567 ch.close();
568 } catch (Throwable e) {
569 ExceptionMonitor.getInstance().exceptionCaught(e);
570 }
571 }
572 }
573 }
574 }
575
576 private void cancelKeys() {
577 if (cancelQueue.isEmpty())
578 return;
579
580 Selector selector = getSelector();
581 for (;;) {
582 CancellationRequest request;
583 synchronized (cancelQueue) {
584 request = (CancellationRequest) cancelQueue.pop();
585 }
586
587 if (request == null) {
588 break;
589 }
590
591 DatagramChannel ch;
592 synchronized (channels) {
593 ch = (DatagramChannel) channels.remove(request.address);
594 }
595
596
597 try {
598 if (ch == null) {
599 request.exception = new IllegalArgumentException(
600 "Address not bound: " + request.address);
601 } else {
602 SelectionKey key = ch.keyFor(selector);
603 request.registrationRequest = (RegistrationRequest) key
604 .attachment();
605 key.cancel();
606 selector.wakeup();
607 ch.disconnect();
608 ch.close();
609 }
610 } catch (Throwable t) {
611 ExceptionMonitor.getInstance().exceptionCaught(t);
612 } finally {
613 synchronized (request) {
614 request.done = true;
615 request.notify();
616 }
617
618 if (request.exception == null) {
619 getListeners().fireServiceDeactivated(this,
620 request.address,
621 request.registrationRequest.handler,
622 request.registrationRequest.config);
623 }
624 }
625 }
626 }
627
628 public void updateTrafficMask(DatagramSessionImpl session) {
629
630
631
632 }
633
634 private static class RegistrationRequest {
635 private InetSocketAddress address;
636
637 private final IoHandler handler;
638
639 private final IoServiceConfig config;
640
641 private Throwable exception;
642
643 private boolean done;
644
645 private RegistrationRequest(SocketAddress address, IoHandler handler,
646 IoServiceConfig config) {
647 this.address = (InetSocketAddress) address;
648 this.handler = handler;
649 this.config = config;
650 }
651 }
652
653 private static class CancellationRequest {
654 private final SocketAddress address;
655
656 private boolean done;
657
658 private RegistrationRequest registrationRequest;
659
660 private RuntimeException exception;
661
662 private CancellationRequest(SocketAddress address) {
663 this.address = address;
664 }
665 }
666 }