1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio.support;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.DatagramChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.util.Iterator;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.apache.mina.common.ByteBuffer;
36 import org.apache.mina.common.ConnectFuture;
37 import org.apache.mina.common.ExceptionMonitor;
38 import org.apache.mina.common.IoConnector;
39 import org.apache.mina.common.IoHandler;
40 import org.apache.mina.common.IoServiceConfig;
41 import org.apache.mina.common.IoSession;
42 import org.apache.mina.common.IoSessionRecycler;
43 import org.apache.mina.common.IoFilter.WriteRequest;
44 import org.apache.mina.common.support.AbstractIoFilterChain;
45 import org.apache.mina.common.support.BaseIoConnector;
46 import org.apache.mina.common.support.DefaultConnectFuture;
47 import org.apache.mina.transport.socket.nio.DatagramConnectorConfig;
48 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
49 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
50 import org.apache.mina.util.NamePreservingRunnable;
51
52
53
54
55
56
57
58 public class DatagramConnectorDelegate extends BaseIoConnector implements
59 DatagramService {
60 private static final AtomicInteger nextId = new AtomicInteger();
61
62 private final Object lock = new Object();
63
64 private final IoConnector wrapper;
65
66 private final Executor executor;
67
68 private final int id = nextId.getAndIncrement();
69
70 private volatile Selector selector;
71
72 private DatagramConnectorConfig defaultConfig = new DatagramConnectorConfig();
73
74 private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
75
76 private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
77
78 private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
79
80 private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
81
82 private Worker worker;
83
84
85
86
/**
 * Creates a new delegate. No selector or worker is created here; both are
 * created on demand by {@code startupWorker()}.
 *
 * @param wrapper  the connector passed to each session created by this delegate
 * @param executor the executor used to run the worker
 */
public DatagramConnectorDelegate(IoConnector wrapper, Executor executor) {
    this.executor = executor;
    this.wrapper = wrapper;
}
91
92 public ConnectFuture connect(SocketAddress address, IoHandler handler,
93 IoServiceConfig config) {
94 return connect(address, null, handler, config);
95 }
96
97 public ConnectFuture connect(SocketAddress address,
98 SocketAddress localAddress, IoHandler handler,
99 IoServiceConfig config) {
100 if (address == null)
101 throw new NullPointerException("address");
102 if (handler == null)
103 throw new NullPointerException("handler");
104
105 if (!(address instanceof InetSocketAddress))
106 throw new IllegalArgumentException("Unexpected address type: "
107 + address.getClass());
108
109 if (localAddress != null
110 && !(localAddress instanceof InetSocketAddress)) {
111 throw new IllegalArgumentException(
112 "Unexpected local address type: " + localAddress.getClass());
113 }
114
115 if (config == null) {
116 config = getDefaultConfig();
117 }
118
119 DatagramChannel ch = null;
120 boolean initialized = false;
121 try {
122 ch = DatagramChannel.open();
123 DatagramSessionConfig cfg;
124 if (config.getSessionConfig() instanceof DatagramSessionConfig) {
125 cfg = (DatagramSessionConfig) config.getSessionConfig();
126 } else {
127 cfg = getDefaultConfig().getSessionConfig();
128 }
129
130 ch.socket().setReuseAddress(cfg.isReuseAddress());
131 ch.socket().setBroadcast(cfg.isBroadcast());
132 ch.socket().setReceiveBufferSize(cfg.getReceiveBufferSize());
133 ch.socket().setSendBufferSize(cfg.getSendBufferSize());
134
135 if (ch.socket().getTrafficClass() != cfg.getTrafficClass()) {
136 ch.socket().setTrafficClass(cfg.getTrafficClass());
137 }
138
139 if (localAddress != null) {
140 ch.socket().bind(localAddress);
141 }
142 ch.connect(address);
143 ch.configureBlocking(false);
144 initialized = true;
145 } catch (IOException e) {
146 return DefaultConnectFuture.newFailedFuture(e);
147 } finally {
148 if (!initialized && ch != null) {
149 try {
150 ch.disconnect();
151 ch.close();
152 } catch (IOException e) {
153 ExceptionMonitor.getInstance().exceptionCaught(e);
154 }
155 }
156 }
157
158 RegistrationRequest request = new RegistrationRequest(ch, handler,
159 config);
160 synchronized (lock) {
161 try {
162 startupWorker();
163 } catch (IOException e) {
164 try {
165 ch.disconnect();
166 ch.close();
167 } catch (IOException e2) {
168 ExceptionMonitor.getInstance().exceptionCaught(e2);
169 }
170
171 return DefaultConnectFuture.newFailedFuture(e);
172 }
173
174 registerQueue.add(request);
175
176 selector.wakeup();
177 }
178 return request;
179 }
180
181 public DatagramConnectorConfig getDefaultConfig() {
182 return defaultConfig;
183 }
184
185
186
187
188
189
190
191 public void setDefaultConfig(DatagramConnectorConfig defaultConfig) {
192 if (defaultConfig == null) {
193 throw new NullPointerException("defaultConfig");
194 }
195 this.defaultConfig = defaultConfig;
196 }
197
198 private void startupWorker() throws IOException {
199 synchronized (lock) {
200 if (worker == null) {
201 selector = Selector.open();
202 worker = new Worker();
203 executor.execute(new NamePreservingRunnable(worker));
204 }
205 }
206 }
207
208 public void closeSession(DatagramSessionImpl session) {
209 synchronized (lock) {
210 try {
211 startupWorker();
212 } catch (IOException e) {
213
214
215
216
217
218 return;
219 }
220
221 cancelQueue.add(session);
222
223 selector.wakeup();
224 }
225 }
226
227 public void flushSession(DatagramSessionImpl session) {
228 scheduleFlush(session);
229 Selector selector = this.selector;
230 if (selector != null) {
231 selector.wakeup();
232 }
233 }
234
235 private void scheduleFlush(DatagramSessionImpl session) {
236 flushingSessions.add(session);
237 }
238
239 public void updateTrafficMask(DatagramSessionImpl session) {
240 scheduleTrafficControl(session);
241 Selector selector = this.selector;
242 if (selector != null) {
243 selector.wakeup();
244 }
245 }
246
247 private void scheduleTrafficControl(DatagramSessionImpl session) {
248 trafficControllingSessions.add(session);
249 }
250
251 private void doUpdateTrafficMask() {
252 if (trafficControllingSessions.isEmpty())
253 return;
254
255 for (;;) {
256 DatagramSessionImpl session = trafficControllingSessions.poll();
257
258 if (session == null)
259 break;
260
261 SelectionKey key = session.getSelectionKey();
262
263
264
265 if (key == null) {
266 scheduleTrafficControl(session);
267 break;
268 }
269
270 if (!key.isValid()) {
271 continue;
272 }
273
274
275
276 int ops = SelectionKey.OP_READ;
277 if (!session.getWriteRequestQueue().isEmpty()) {
278 ops |= SelectionKey.OP_WRITE;
279 }
280
281
282 int mask = session.getTrafficMask().getInterestOps();
283 key.interestOps(ops & mask);
284 }
285 }
286
287 private class Worker implements Runnable {
288 public void run() {
289 Thread.currentThread().setName("DatagramConnector-" + id);
290
291 Selector selector = DatagramConnectorDelegate.this.selector;
292 for (;;) {
293 try {
294 int nKeys = selector.select();
295
296 registerNew();
297 doUpdateTrafficMask();
298
299 if (nKeys > 0) {
300 processReadySessions(selector.selectedKeys());
301 }
302
303 flushSessions();
304 cancelKeys();
305
306 if (selector.keys().isEmpty()) {
307 synchronized (lock) {
308 if (selector.keys().isEmpty()
309 && registerQueue.isEmpty()
310 && cancelQueue.isEmpty()) {
311 worker = null;
312 try {
313 selector.close();
314 } catch (IOException e) {
315 ExceptionMonitor.getInstance()
316 .exceptionCaught(e);
317 } finally {
318 DatagramConnectorDelegate.this.selector = null;
319 }
320 break;
321 }
322 }
323 }
324 } catch (IOException e) {
325 ExceptionMonitor.getInstance().exceptionCaught(e);
326
327 try {
328 Thread.sleep(1000);
329 } catch (InterruptedException e1) {
330 ExceptionMonitor.getInstance().exceptionCaught(e1);
331 }
332 }
333 }
334 }
335 }
336
337 private void processReadySessions(Set<SelectionKey> keys) {
338 Iterator<SelectionKey> it = keys.iterator();
339 while (it.hasNext()) {
340 SelectionKey key = it.next();
341 it.remove();
342
343 DatagramSessionImpl session = (DatagramSessionImpl) key
344 .attachment();
345
346
347 getSessionRecycler(session).recycle(session.getLocalAddress(),
348 session.getRemoteAddress());
349
350 if (key.isReadable() && session.getTrafficMask().isReadable()) {
351 readSession(session);
352 }
353
354 if (key.isWritable() && session.getTrafficMask().isWritable()) {
355 scheduleFlush(session);
356 }
357 }
358 }
359
360 private IoSessionRecycler getSessionRecycler(IoSession session) {
361 IoServiceConfig config = session.getServiceConfig();
362 IoSessionRecycler sessionRecycler;
363 if (config instanceof DatagramServiceConfig) {
364 sessionRecycler = ((DatagramServiceConfig) config)
365 .getSessionRecycler();
366 } else {
367 sessionRecycler = defaultConfig.getSessionRecycler();
368 }
369 return sessionRecycler;
370 }
371
372 private void readSession(DatagramSessionImpl session) {
373
374 ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
375 try {
376 int readBytes = session.getChannel().read(readBuf.buf());
377 if (readBytes > 0) {
378 readBuf.flip();
379 ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
380 newBuf.put(readBuf);
381 newBuf.flip();
382
383 session.increaseReadBytes(readBytes);
384 session.getFilterChain().fireMessageReceived(session, newBuf);
385 }
386 } catch (IOException e) {
387 session.getFilterChain().fireExceptionCaught(session, e);
388 } finally {
389 readBuf.release();
390 }
391 }
392
393 private void flushSessions() {
394 if (flushingSessions.size() == 0)
395 return;
396
397 for (;;) {
398 DatagramSessionImpl session = flushingSessions.poll();
399
400 if (session == null)
401 break;
402
403 try {
404 flush(session);
405 } catch (IOException e) {
406 session.getFilterChain().fireExceptionCaught(session, e);
407 }
408 }
409 }
410
411 private void flush(DatagramSessionImpl session) throws IOException {
412 DatagramChannel ch = session.getChannel();
413
414 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
415
416 for (;;) {
417 WriteRequest req = writeRequestQueue.peek();
418
419 if (req == null)
420 break;
421
422 ByteBuffer buf = (ByteBuffer) req.getMessage();
423 if (buf.remaining() == 0) {
424
425 writeRequestQueue.poll();
426
427 session.increaseWrittenMessages();
428 buf.reset();
429 session.getFilterChain().fireMessageSent(session, req);
430 continue;
431 }
432
433 SelectionKey key = session.getSelectionKey();
434 if (key == null) {
435 scheduleFlush(session);
436 break;
437 }
438 if (!key.isValid()) {
439 continue;
440 }
441
442 int writtenBytes = ch.write(buf.buf());
443
444 if (writtenBytes == 0) {
445
446 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
447 } else if (writtenBytes > 0) {
448 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
449
450
451 writeRequestQueue.poll();
452
453 session.increaseWrittenBytes(writtenBytes);
454 session.increaseWrittenMessages();
455 buf.reset();
456 session.getFilterChain().fireMessageSent(session, req);
457 }
458 }
459 }
460
461 private void registerNew() {
462 if (registerQueue.isEmpty())
463 return;
464
465 Selector selector = this.selector;
466 for (;;) {
467 RegistrationRequest req = registerQueue.poll();
468
469 if (req == null)
470 break;
471
472 DatagramSessionImpl session = new DatagramSessionImpl(wrapper,
473 this, req.config, req.channel, req.handler, req.channel
474 .socket().getRemoteSocketAddress(), req.channel
475 .socket().getLocalSocketAddress());
476
477
478 session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, req);
479
480 boolean success = false;
481 try {
482 SelectionKey key = req.channel.register(selector,
483 SelectionKey.OP_READ, session);
484
485 session.setSelectionKey(key);
486 buildFilterChain(req, session);
487 getSessionRecycler(session).put(session);
488
489
490 getListeners().fireSessionCreated(session);
491 success = true;
492 } catch (Throwable t) {
493
494 session.getFilterChain().fireExceptionCaught(session, t);
495 } finally {
496 if (!success) {
497 try {
498 req.channel.disconnect();
499 req.channel.close();
500 } catch (IOException e) {
501 ExceptionMonitor.getInstance().exceptionCaught(e);
502 }
503 }
504 }
505 }
506 }
507
508 private void buildFilterChain(RegistrationRequest req, IoSession session)
509 throws Exception {
510 getFilterChainBuilder().buildFilterChain(session.getFilterChain());
511 req.config.getFilterChainBuilder().buildFilterChain(
512 session.getFilterChain());
513 req.config.getThreadModel().buildFilterChain(session.getFilterChain());
514 }
515
516 private void cancelKeys() {
517 if (cancelQueue.isEmpty())
518 return;
519
520 Selector selector = this.selector;
521 for (;;) {
522 DatagramSessionImpl session = cancelQueue.poll();
523
524 if (session == null)
525 break;
526 else {
527 SelectionKey key = session.getSelectionKey();
528 DatagramChannel ch = (DatagramChannel) key.channel();
529 try {
530 ch.disconnect();
531 ch.close();
532 } catch (IOException e) {
533 ExceptionMonitor.getInstance().exceptionCaught(e);
534 }
535
536 getListeners().fireSessionDestroyed(session);
537 session.getCloseFuture().setClosed();
538 key.cancel();
539 selector.wakeup();
540 }
541 }
542 }
543
544 private static class RegistrationRequest extends DefaultConnectFuture {
545 private final DatagramChannel channel;
546
547 private final IoHandler handler;
548
549 private final IoServiceConfig config;
550
551 private RegistrationRequest(DatagramChannel channel, IoHandler handler,
552 IoServiceConfig config) {
553 this.channel = channel;
554 this.handler = handler;
555 this.config = config;
556 }
557 }
558 }