1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.DatagramChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.util.ArrayList;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Queue;
33 import java.util.Set;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentLinkedQueue;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.mina.common.ByteBuffer;
40 import org.apache.mina.common.ExceptionMonitor;
41 import org.apache.mina.common.IoAcceptor;
42 import org.apache.mina.common.IoFilter.WriteRequest;
43 import org.apache.mina.common.IoHandler;
44 import org.apache.mina.common.IoServiceConfig;
45 import org.apache.mina.common.IoSession;
46 import org.apache.mina.common.IoSessionRecycler;
47 import org.apache.mina.common.RuntimeIOException;
48 import org.apache.mina.common.support.BaseIoAcceptor;
49 import org.apache.mina.common.support.IoServiceListenerSupport;
50 import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
51 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
52 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
53 import org.apache.mina.util.NamePreservingRunnable;
54
55
56
57
58
59
60
61 public class DatagramAcceptorDelegate extends BaseIoAcceptor implements
62 IoAcceptor, DatagramService {
63 private static final AtomicInteger nextId = new AtomicInteger();
64
65 private final Object lock = new Object();
66
67 private final IoAcceptor wrapper;
68
69 private final Executor executor;
70
71 private final int id = nextId.getAndIncrement();
72
73 private volatile Selector selector;
74
75 private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();
76
77 private final Map<SocketAddress, DatagramChannel> channels = new ConcurrentHashMap<SocketAddress, DatagramChannel>();
78
79 private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
80
81 private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
82
83 private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
84
85 private Worker worker;
86
87
88
89
90 public DatagramAcceptorDelegate(IoAcceptor wrapper, Executor executor) {
91 this.wrapper = wrapper;
92 this.executor = executor;
93
94
95 defaultConfig.getSessionConfig().setReuseAddress(true);
96 }
97
98 public void bind(SocketAddress address, IoHandler handler,
99 IoServiceConfig config) throws IOException {
100 if (handler == null)
101 throw new NullPointerException("handler");
102 if (config == null) {
103 config = getDefaultConfig();
104 }
105
106 if (address != null && !(address instanceof InetSocketAddress))
107 throw new IllegalArgumentException("Unexpected address type: "
108 + address.getClass());
109
110 RegistrationRequest request = new RegistrationRequest(address, handler,
111 config);
112
113 synchronized (lock) {
114 startupWorker();
115 registerQueue.add(request);
116 selector.wakeup();
117 }
118
119 synchronized (request) {
120 while (!request.done) {
121 try {
122 request.wait();
123 } catch (InterruptedException e) {
124 throw new RuntimeIOException(e);
125 }
126 }
127 }
128
129 if (request.exception != null) {
130 throw (IOException) new IOException("Failed to bind")
131 .initCause(request.exception);
132 }
133 }
134
135 public void unbind(SocketAddress address) {
136 if (address == null)
137 throw new NullPointerException("address");
138
139 CancellationRequest request = new CancellationRequest(address);
140 synchronized (lock) {
141 try {
142 startupWorker();
143 } catch (IOException e) {
144
145
146
147
148 throw new IllegalArgumentException("Address not bound: " + address);
149 }
150
151 cancelQueue.add(request);
152 selector.wakeup();
153 }
154
155 synchronized (request) {
156 while (!request.done) {
157 try {
158 request.wait();
159 } catch (InterruptedException e) {
160 throw new RuntimeIOException(e);
161 }
162 }
163 }
164
165 if (request.exception != null) {
166 throw new RuntimeException("Failed to unbind", request.exception);
167 }
168 }
169
170 public void unbindAll() {
171 List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels
172 .keySet());
173
174 for (SocketAddress address : addresses) {
175 unbind(address);
176 }
177 }
178
179 @Override
180 public IoSession newSession(SocketAddress remoteAddress,
181 SocketAddress localAddress) {
182 if (remoteAddress == null) {
183 throw new NullPointerException("remoteAddress");
184 }
185 if (localAddress == null) {
186 throw new NullPointerException("localAddress");
187 }
188
189 Selector selector = this.selector;
190 DatagramChannel ch = channels.get(localAddress);
191 if (selector == null || ch == null) {
192 throw new IllegalArgumentException("Unknown localAddress: "
193 + localAddress);
194 }
195
196 SelectionKey key = ch.keyFor(selector);
197 if (key == null) {
198 throw new IllegalArgumentException("Unknown localAddress: "
199 + localAddress);
200 }
201
202 RegistrationRequest req = (RegistrationRequest) key.attachment();
203 IoSession session;
204 IoSessionRecycler sessionRecycler = getSessionRecycler(req);
205 synchronized (sessionRecycler) {
206 session = sessionRecycler.recycle(localAddress, remoteAddress);
207 if (session != null) {
208 return session;
209 }
210
211
212
213
214
215
216
217 DatagramSessionImpl datagramSession = new DatagramSessionImpl(
218 wrapper, this, req.config, ch, req.handler, req.address,
219 req.address);
220 datagramSession.setRemoteAddress(remoteAddress);
221 datagramSession.setSelectionKey(key);
222
223 getSessionRecycler(req).put(datagramSession);
224 session = datagramSession;
225 }
226
227 try {
228 buildFilterChain(req, session);
229 getListeners().fireSessionCreated(session);
230 } catch (Throwable t) {
231 ExceptionMonitor.getInstance().exceptionCaught(t);
232 }
233
234 return session;
235 }
236
237 private IoSessionRecycler getSessionRecycler(RegistrationRequest req) {
238 IoSessionRecycler sessionRecycler;
239 if (req.config instanceof DatagramServiceConfig) {
240 sessionRecycler = ((DatagramServiceConfig) req.config)
241 .getSessionRecycler();
242 } else {
243 sessionRecycler = defaultConfig.getSessionRecycler();
244 }
245 return sessionRecycler;
246 }
247
248 @Override
249 public IoServiceListenerSupport getListeners() {
250 return super.getListeners();
251 }
252
253 private void buildFilterChain(RegistrationRequest req, IoSession session)
254 throws Exception {
255 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
256 req.config.getFilterChainBuilder().buildFilterChain(
257 session.getFilterChain());
258 req.config.getThreadModel().buildFilterChain(session.getFilterChain());
259 }
260
261 public DatagramAcceptorConfig getDefaultConfig() {
262 return defaultConfig;
263 }
264
265
266
267
268
269
270
271 public void setDefaultConfig(DatagramAcceptorConfig defaultConfig) {
272 if (defaultConfig == null) {
273 throw new NullPointerException("defaultConfig");
274 }
275 this.defaultConfig = defaultConfig;
276 }
277
278 private void startupWorker() throws IOException {
279 synchronized (lock) {
280 if (worker == null) {
281 selector = Selector.open();
282 worker = new Worker();
283 executor.execute(new NamePreservingRunnable(worker));
284 }
285 }
286 }
287
288 public void flushSession(DatagramSessionImpl session) {
289 scheduleFlush(session);
290 Selector selector = this.selector;
291 if (selector != null) {
292 selector.wakeup();
293 }
294 }
295
296 public void closeSession(DatagramSessionImpl session) {
297 }
298
299 private void scheduleFlush(DatagramSessionImpl session) {
300 flushingSessions.add(session);
301 }
302
303 private class Worker implements Runnable {
304 public void run() {
305 Thread.currentThread().setName("DatagramAcceptor-" + id);
306
307 Selector selector = DatagramAcceptorDelegate.this.selector;
308 for (;;) {
309 try {
310 int nKeys = selector.select();
311
312 registerNew();
313
314 if (nKeys > 0) {
315 processReadySessions(selector.selectedKeys());
316 }
317
318 flushSessions();
319 cancelKeys();
320
321 if (selector.keys().isEmpty()) {
322 synchronized (lock) {
323 if (selector.keys().isEmpty()
324 && registerQueue.isEmpty()
325 && cancelQueue.isEmpty()) {
326 worker = null;
327 try {
328 selector.close();
329 } catch (IOException e) {
330 ExceptionMonitor.getInstance()
331 .exceptionCaught(e);
332 } finally {
333 DatagramAcceptorDelegate.this.selector = null;
334 }
335 break;
336 }
337 }
338 }
339 } catch (IOException e) {
340 ExceptionMonitor.getInstance().exceptionCaught(e);
341
342 try {
343 Thread.sleep(1000);
344 } catch (InterruptedException e1) {
345 ExceptionMonitor.getInstance().exceptionCaught(e1);
346 }
347 }
348 }
349 }
350 }
351
352 private void processReadySessions(Set<SelectionKey> keys) {
353 Iterator<SelectionKey> it = keys.iterator();
354 while (it.hasNext()) {
355 SelectionKey key = it.next();
356 it.remove();
357
358 DatagramChannel ch = (DatagramChannel) key.channel();
359
360 RegistrationRequest req = (RegistrationRequest) key.attachment();
361 try {
362 if (key.isReadable()) {
363 readSession(ch, req);
364 }
365
366 if (key.isWritable()) {
367 for (Object o : getManagedSessions(req.address)) {
368 scheduleFlush((DatagramSessionImpl) o);
369 }
370 }
371 } catch (Throwable t) {
372 ExceptionMonitor.getInstance().exceptionCaught(t);
373 }
374 }
375 }
376
377 private void readSession(DatagramChannel channel, RegistrationRequest req)
378 throws Exception {
379 ByteBuffer readBuf = ByteBuffer
380 .allocate(((DatagramSessionConfig) req.config
381 .getSessionConfig()).getReceiveBufferSize());
382 try {
383 SocketAddress remoteAddress = channel.receive(readBuf.buf());
384 if (remoteAddress != null) {
385 DatagramSessionImpl session = (DatagramSessionImpl) newSession(
386 remoteAddress, req.address);
387
388 readBuf.flip();
389
390 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
391 newBuf.put(readBuf);
392 newBuf.flip();
393
394 session.increaseReadBytes(newBuf.remaining());
395 session.getFilterChain().fireMessageReceived(session, newBuf);
396 }
397 } finally {
398 readBuf.release();
399 }
400 }
401
402 private void flushSessions() {
403 if (flushingSessions.size() == 0)
404 return;
405
406 for (;;) {
407 DatagramSessionImpl session = flushingSessions.poll();
408
409 if (session == null)
410 break;
411
412 try {
413 flush(session);
414 } catch (IOException e) {
415 session.getFilterChain().fireExceptionCaught(session, e);
416 }
417 }
418 }
419
420 private void flush(DatagramSessionImpl session) throws IOException {
421 DatagramChannel ch = session.getChannel();
422
423 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
424
425 for (;;) {
426 WriteRequest req = writeRequestQueue.peek();
427
428 if (req == null)
429 break;
430
431 ByteBuffer buf = (ByteBuffer) req.getMessage();
432 if (buf.remaining() == 0) {
433
434 writeRequestQueue.poll();
435
436 session.increaseWrittenMessages();
437 buf.reset();
438 session.getFilterChain().fireMessageSent(session, req);
439 continue;
440 }
441
442 SelectionKey key = session.getSelectionKey();
443 if (key == null) {
444 scheduleFlush(session);
445 break;
446 }
447 if (!key.isValid()) {
448 continue;
449 }
450
451 SocketAddress destination = req.getDestination();
452 if (destination == null) {
453 destination = session.getRemoteAddress();
454 }
455
456 int writtenBytes = ch.send(buf.buf(), destination);
457
458 if (writtenBytes == 0) {
459
460 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
461 } else if (writtenBytes > 0) {
462 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
463
464
465 writeRequestQueue.poll();
466
467 session.increaseWrittenBytes(writtenBytes);
468 session.increaseWrittenMessages();
469 buf.reset();
470 session.getFilterChain().fireMessageSent(session, req);
471 }
472 }
473 }
474
475 private void registerNew() {
476 if (registerQueue.isEmpty())
477 return;
478
479 Selector selector = this.selector;
480 for (;;) {
481 RegistrationRequest req = registerQueue.poll();
482
483 if (req == null)
484 break;
485
486 DatagramChannel ch = null;
487 try {
488 ch = DatagramChannel.open();
489 DatagramSessionConfig cfg;
490 if (req.config.getSessionConfig() instanceof DatagramSessionConfig) {
491 cfg = (DatagramSessionConfig) req.config.getSessionConfig();
492 } else {
493 cfg = getDefaultConfig().getSessionConfig();
494 }
495
496 ch.socket().setReuseAddress(cfg.isReuseAddress());
497 ch.socket().setBroadcast(cfg.isBroadcast());
498 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
499 ch.socket().setSendBufferSize(cfg.getSendBufferSize());
500
501 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
502 ch.socket().setTrafficClass(cfg.getTrafficClass());
503 }
504
505 ch.configureBlocking(false);
506 ch.socket().bind(req.address);
507 if (req.address == null || req.address.getPort() == 0) {
508 req.address = (InetSocketAddress) ch.socket()
509 .getLocalSocketAddress();
510 }
511 ch.register(selector, SelectionKey.OP_READ, req);
512 channels.put(req.address, ch);
513
514 getListeners().fireServiceActivated(this, req.address,
515 req.handler, req.config);
516 } catch (Throwable t) {
517 req.exception = t;
518 } finally {
519 synchronized (req) {
520 req.done = true;
521 req.notify();
522 }
523
524 if (ch != null && req.exception != null) {
525 try {
526 ch.disconnect();
527 ch.close();
528 } catch (Throwable e) {
529 ExceptionMonitor.getInstance().exceptionCaught(e);
530 }
531 }
532 }
533 }
534 }
535
536 private void cancelKeys() {
537 if (cancelQueue.isEmpty())
538 return;
539
540 Selector selector = this.selector;
541 for (;;) {
542 CancellationRequest request = cancelQueue.poll();
543
544 if (request == null) {
545 break;
546 }
547
548 DatagramChannel ch = channels.remove(request.address);
549
550
551 try {
552 if (ch == null) {
553 request.exception = new IllegalArgumentException(
554 "Address not bound: " + request.address);
555 } else {
556 SelectionKey key = ch.keyFor(selector);
557 request.registrationRequest = (RegistrationRequest) key
558 .attachment();
559 key.cancel();
560 selector.wakeup();
561 ch.disconnect();
562 ch.close();
563 }
564 } catch (Throwable t) {
565 ExceptionMonitor.getInstance().exceptionCaught(t);
566 } finally {
567 synchronized (request) {
568 request.done = true;
569 request.notify();
570 }
571
572 if (request.exception == null) {
573 getListeners().fireServiceDeactivated(this,
574 request.address,
575 request.registrationRequest.handler,
576 request.registrationRequest.config);
577 }
578 }
579 }
580 }
581
582 public void updateTrafficMask(DatagramSessionImpl session) {
583
584
585
586 }
587
588 private static class RegistrationRequest {
589 private InetSocketAddress address;
590
591 private final IoHandler handler;
592
593 private final IoServiceConfig config;
594
595 private Throwable exception;
596
597 private boolean done;
598
599 private RegistrationRequest(SocketAddress address, IoHandler handler,
600 IoServiceConfig config) {
601 this.address = (InetSocketAddress) address;
602 this.handler = handler;
603 this.config = config;
604 }
605 }
606
607 private static class CancellationRequest {
608 private final SocketAddress address;
609
610 private boolean done;
611
612 private RegistrationRequest registrationRequest;
613
614 private RuntimeException exception;
615
616 private CancellationRequest(SocketAddress address) {
617 this.address = address;
618 }
619 }
620 }