1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.DatagramChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34
35 import org.apache.mina.common.ByteBuffer;
36 import org.apache.mina.common.ExceptionMonitor;
37 import org.apache.mina.common.IoAcceptor;
38 import org.apache.mina.common.IoHandler;
39 import org.apache.mina.common.IoServiceConfig;
40 import org.apache.mina.common.IoSession;
41 import org.apache.mina.common.IoSessionRecycler;
42 import org.apache.mina.common.IoFilter.WriteRequest;
43 import org.apache.mina.common.support.BaseIoAcceptor;
44 import org.apache.mina.common.support.IoServiceListenerSupport;
45 import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
46 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
47 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
48 import org.apache.mina.util.NamePreservingRunnable;
49 import org.apache.mina.util.Queue;
50
51 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
52
53
54
55
56
57
58
59 public class DatagramAcceptorDelegate extends BaseIoAcceptor implements
60 IoAcceptor, DatagramService {
61 private static volatile int nextId = 0;
62
63 private final IoAcceptor wrapper;
64
65 private final Executor executor;
66
67 private final int id = nextId++;
68
69 private Selector selector;
70
71 private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();
72
73 private final Map channels = new HashMap();
74
75 private final Queue registerQueue = new Queue();
76
77 private final Queue cancelQueue = new Queue();
78
79 private final Queue flushingSessions = new Queue();
80
81 private Worker worker;
82
83
84
85
86 public DatagramAcceptorDelegate(IoAcceptor wrapper, Executor executor) {
87 this.wrapper = wrapper;
88 this.executor = executor;
89
90
91 ((DatagramSessionConfig) defaultConfig.getSessionConfig())
92 .setReuseAddress(true);
93 }
94
95 public void bind(SocketAddress address, IoHandler handler,
96 IoServiceConfig config) throws IOException {
97 if (handler == null)
98 throw new NullPointerException("handler");
99 if (config == null) {
100 config = getDefaultConfig();
101 }
102
103 if (address != null && !(address instanceof InetSocketAddress))
104 throw new IllegalArgumentException("Unexpected address type: "
105 + address.getClass());
106
107 RegistrationRequest request = new RegistrationRequest(address, handler,
108 config);
109 synchronized (this) {
110 synchronized (registerQueue) {
111 registerQueue.push(request);
112 }
113 startupWorker();
114 }
115 selector.wakeup();
116
117 synchronized (request) {
118 while (!request.done) {
119 try {
120 request.wait();
121 } catch (InterruptedException e) {
122 }
123 }
124 }
125
126 if (request.exception != null) {
127 throw (IOException) new IOException("Failed to bind")
128 .initCause(request.exception);
129 }
130 }
131
132 public void unbind(SocketAddress address) {
133 if (address == null)
134 throw new NullPointerException("address");
135
136 CancellationRequest request = new CancellationRequest(address);
137 synchronized (this) {
138 try {
139 startupWorker();
140 } catch (IOException e) {
141
142
143
144
145 throw new IllegalArgumentException("Address not bound: "
146 + address);
147 }
148
149 synchronized (cancelQueue) {
150 cancelQueue.push(request);
151 }
152 }
153 selector.wakeup();
154
155 synchronized (request) {
156 while (!request.done) {
157 try {
158 request.wait();
159 } catch (InterruptedException e) {
160 }
161 }
162 }
163
164 if (request.exception != null) {
165 throw new RuntimeException("Failed to unbind", request.exception);
166 }
167 }
168
169 public void unbindAll() {
170 List addresses;
171 synchronized (channels) {
172 addresses = new ArrayList(channels.keySet());
173 }
174
175 for (Iterator i = addresses.iterator(); i.hasNext();) {
176 unbind((SocketAddress) i.next());
177 }
178 }
179
180 public IoSession newSession(SocketAddress remoteAddress,
181 SocketAddress localAddress) {
182 if (remoteAddress == null) {
183 throw new NullPointerException("remoteAddress");
184 }
185 if (localAddress == null) {
186 throw new NullPointerException("localAddress");
187 }
188
189 Selector selector = this.selector;
190 DatagramChannel ch = (DatagramChannel) channels.get(localAddress);
191 if (selector == null || ch == null) {
192 throw new IllegalArgumentException("Unknown localAddress: "
193 + localAddress);
194 }
195
196 SelectionKey key = ch.keyFor(selector);
197 if (key == null) {
198 throw new IllegalArgumentException("Unknown localAddress: "
199 + localAddress);
200 }
201
202 RegistrationRequest req = (RegistrationRequest) key.attachment();
203 IoSession session;
204 IoSessionRecycler sessionRecycler = getSessionRecycler(req);
205 synchronized (sessionRecycler) {
206 session = sessionRecycler.recycle(localAddress, remoteAddress);
207 if (session != null) {
208 return session;
209 }
210
211
212
213
214
215
216
217 DatagramSessionImpl datagramSession = new DatagramSessionImpl(
218 wrapper, this, req.config, ch, req.handler, req.address,
219 req.address);
220 datagramSession.setRemoteAddress(remoteAddress);
221 datagramSession.setSelectionKey(key);
222
223 getSessionRecycler(req).put(datagramSession);
224 session = datagramSession;
225 }
226
227 try {
228 buildFilterChain(req, session);
229 getListeners().fireSessionCreated(session);
230 } catch (Throwable t) {
231 ExceptionMonitor.getInstance().exceptionCaught(t);
232 }
233
234 return session;
235 }
236
237 private IoSessionRecycler getSessionRecycler(RegistrationRequest req) {
238 IoSessionRecycler sessionRecycler;
239 if (req.config instanceof DatagramServiceConfig) {
240 sessionRecycler = ((DatagramServiceConfig) req.config)
241 .getSessionRecycler();
242 } else {
243 sessionRecycler = defaultConfig.getSessionRecycler();
244 }
245 return sessionRecycler;
246 }
247
248 public IoServiceListenerSupport getListeners() {
249 return super.getListeners();
250 }
251
252 private void buildFilterChain(RegistrationRequest req, IoSession session)
253 throws Exception {
254 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
255 req.config.getFilterChainBuilder().buildFilterChain(
256 session.getFilterChain());
257 req.config.getThreadModel().buildFilterChain(session.getFilterChain());
258 }
259
260 public IoServiceConfig getDefaultConfig() {
261 return defaultConfig;
262 }
263
264
265
266
267
268
269
270 public void setDefaultConfig(DatagramAcceptorConfig defaultConfig) {
271 if (defaultConfig == null) {
272 throw new NullPointerException("defaultConfig");
273 }
274 this.defaultConfig = defaultConfig;
275 }
276
277 private synchronized void startupWorker() throws IOException {
278 if (worker == null) {
279 selector = Selector.open();
280 worker = new Worker();
281 executor.execute(new NamePreservingRunnable(worker));
282 }
283 }
284
285 public void flushSession(DatagramSessionImpl session) {
286 scheduleFlush(session);
287 Selector selector = this.selector;
288 if (selector != null) {
289 selector.wakeup();
290 }
291 }
292
/**
 * Called to close the given session. This implementation is empty: no
 * state of the acceptor is touched here.
 * NOTE(review): presumably any cleanup happens in the session itself or
 * its recycler; confirm against the callers.
 *
 * @param session the session being closed
 */
public void closeSession(DatagramSessionImpl session) {
}
295
296 private void scheduleFlush(DatagramSessionImpl session) {
297 synchronized (flushingSessions) {
298 flushingSessions.push(session);
299 }
300 }
301
302 private class Worker implements Runnable {
303 public void run() {
304 Thread.currentThread().setName("DatagramAcceptor-" + id);
305
306 for (;;) {
307 try {
308 int nKeys = selector.select();
309
310 registerNew();
311
312 if (nKeys > 0) {
313 processReadySessions(selector.selectedKeys());
314 }
315
316 flushSessions();
317 cancelKeys();
318
319 if (selector.keys().isEmpty()) {
320 synchronized (DatagramAcceptorDelegate.this) {
321 if (selector.keys().isEmpty()
322 && registerQueue.isEmpty()
323 && cancelQueue.isEmpty()) {
324 worker = null;
325 try {
326 selector.close();
327 } catch (IOException e) {
328 ExceptionMonitor.getInstance()
329 .exceptionCaught(e);
330 } finally {
331 selector = null;
332 }
333 break;
334 }
335 }
336 }
337 } catch (IOException e) {
338 ExceptionMonitor.getInstance().exceptionCaught(e);
339
340 try {
341 Thread.sleep(1000);
342 } catch (InterruptedException e1) {
343 }
344 }
345 }
346 }
347 }
348
349 private void processReadySessions(Set keys) {
350 Iterator it = keys.iterator();
351 while (it.hasNext()) {
352 SelectionKey key = (SelectionKey) it.next();
353 it.remove();
354
355 DatagramChannel ch = (DatagramChannel) key.channel();
356
357 RegistrationRequest req = (RegistrationRequest) key.attachment();
358 try {
359 if (key.isReadable()) {
360 readSession(ch, req);
361 }
362
363 if (key.isWritable()) {
364 for (Iterator i = getManagedSessions(req.address)
365 .iterator(); i.hasNext();) {
366 scheduleFlush((DatagramSessionImpl) i.next());
367 }
368 }
369 } catch (Throwable t) {
370 ExceptionMonitor.getInstance().exceptionCaught(t);
371 }
372 }
373 }
374
375 private void readSession(DatagramChannel channel, RegistrationRequest req)
376 throws Exception {
377 ByteBuffer readBuf = ByteBuffer
378 .allocate(((DatagramSessionConfig) req.config
379 .getSessionConfig()).getReceiveBufferSize());
380 try {
381 SocketAddress remoteAddress = channel.receive(readBuf.buf());
382 if (remoteAddress != null) {
383 DatagramSessionImpl session = (DatagramSessionImpl) newSession(
384 remoteAddress, req.address);
385
386 readBuf.flip();
387
388 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
389 newBuf.put(readBuf);
390 newBuf.flip();
391
392 session.increaseReadBytes(newBuf.remaining());
393 session.getFilterChain().fireMessageReceived(session, newBuf);
394 }
395 } finally {
396 readBuf.release();
397 }
398 }
399
400 private void flushSessions() {
401 if (flushingSessions.size() == 0)
402 return;
403
404 for (;;) {
405 DatagramSessionImpl session;
406
407 synchronized (flushingSessions) {
408 session = (DatagramSessionImpl) flushingSessions.pop();
409 }
410
411 if (session == null)
412 break;
413
414 try {
415 flush(session);
416 } catch (IOException e) {
417 session.getFilterChain().fireExceptionCaught(session, e);
418 }
419 }
420 }
421
422 private void flush(DatagramSessionImpl session) throws IOException {
423 DatagramChannel ch = session.getChannel();
424
425 Queue writeRequestQueue = session.getWriteRequestQueue();
426
427 WriteRequest req;
428 for (;;) {
429 synchronized (writeRequestQueue) {
430 req = (WriteRequest) writeRequestQueue.first();
431 }
432
433 if (req == null)
434 break;
435
436 ByteBuffer buf = (ByteBuffer) req.getMessage();
437 if (buf.remaining() == 0) {
438
439 synchronized (writeRequestQueue) {
440 writeRequestQueue.pop();
441 }
442
443 session.increaseWrittenMessages();
444 buf.reset();
445 ((DatagramFilterChain) session.getFilterChain())
446 .fireMessageSent(session, req);
447 continue;
448 }
449
450 SelectionKey key = session.getSelectionKey();
451 if (key == null) {
452 scheduleFlush(session);
453 break;
454 }
455 if (!key.isValid()) {
456 continue;
457 }
458
459 SocketAddress destination = req.getDestination();
460 if (destination == null) {
461 destination = session.getRemoteAddress();
462 }
463
464 int writtenBytes = ch.send(buf.buf(), destination);
465
466 if (writtenBytes == 0) {
467
468 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
469 } else if (writtenBytes > 0) {
470 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
471
472
473 synchronized (writeRequestQueue) {
474 writeRequestQueue.pop();
475 }
476
477 session.increaseWrittenBytes(writtenBytes);
478 session.increaseWrittenMessages();
479 buf.reset();
480 session.getFilterChain().fireMessageSent(session, req);
481 }
482 }
483 }
484
485 private void registerNew() {
486 if (registerQueue.isEmpty())
487 return;
488
489 for (;;) {
490 RegistrationRequest req;
491 synchronized (registerQueue) {
492 req = (RegistrationRequest) registerQueue.pop();
493 }
494
495 if (req == null)
496 break;
497
498 DatagramChannel ch = null;
499 try {
500 ch = DatagramChannel.open();
501 DatagramSessionConfig cfg;
502 if (req.config.getSessionConfig() instanceof DatagramSessionConfig) {
503 cfg = (DatagramSessionConfig) req.config.getSessionConfig();
504 } else {
505 cfg = (DatagramSessionConfig) getDefaultConfig()
506 .getSessionConfig();
507 }
508
509 ch.socket().setReuseAddress(cfg.isReuseAddress());
510 ch.socket().setBroadcast(cfg.isBroadcast());
511 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
512 ch.socket().setSendBufferSize(cfg.getSendBufferSize());
513
514 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
515 ch.socket().setTrafficClass(cfg.getTrafficClass());
516 }
517
518 ch.configureBlocking(false);
519 ch.socket().bind(req.address);
520 if (req.address == null || req.address.getPort() == 0) {
521 req.address = (InetSocketAddress) ch.socket()
522 .getLocalSocketAddress();
523 }
524 ch.register(selector, SelectionKey.OP_READ, req);
525 synchronized (channels) {
526 channels.put(req.address, ch);
527 }
528
529 getListeners().fireServiceActivated(this, req.address,
530 req.handler, req.config);
531 } catch (Throwable t) {
532 req.exception = t;
533 } finally {
534 synchronized (req) {
535 req.done = true;
536 req.notify();
537 }
538
539 if (ch != null && req.exception != null) {
540 try {
541 ch.disconnect();
542 ch.close();
543 } catch (Throwable e) {
544 ExceptionMonitor.getInstance().exceptionCaught(e);
545 }
546 }
547 }
548 }
549 }
550
551 private void cancelKeys() {
552 if (cancelQueue.isEmpty())
553 return;
554
555 for (;;) {
556 CancellationRequest request;
557 synchronized (cancelQueue) {
558 request = (CancellationRequest) cancelQueue.pop();
559 }
560
561 if (request == null) {
562 break;
563 }
564
565 DatagramChannel ch;
566 synchronized (channels) {
567 ch = (DatagramChannel) channels.remove(request.address);
568 }
569
570
571 try {
572 if (ch == null) {
573 request.exception = new IllegalArgumentException(
574 "Address not bound: " + request.address);
575 } else {
576 SelectionKey key = ch.keyFor(selector);
577 request.registrationRequest = (RegistrationRequest) key
578 .attachment();
579 key.cancel();
580 selector.wakeup();
581 ch.disconnect();
582 ch.close();
583 }
584 } catch (Throwable t) {
585 ExceptionMonitor.getInstance().exceptionCaught(t);
586 } finally {
587 synchronized (request) {
588 request.done = true;
589 request.notify();
590 }
591
592 if (request.exception == null) {
593 getListeners().fireServiceDeactivated(this,
594 request.address,
595 request.registrationRequest.handler,
596 request.registrationRequest.config);
597 }
598 }
599 }
600 }
601
/**
 * Called when the traffic mask of the given session changes. This
 * implementation is empty.
 *
 * @param session the session whose traffic mask changed
 */
public void updateTrafficMask(DatagramSessionImpl session) {
    // Intentionally left empty.
    // NOTE(review): presumably traffic control is not applied to sessions
    // created by this acceptor; confirm.
}
607
608 private static class RegistrationRequest {
609 private InetSocketAddress address;
610
611 private final IoHandler handler;
612
613 private final IoServiceConfig config;
614
615 private Throwable exception;
616
617 private boolean done;
618
619 private RegistrationRequest(SocketAddress address, IoHandler handler,
620 IoServiceConfig config) {
621 this.address = (InetSocketAddress) address;
622 this.handler = handler;
623 this.config = config;
624 }
625 }
626
627 private static class CancellationRequest {
628 private final SocketAddress address;
629
630 private boolean done;
631
632 private RegistrationRequest registrationRequest;
633
634 private RuntimeException exception;
635
636 private CancellationRequest(SocketAddress address) {
637 this.address = address;
638 }
639 }
640 }