1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.Inet4Address;
23 import java.net.Inet6Address;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Queue;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentLinkedQueue;
36 import java.util.concurrent.Executor;
37
38 import org.apache.mina.core.RuntimeIoException;
39 import org.apache.mina.core.buffer.IoBuffer;
40 import org.apache.mina.core.service.AbstractIoAcceptor;
41 import org.apache.mina.core.service.IoAcceptor;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.session.AbstractIoSession;
44 import org.apache.mina.core.session.ExpiringSessionRecycler;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.IoSessionRecycler;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.util.ExceptionMonitor;
51
52
53
54
55
56
57
58
59
60 public abstract class AbstractPollingConnectionlessIoAcceptor<S extends AbstractIoSession, H>
61 extends AbstractIoAcceptor {
62
63 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
64
65
66
67
68
69 private static final long SELECT_TIMEOUT = 1000L;
70
71 private final Object lock = new Object();
72
73 private final IoProcessor<S> processor = new ConnectionlessAcceptorProcessor();
74 private final Queue<AcceptorOperationFuture> registerQueue =
75 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
76 private final Queue<AcceptorOperationFuture> cancelQueue =
77 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
78
79 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
80 private final Map<String, H> boundHandles =
81 Collections.synchronizedMap(new HashMap<String, H>());
82
83 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
84
85 private final ServiceOperationFuture disposalFuture =
86 new ServiceOperationFuture();
87 private volatile boolean selectable;
88
89
90 private Acceptor acceptor;
91
92 private long lastIdleCheckTime;
93
94 private String getAddressAsString(SocketAddress address) {
95 InetAddress inetAddress = ((InetSocketAddress)address).getAddress();
96 int port = ((InetSocketAddress)address).getPort();
97
98 String result = null;
99
100 if ( inetAddress instanceof Inet4Address ) {
101 result = "/" + inetAddress.getHostAddress() + ":" + port;
102 } else {
103
104 if ( ((Inet6Address)inetAddress).isIPv4CompatibleAddress() ) {
105 byte[] bytes = inetAddress.getAddress();
106
107 result = "/" + bytes[12] + "." + bytes[13] + "." + bytes[14] + "." + bytes[15] + ":" + port;
108 } else {
109 result = inetAddress.toString();
110 }
111 }
112
113 return result;
114 }
115
116
117
118
119 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
120 this(sessionConfig, null);
121 }
122
123
124
125
126 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
127 super(sessionConfig, executor);
128
129 try {
130 init();
131 selectable = true;
132 } catch (RuntimeException e) {
133 throw e;
134 } catch (Exception e) {
135 throw new RuntimeIoException("Failed to initialize.", e);
136 } finally {
137 if (!selectable) {
138 try {
139 destroy();
140 } catch (Exception e) {
141 ExceptionMonitor.getInstance().exceptionCaught(e);
142 }
143 }
144 }
145 }
146
147 protected abstract void init() throws Exception;
148 protected abstract void destroy() throws Exception;
149 protected abstract int select() throws Exception;
150 protected abstract int select(long timeout) throws Exception;
151 protected abstract void wakeup();
152 protected abstract Iterator<H> selectedHandles();
153 protected abstract H open(SocketAddress localAddress) throws Exception;
154 protected abstract void close(H handle) throws Exception;
155 protected abstract SocketAddress localAddress(H handle) throws Exception;
156 protected abstract boolean isReadable(H handle);
157 protected abstract boolean isWritable(H handle);
158 protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
159
160 protected abstract int send(S session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
161
162 protected abstract S newSession(IoProcessor<S> processor, H handle, SocketAddress remoteAddress) throws Exception;
163
164 protected abstract void setInterestedInWrite(S session, boolean interested) throws Exception;
165
166
167
168
169 @Override
170 protected void dispose0() throws Exception {
171 unbind();
172 startupAcceptor();
173 wakeup();
174 }
175
176
177
178
179 @Override
180 protected final Set<SocketAddress> bindInternal(
181 List<? extends SocketAddress> localAddresses) throws Exception {
182
183
184 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
185
186
187
188 registerQueue.add(request);
189
190
191
192 startupAcceptor();
193
194
195
196
197 wakeup();
198
199
200 request.awaitUninterruptibly();
201
202 if (request.getException() != null) {
203 throw request.getException();
204 }
205
206
207
208
209 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
210
211 for (H handle : boundHandles.values()) {
212 newLocalAddresses.add(localAddress(handle));
213 }
214
215 return newLocalAddresses;
216 }
217
218
219
220
221 @Override
222 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
223 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
224
225 cancelQueue.add(request);
226 startupAcceptor();
227 wakeup();
228
229 request.awaitUninterruptibly();
230
231 if (request.getException() != null) {
232 throw request.getException();
233 }
234 }
235
236
237
238
239 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
240 if (isDisposing()) {
241 throw new IllegalStateException("Already disposed.");
242 }
243
244 if (remoteAddress == null) {
245 throw new IllegalArgumentException("remoteAddress");
246 }
247
248 synchronized (bindLock) {
249 if (!isActive()) {
250 throw new IllegalStateException(
251 "Can't create a session from a unbound service.");
252 }
253
254 try {
255 return newSessionWithoutLock(remoteAddress, localAddress);
256 } catch (RuntimeException e) {
257 throw e;
258 } catch (Error e) {
259 throw e;
260 } catch (Exception e) {
261 throw new RuntimeIoException("Failed to create a session.", e);
262 }
263 }
264 }
265
266 private IoSession newSessionWithoutLock(
267 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
268 H handle = boundHandles.get(getAddressAsString(localAddress));
269
270 if (handle == null) {
271 throw new IllegalArgumentException("Unknown local address: " + localAddress);
272 }
273
274 IoSession session;
275 IoSessionRecycler sessionRecycler = getSessionRecycler();
276
277 synchronized (sessionRecycler) {
278 session = sessionRecycler.recycle(localAddress, remoteAddress);
279
280 if (session != null) {
281 return session;
282 }
283
284
285 S newSession = newSession(processor, handle, remoteAddress);
286 getSessionRecycler().put(newSession);
287 session = newSession;
288 }
289
290 initSession(session, null, null);
291
292 try {
293 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
294 getListeners().fireSessionCreated(session);
295 } catch (Throwable t) {
296 ExceptionMonitor.getInstance().exceptionCaught(t);
297 }
298
299 return session;
300 }
301
302 public final IoSessionRecycler getSessionRecycler() {
303 return sessionRecycler;
304 }
305
306 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
307 synchronized (bindLock) {
308 if (isActive()) {
309 throw new IllegalStateException(
310 "sessionRecycler can't be set while the acceptor is bound.");
311 }
312
313 if (sessionRecycler == null) {
314 sessionRecycler = DEFAULT_RECYCLER;
315 }
316
317 this.sessionRecycler = sessionRecycler;
318 }
319 }
320
321 private class ConnectionlessAcceptorProcessor implements IoProcessor<S> {
322
323 public void add(S session) {
324 }
325
326 public void flush(S session) {
327 if (scheduleFlush(session)) {
328 wakeup();
329 }
330 }
331
332 public void remove(S session) {
333 getSessionRecycler().remove(session);
334 getListeners().fireSessionDestroyed(session);
335 }
336
337 public void updateTrafficControl(S session) {
338 throw new UnsupportedOperationException();
339 }
340
341 public void dispose() {
342 }
343
344 public boolean isDisposed() {
345 return false;
346 }
347
348 public boolean isDisposing() {
349 return false;
350 }
351 }
352
353
354
355
356 private void startupAcceptor() {
357 if (!selectable) {
358 registerQueue.clear();
359 cancelQueue.clear();
360 flushingSessions.clear();
361 }
362
363 synchronized (lock) {
364 if (acceptor == null) {
365 acceptor = new Acceptor();
366 executeWorker(acceptor);
367 }
368 }
369 }
370
371 private boolean scheduleFlush(S session) {
372
373
374
375 if (session.setScheduledForFlush(true)) {
376 flushingSessions.add(session);
377 return true;
378 } else {
379 return false;
380 }
381 }
382
383
384
385
386
387
388 private class Acceptor implements Runnable {
389 public void run() {
390 int nHandles = 0;
391 lastIdleCheckTime = System.currentTimeMillis();
392
393 while (selectable) {
394 try {
395 int selected = select(SELECT_TIMEOUT);
396
397 nHandles += registerHandles();
398
399 if (selected > 0) {
400 processReadySessions(selectedHandles());
401 }
402
403 long currentTime = System.currentTimeMillis();
404 flushSessions(currentTime);
405 nHandles -= unregisterHandles();
406
407 notifyIdleSessions(currentTime);
408
409 if (nHandles == 0) {
410 synchronized (lock) {
411 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
412 acceptor = null;
413 break;
414 }
415 }
416 }
417 } catch (Exception e) {
418 ExceptionMonitor.getInstance().exceptionCaught(e);
419
420 try {
421 Thread.sleep(1000);
422 } catch (InterruptedException e1) {
423 }
424 }
425 }
426
427 if (selectable && isDisposing()) {
428 selectable = false;
429 try {
430 destroy();
431 } catch (Exception e) {
432 ExceptionMonitor.getInstance().exceptionCaught(e);
433 } finally {
434 disposalFuture.setValue(true);
435 }
436 }
437 }
438 }
439
440 @SuppressWarnings("unchecked")
441 private void processReadySessions(Iterator<H> handles) {
442 while (handles.hasNext()) {
443 H h = handles.next();
444 handles.remove();
445
446 try {
447 if (isReadable(h)) {
448 readHandle(h);
449 }
450
451 if (isWritable(h)) {
452 for (IoSession session : getManagedSessions().values()) {
453 scheduleFlush((S) session);
454 }
455 }
456 } catch (Throwable t) {
457 ExceptionMonitor.getInstance().exceptionCaught(t);
458 }
459 }
460 }
461
462 private void readHandle(H handle) throws Exception {
463 IoBuffer readBuf = IoBuffer.allocate(
464 getSessionConfig().getReadBufferSize());
465
466 SocketAddress remoteAddress = receive(handle, readBuf);
467
468 if (remoteAddress != null) {
469 IoSession session = newSessionWithoutLock(
470 remoteAddress, localAddress(handle));
471
472 readBuf.flip();
473
474 IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
475 newBuf.put(readBuf);
476 newBuf.flip();
477
478 session.getFilterChain().fireMessageReceived(newBuf);
479 }
480 }
481
482 private void flushSessions(long currentTime) {
483 for (;;) {
484 S session = flushingSessions.poll();
485
486 if (session == null) {
487 break;
488 }
489
490
491
492 session.unscheduledForFlush();
493
494 try {
495 boolean flushedAll = flush(session, currentTime);
496 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
497 !session.isScheduledForFlush()) {
498 scheduleFlush(session);
499 }
500 } catch (Exception e) {
501 session.getFilterChain().fireExceptionCaught(e);
502 }
503 }
504 }
505
506 private boolean flush(S session, long currentTime) throws Exception {
507
508 setInterestedInWrite(session, false);
509
510 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
511 final int maxWrittenBytes =
512 session.getConfig().getMaxReadBufferSize() +
513 (session.getConfig().getMaxReadBufferSize() >>> 1);
514
515 int writtenBytes = 0;
516
517 try {
518 for (;;) {
519 WriteRequest req = session.getCurrentWriteRequest();
520
521 if (req == null) {
522 req = writeRequestQueue.poll(session);
523 if (req == null) {
524 break;
525 }
526 session.setCurrentWriteRequest(req);
527 }
528
529 IoBuffer buf = (IoBuffer) req.getMessage();
530
531 if (buf.remaining() == 0) {
532
533 session.setCurrentWriteRequest(null);
534 buf.reset();
535 session.getFilterChain().fireMessageSent(req);
536 continue;
537 }
538
539 SocketAddress destination = req.getDestination();
540
541 if (destination == null) {
542 destination = session.getRemoteAddress();
543 }
544
545 int localWrittenBytes = send(session, buf, destination);
546
547 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
548
549 setInterestedInWrite(session, true);
550 return false;
551 } else {
552 setInterestedInWrite(session, false);
553
554
555 session.setCurrentWriteRequest(null);
556 writtenBytes += localWrittenBytes;
557 buf.reset();
558 session.getFilterChain().fireMessageSent(req);
559 }
560 }
561 } finally {
562 session.increaseWrittenBytes(writtenBytes, currentTime);
563 }
564
565 return true;
566 }
567
568 private int registerHandles() {
569 for (;;) {
570 AcceptorOperationFuture req = registerQueue.poll();
571
572 if (req == null) {
573 break;
574 }
575
576 Map<String, H> newHandles = new HashMap<String, H>();
577 List<SocketAddress> localAddresses = req.getLocalAddresses();
578
579 try {
580 for (SocketAddress socketAddress : localAddresses) {
581 H handle = open(socketAddress);
582 newHandles.put(getAddressAsString(localAddress(handle)), handle);
583 }
584
585 boundHandles.putAll(newHandles);
586
587 getListeners().fireServiceActivated();
588 req.setDone();
589
590 return newHandles.size();
591 } catch (Exception e) {
592 req.setException(e);
593 } finally {
594
595 if (req.getException() != null) {
596 for (H handle : newHandles.values()) {
597 try {
598 close(handle);
599 } catch (Exception e) {
600 ExceptionMonitor.getInstance().exceptionCaught(e);
601 }
602 }
603
604 wakeup();
605 }
606 }
607 }
608
609 return 0;
610 }
611
612 private int unregisterHandles() {
613 int nHandles = 0;
614
615 for (;;) {
616 AcceptorOperationFuture request = cancelQueue.poll();
617 if (request == null) {
618 break;
619 }
620
621
622 for (SocketAddress socketAddress : request.getLocalAddresses()) {
623 H handle = boundHandles.remove(getAddressAsString(socketAddress));
624
625 if (handle == null) {
626 continue;
627 }
628
629 try {
630 close(handle);
631 wakeup();
632 } catch (Throwable e) {
633 ExceptionMonitor.getInstance().exceptionCaught(e);
634 } finally {
635 nHandles++;
636 }
637 }
638
639 request.setDone();
640 }
641
642 return nHandles;
643 }
644
645 private void notifyIdleSessions(long currentTime) {
646
647 if (currentTime - lastIdleCheckTime >= 1000) {
648 lastIdleCheckTime = currentTime;
649 AbstractIoSession.notifyIdleness(
650 getListeners().getManagedSessions().values().iterator(),
651 currentTime);
652 }
653 }
654 }