1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedSelectorException;
24 import java.nio.channels.SelectionKey;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Semaphore;
36
37 import org.apache.mina.core.RuntimeIoException;
38 import org.apache.mina.core.buffer.IoBuffer;
39 import org.apache.mina.core.service.AbstractIoAcceptor;
40 import org.apache.mina.core.service.IoAcceptor;
41 import org.apache.mina.core.service.IoProcessor;
42 import org.apache.mina.core.session.AbstractIoSession;
43 import org.apache.mina.core.session.ExpiringSessionRecycler;
44 import org.apache.mina.core.session.IoSession;
45 import org.apache.mina.core.session.IoSessionConfig;
46 import org.apache.mina.core.session.IoSessionRecycler;
47 import org.apache.mina.core.write.WriteRequest;
48 import org.apache.mina.core.write.WriteRequestQueue;
49 import org.apache.mina.util.ExceptionMonitor;
50
51
52
53
54
55
56
57
58
59 public abstract class AbstractPollingConnectionlessIoAcceptor<S extends AbstractIoSession, H> extends
60 AbstractIoAcceptor implements IoProcessor<S> {
61
62 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
63
64
65
66
67
68 private static final long SELECT_TIMEOUT = 1000L;
69
70
71 private final Semaphore lock = new Semaphore(1);
72
73 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
74
75 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
76
77 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
78
79 private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
80
81 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
82
83 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
84
85 private volatile boolean selectable;
86
87
88 private Acceptor acceptor;
89
90 private long lastIdleCheckTime;
91
92
93
94
/**
 * Creates an acceptor without an explicit executor: delegates to the
 * two-argument constructor with a {@code null} executor.
 *
 * @param sessionConfig the session configuration passed on to the superclass
 */
protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
    this(sessionConfig, null);
}
98
99
100
101
/**
 * Creates an acceptor and runs {@link #init()}.
 * <p>
 * The {@code selectable} flag is raised only when {@code init()} returns
 * normally. On any failure {@link #destroy()} is invoked before the failure
 * propagates; problems raised by {@code destroy()} itself are reported to the
 * {@link ExceptionMonitor} instead of masking the original failure.
 *
 * @param sessionConfig the session configuration passed on to the superclass
 * @param executor the executor passed on to the superclass (may be {@code null})
 * @throws RuntimeIoException if {@code init()} throws a checked exception
 */
protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
    super(sessionConfig, executor);

    try {
        init();
        selectable = true;
    } catch (RuntimeException unchecked) {
        // Unchecked failures are passed through untouched.
        throw unchecked;
    } catch (Exception checked) {
        throw new RuntimeIoException("Failed to initialize.", checked);
    } finally {
        // The flag is still false here exactly when init() did not complete.
        if (!selectable) {
            try {
                destroy();
            } catch (Exception cleanupFailure) {
                ExceptionMonitor.getInstance().exceptionCaught(cleanupFailure);
            }
        }
    }
}
122
123 protected abstract void init() throws Exception;
124
125 protected abstract void destroy() throws Exception;
126
127 protected abstract int select() throws Exception;
128
129 protected abstract int select(long timeout) throws Exception;
130
131 protected abstract void wakeup();
132
133 protected abstract Set<SelectionKey> selectedHandles();
134
135 protected abstract H open(SocketAddress localAddress) throws Exception;
136
137 protected abstract void close(H handle) throws Exception;
138
139 protected abstract SocketAddress localAddress(H handle) throws Exception;
140
141 protected abstract boolean isReadable(H handle);
142
143 protected abstract boolean isWritable(H handle);
144
145 protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
146
147 protected abstract int send(S session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
148
149 protected abstract S newSession(IoProcessor<S> processor, H handle, SocketAddress remoteAddress) throws Exception;
150
151 protected abstract void setInterestedInWrite(S session, boolean interested) throws Exception;
152
153
154
155
156 @Override
157 protected void dispose0() throws Exception {
158 unbind();
159 startupAcceptor();
160 wakeup();
161 }
162
163
164
165
166 @Override
167 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
168
169
170 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
171
172
173
174 registerQueue.add(request);
175
176
177
178 startupAcceptor();
179
180
181
182
183 try {
184 lock.acquire();
185
186
187 Thread.sleep(10);
188 wakeup();
189 } finally {
190 lock.release();
191 }
192
193
194 request.awaitUninterruptibly();
195
196 if (request.getException() != null) {
197 throw request.getException();
198 }
199
200
201
202
203 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
204
205 for (H handle : boundHandles.values()) {
206 newLocalAddresses.add(localAddress(handle));
207 }
208
209 return newLocalAddresses;
210 }
211
212
213
214
215 @Override
216 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
217 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
218
219 cancelQueue.add(request);
220 startupAcceptor();
221 wakeup();
222
223 request.awaitUninterruptibly();
224
225 if (request.getException() != null) {
226 throw request.getException();
227 }
228 }
229
230
231
232
233 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
234 if (isDisposing()) {
235 throw new IllegalStateException("Already disposed.");
236 }
237
238 if (remoteAddress == null) {
239 throw new IllegalArgumentException("remoteAddress");
240 }
241
242 synchronized (bindLock) {
243 if (!isActive()) {
244 throw new IllegalStateException("Can't create a session from a unbound service.");
245 }
246
247 try {
248 return newSessionWithoutLock(remoteAddress, localAddress);
249 } catch (RuntimeException e) {
250 throw e;
251 } catch (Error e) {
252 throw e;
253 } catch (Exception e) {
254 throw new RuntimeIoException("Failed to create a session.", e);
255 }
256 }
257 }
258
259 private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
260 H handle = boundHandles.get(localAddress);
261
262 if (handle == null) {
263 throw new IllegalArgumentException("Unknown local address: " + localAddress);
264 }
265
266 IoSession session;
267
268 synchronized (sessionRecycler) {
269 session = sessionRecycler.recycle(remoteAddress);
270
271 if (session != null) {
272 return session;
273 }
274
275
276 S newSession = newSession(this, handle, remoteAddress);
277 getSessionRecycler().put(newSession);
278 session = newSession;
279 }
280
281 initSession(session, null, null);
282
283 try {
284 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
285 getListeners().fireSessionCreated(session);
286 } catch (Throwable t) {
287 ExceptionMonitor.getInstance().exceptionCaught(t);
288 }
289
290 return session;
291 }
292
293 public final IoSessionRecycler getSessionRecycler() {
294 return sessionRecycler;
295 }
296
297 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
298 synchronized (bindLock) {
299 if (isActive()) {
300 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
301 }
302
303 if (sessionRecycler == null) {
304 sessionRecycler = DEFAULT_RECYCLER;
305 }
306
307 this.sessionRecycler = sessionRecycler;
308 }
309 }
310
311
312
313
314 public void add(S session) {
315
316 }
317
318
319
320
321 public void flush(S session) {
322 if (scheduleFlush(session)) {
323 wakeup();
324 }
325 }
326
327
328
329
330 public void write(S session, WriteRequest writeRequest) {
331
332 long currentTime = System.currentTimeMillis();
333 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
334 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
335 + (session.getConfig().getMaxReadBufferSize() >>> 1);
336
337 int writtenBytes = 0;
338
339 try {
340 for (;;) {
341 if (writeRequest == null) {
342 writeRequest = writeRequestQueue.poll(session);
343
344 if (writeRequest == null) {
345 setInterestedInWrite(session, false);
346 break;
347 }
348
349 session.setCurrentWriteRequest(writeRequest);
350 }
351
352 IoBuffer buf = (IoBuffer) writeRequest.getMessage();
353
354 if (buf.remaining() == 0) {
355
356 session.setCurrentWriteRequest(null);
357 buf.reset();
358 session.getFilterChain().fireMessageSent(writeRequest);
359 continue;
360 }
361
362 SocketAddress destination = writeRequest.getDestination();
363
364 if (destination == null) {
365 destination = session.getRemoteAddress();
366 }
367
368 int localWrittenBytes = send(session, buf, destination);
369
370 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
371
372 setInterestedInWrite(session, true);
373
374 session.getWriteRequestQueue().offer(session, writeRequest);
375 scheduleFlush(session);
376 } else {
377 setInterestedInWrite(session, false);
378
379
380 session.setCurrentWriteRequest(null);
381 writtenBytes += localWrittenBytes;
382 buf.reset();
383 session.getFilterChain().fireMessageSent(writeRequest);
384
385 break;
386 }
387 }
388 } catch (Exception e) {
389 session.getFilterChain().fireExceptionCaught(e);
390 } finally {
391 session.increaseWrittenBytes(writtenBytes, currentTime);
392 }
393 }
394
395
396
397
398 public void remove(S session) {
399 getSessionRecycler().remove(session);
400 getListeners().fireSessionDestroyed(session);
401 }
402
403
404
405
406 public void updateTrafficControl(S session) {
407 throw new UnsupportedOperationException();
408 }
409
410
411
412
413 private void startupAcceptor() throws InterruptedException {
414 if (!selectable) {
415 registerQueue.clear();
416 cancelQueue.clear();
417 flushingSessions.clear();
418 }
419
420 lock.acquire();
421
422 if (acceptor == null) {
423 acceptor = new Acceptor();
424 executeWorker(acceptor);
425 } else {
426 lock.release();
427 }
428 }
429
430 private boolean scheduleFlush(S session) {
431
432
433
434 if (session.setScheduledForFlush(true)) {
435 flushingSessions.add(session);
436 return true;
437 } else {
438 return false;
439 }
440 }
441
442
443
444
445
446
447 private class Acceptor implements Runnable {
448 public void run() {
449 int nHandles = 0;
450 lastIdleCheckTime = System.currentTimeMillis();
451
452
453 lock.release();
454
455 while (selectable) {
456 try {
457 int selected = select(SELECT_TIMEOUT);
458
459 nHandles += registerHandles();
460
461 if (nHandles == 0) {
462 try {
463 lock.acquire();
464
465 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
466 acceptor = null;
467 break;
468 }
469 } finally {
470 lock.release();
471 }
472 }
473
474 if (selected > 0) {
475 processReadySessions(selectedHandles());
476 }
477
478 long currentTime = System.currentTimeMillis();
479 flushSessions(currentTime);
480 nHandles -= unregisterHandles();
481
482 notifyIdleSessions(currentTime);
483 } catch (ClosedSelectorException cse) {
484
485 break;
486 } catch (Exception e) {
487 ExceptionMonitor.getInstance().exceptionCaught(e);
488
489 try {
490 Thread.sleep(1000);
491 } catch (InterruptedException e1) {
492 }
493 }
494 }
495
496 if (selectable && isDisposing()) {
497 selectable = false;
498 try {
499 destroy();
500 } catch (Exception e) {
501 ExceptionMonitor.getInstance().exceptionCaught(e);
502 } finally {
503 disposalFuture.setValue(true);
504 }
505 }
506 }
507 }
508
509 @SuppressWarnings("unchecked")
510 private void processReadySessions(Set<SelectionKey> handles) {
511 Iterator<SelectionKey> iterator = handles.iterator();
512
513 while (iterator.hasNext()) {
514 SelectionKey key = iterator.next();
515 H handle = (H) key.channel();
516 iterator.remove();
517
518 try {
519 if ((key != null) && key.isValid() && key.isReadable()) {
520 readHandle(handle);
521 }
522
523 if ((key != null) && key.isValid() && key.isWritable()) {
524 for (IoSession session : getManagedSessions().values()) {
525 scheduleFlush((S) session);
526 }
527 }
528 } catch (Throwable t) {
529 ExceptionMonitor.getInstance().exceptionCaught(t);
530 }
531 }
532 }
533
534 private void readHandle(H handle) throws Exception {
535 IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
536
537 SocketAddress remoteAddress = receive(handle, readBuf);
538
539 if (remoteAddress != null) {
540 IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
541
542 readBuf.flip();
543
544 session.getFilterChain().fireMessageReceived(readBuf);
545 }
546 }
547
548 private void flushSessions(long currentTime) {
549 for (;;) {
550 S session = flushingSessions.poll();
551
552 if (session == null) {
553 break;
554 }
555
556
557
558 session.unscheduledForFlush();
559
560 try {
561 boolean flushedAll = flush(session, currentTime);
562 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
563 scheduleFlush(session);
564 }
565 } catch (Exception e) {
566 session.getFilterChain().fireExceptionCaught(e);
567 }
568 }
569 }
570
571 private boolean flush(S session, long currentTime) throws Exception {
572 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
573 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
574 + (session.getConfig().getMaxReadBufferSize() >>> 1);
575
576 int writtenBytes = 0;
577
578 try {
579 for (;;) {
580 WriteRequest req = session.getCurrentWriteRequest();
581
582 if (req == null) {
583 req = writeRequestQueue.poll(session);
584
585 if (req == null) {
586 setInterestedInWrite(session, false);
587 break;
588 }
589
590 session.setCurrentWriteRequest(req);
591 }
592
593 IoBuffer buf = (IoBuffer) req.getMessage();
594
595 if (buf.remaining() == 0) {
596
597 session.setCurrentWriteRequest(null);
598 buf.reset();
599 session.getFilterChain().fireMessageSent(req);
600 continue;
601 }
602
603 SocketAddress destination = req.getDestination();
604
605 if (destination == null) {
606 destination = session.getRemoteAddress();
607 }
608
609 int localWrittenBytes = send(session, buf, destination);
610
611 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
612
613 setInterestedInWrite(session, true);
614
615 return false;
616 } else {
617 setInterestedInWrite(session, false);
618
619
620 session.setCurrentWriteRequest(null);
621 writtenBytes += localWrittenBytes;
622 buf.reset();
623 session.getFilterChain().fireMessageSent(req);
624 }
625 }
626 } finally {
627 session.increaseWrittenBytes(writtenBytes, currentTime);
628 }
629
630 return true;
631 }
632
633 private int registerHandles() {
634 for (;;) {
635 AcceptorOperationFuture req = registerQueue.poll();
636
637 if (req == null) {
638 break;
639 }
640
641 Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
642 List<SocketAddress> localAddresses = req.getLocalAddresses();
643
644 try {
645 for (SocketAddress socketAddress : localAddresses) {
646 H handle = open(socketAddress);
647 newHandles.put(localAddress(handle), handle);
648 }
649
650 boundHandles.putAll(newHandles);
651
652 getListeners().fireServiceActivated();
653 req.setDone();
654
655 return newHandles.size();
656 } catch (Exception e) {
657 req.setException(e);
658 } finally {
659
660 if (req.getException() != null) {
661 for (H handle : newHandles.values()) {
662 try {
663 close(handle);
664 } catch (Exception e) {
665 ExceptionMonitor.getInstance().exceptionCaught(e);
666 }
667 }
668
669 wakeup();
670 }
671 }
672 }
673
674 return 0;
675 }
676
677 private int unregisterHandles() {
678 int nHandles = 0;
679
680 for (;;) {
681 AcceptorOperationFuture request = cancelQueue.poll();
682 if (request == null) {
683 break;
684 }
685
686
687 for (SocketAddress socketAddress : request.getLocalAddresses()) {
688 H handle = boundHandles.remove(socketAddress);
689
690 if (handle == null) {
691 continue;
692 }
693
694 try {
695 close(handle);
696 wakeup();
697 } catch (Throwable e) {
698 ExceptionMonitor.getInstance().exceptionCaught(e);
699 } finally {
700 nHandles++;
701 }
702 }
703
704 request.setDone();
705 }
706
707 return nHandles;
708 }
709
710 private void notifyIdleSessions(long currentTime) {
711
712 if (currentTime - lastIdleCheckTime >= 1000) {
713 lastIdleCheckTime = currentTime;
714 AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
715 }
716 }
717 }