1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedSelectorException;
24 import java.nio.channels.spi.SelectorProvider;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Semaphore;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.apache.mina.core.RuntimeIoException;
41 import org.apache.mina.core.filterchain.IoFilter;
42 import org.apache.mina.core.service.AbstractIoAcceptor;
43 import org.apache.mina.core.service.IoAcceptor;
44 import org.apache.mina.core.service.IoHandler;
45 import org.apache.mina.core.service.IoProcessor;
46 import org.apache.mina.core.service.SimpleIoProcessorPool;
47 import org.apache.mina.core.session.AbstractIoSession;
48 import org.apache.mina.core.session.IoSession;
49 import org.apache.mina.core.session.IoSessionConfig;
50 import org.apache.mina.transport.socket.SocketSessionConfig;
51 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
52 import org.apache.mina.util.ExceptionMonitor;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
71
/**
 * Single-permit semaphore shared by the threads that start an {@link Acceptor} and the
 * worker itself: {@code startupAcceptor()} acquires it before creating a worker, the
 * worker releases it as the first step of {@code run()}, and {@code bindInternal()}
 * acquires/releases it around its wakeup call.
 */
private final Semaphore lock = new Semaphore(1);

/** Processor handed to {@code accept(...)} for every incoming connection. */
private final IoProcessor<S> processor;

/**
 * {@code true} when the processor was built by one of this class's constructors (rather
 * than supplied by the caller); only then is it disposed together with the acceptor.
 */
private final boolean createdProcessor;

/** Pending bind requests: filled by {@code bindInternal()}, drained by {@code registerHandles()}. */
private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();

/** Pending unbind requests: filled by {@code unbind0()}, drained by {@code unregisterHandles()}. */
private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();

/** Handles that are currently bound, keyed by the address reported by {@code localAddress(handle)}. */
private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());

/** Marked done by the {@link Acceptor} once the disposal sequence at the end of {@code run()} has finished. */
private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();

/**
 * Set to {@code true} by the constructor once {@code init(...)} succeeded and back to
 * {@code false} by the {@link Acceptor} when the service is disposed. The worker loop
 * only runs while this flag is {@code true}.
 */
private volatile boolean selectable;

/** The {@link Acceptor} that currently owns the accept loop, or {@code null} when none is running. */
private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();

/**
 * Value exposed through {@code isReuseAddress()} / {@code setReuseAddress(boolean)}.
 * It is not read anywhere else in this class; presumably subclasses consult it when
 * opening a handle.
 */
protected boolean reuseAddress = false;

/**
 * Value exposed through {@code getBacklog()} / {@code setBacklog(int)}; defaults to 50.
 * It is not read anywhere else in this class; presumably subclasses use it as the
 * listen backlog when opening a handle.
 */
protected int backlog = 50;
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/**
 * Creates an acceptor that builds its own {@link SimpleIoProcessorPool} from the given
 * processor class. No executor and no selector provider are supplied ({@code null} is
 * passed for both). Because the pool is created here, it is disposed together with
 * this acceptor.
 *
 * @param sessionConfig  the session configuration forwarded to the superclass
 * @param processorClass the {@link IoProcessor} implementation the pool is built from
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
    this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
}
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
 * Creates an acceptor that builds its own {@link SimpleIoProcessorPool} from the given
 * processor class and processor count. No executor and no selector provider are
 * supplied ({@code null} is passed for both). Because the pool is created here, it is
 * disposed together with this acceptor.
 *
 * @param sessionConfig  the session configuration forwarded to the superclass
 * @param processorClass the {@link IoProcessor} implementation the pool is built from
 * @param processorCount the count handed to the {@link SimpleIoProcessorPool} constructor
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
        int processorCount) {
    this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
}
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
 * Creates an acceptor that builds its own {@link SimpleIoProcessorPool} from the given
 * processor class, processor count and selector provider. The same selector provider
 * is later passed to {@code init(SelectorProvider)}. No executor is supplied. Because
 * the pool is created here, it is disposed together with this acceptor.
 *
 * @param sessionConfig    the session configuration forwarded to the superclass
 * @param processorClass   the {@link IoProcessor} implementation the pool is built from
 * @param processorCount   the count handed to the {@link SimpleIoProcessorPool} constructor
 * @param selectorProvider the provider given to both the pool and {@code init(SelectorProvider)}
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
        int processorCount, SelectorProvider selectorProvider ) {
    this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
}
155
156
157
158
159
160
161
162
163
164
165
166
167
/**
 * Creates an acceptor around a caller-supplied processor. No executor and no selector
 * provider are supplied ({@code null} is passed for both). The processor is treated as
 * externally owned: this acceptor does not dispose it.
 *
 * @param sessionConfig the session configuration forwarded to the superclass
 * @param processor     the processor that will handle accepted sessions; must not be {@code null}
 * @throws IllegalArgumentException if {@code processor} is {@code null}
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
    this(sessionConfig, null, processor, false, null);
}
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/**
 * Creates an acceptor around a caller-supplied executor and processor. No selector
 * provider is supplied ({@code null} is passed). The processor is treated as externally
 * owned: this acceptor does not dispose it.
 *
 * @param sessionConfig the session configuration forwarded to the superclass
 * @param executor      the executor forwarded to the superclass constructor
 * @param processor     the processor that will handle accepted sessions; must not be {@code null}
 * @throws IllegalArgumentException if {@code processor} is {@code null}
 */
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
    this(sessionConfig, executor, processor, false, null);
}
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
/**
 * Common constructor every other constructor delegates to. Stores the processor and
 * its ownership flag, then runs {@code init(selectorProvider)}. The acceptor becomes
 * selectable only when initialisation completes normally; on any failure
 * {@code destroy()} is invoked before the failure propagates.
 *
 * @param sessionConfig    the session configuration forwarded to the superclass
 * @param executor         the executor forwarded to the superclass (may be {@code null})
 * @param processor        the processor for accepted sessions; must not be {@code null}
 * @param createdProcessor whether the processor was created by this class and must be
 *                         disposed along with it
 * @param selectorProvider the provider passed to {@code init(SelectorProvider)} (may be {@code null})
 * @throws IllegalArgumentException if {@code processor} is {@code null}
 * @throws RuntimeIoException       if initialisation fails with a checked exception
 */
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
        boolean createdProcessor, SelectorProvider selectorProvider) {
    super(sessionConfig, executor);

    if (processor == null) {
        throw new IllegalArgumentException("processor");
    }

    this.createdProcessor = createdProcessor;
    this.processor = processor;

    try {
        init(selectorProvider);

        // Reached only when init() returned normally.
        selectable = true;
    } catch (RuntimeException unchecked) {
        // Unchecked failures are passed through untouched.
        throw unchecked;
    } catch (Exception checked) {
        throw new RuntimeIoException("Failed to initialize.", checked);
    } finally {
        if (!selectable) {
            destroyAfterFailedInit();
        }
    }
}

/**
 * Calls {@code destroy()} after a failed initialisation and reports, rather than
 * propagates, any exception it raises so the original failure is not masked.
 */
private void destroyAfterFailedInit() {
    try {
        destroy();
    } catch (Exception cleanupFailure) {
        ExceptionMonitor.getInstance().exceptionCaught(cleanupFailure);
    }
}
249
250
251
252
253
/**
 * Initialisation hook without a selector provider. Nothing in this class calls it;
 * the constructor uses {@link #init(SelectorProvider)} instead.
 *
 * @throws Exception if initialisation fails
 */
protected abstract void init() throws Exception;
255
256
257
258
259
/**
 * Initialisation hook called once from the constructor, before the acceptor is marked
 * selectable. If it throws, the constructor calls {@link #destroy()} and the failure
 * propagates (checked exceptions wrapped in a {@code RuntimeIoException}).
 *
 * @param selectorProvider the provider given to the constructor; {@code null} for every
 *                         constructor that does not take one
 * @throws Exception if initialisation fails
 */
protected abstract void init(SelectorProvider selectorProvider) throws Exception;
261
262
263
264
265
266
/**
 * Cleanup hook. It is called from the constructor when initialisation did not
 * complete, and from the acceptor worker while holding the disposal lock when the
 * service is being disposed. Exceptions thrown here are reported to the
 * {@code ExceptionMonitor} by both callers.
 *
 * @throws Exception if the cleanup fails
 */
protected abstract void destroy() throws Exception;
268
269
270
271
272
273
274
/**
 * Polling hook called once per iteration of the acceptor worker loop. A return value
 * greater than zero makes the worker process {@link #selectedHandles()}.
 * NOTE(review): presumably blocks until a handle is ready or {@link #wakeup()} is
 * called - confirm against the implementation.
 *
 * @return the number of handles ready to be accepted
 * @throws Exception if polling fails; a {@code ClosedSelectorException} terminates the
 *                   worker loop, any other exception is reported and followed by a
 *                   one-second pause
 */
protected abstract int select() throws Exception;
276
277
278
279
/**
 * Wakeup hook called after bind/unbind requests are queued, after a handle is closed,
 * and after a failed registration. NOTE(review): presumably interrupts a blocked
 * {@link #select()} so the worker can service its queues - confirm against the
 * implementation.
 */
protected abstract void wakeup();
281
282
283
284
285
286
/**
 * Returns the handles to accept from after {@link #select()} reported a positive
 * count. The worker calls {@code remove()} on the iterator for every element, so the
 * iterator must support removal.
 *
 * @return an iterator over the handles that are ready
 */
protected abstract Iterator<H> selectedHandles();
288
289
290
291
292
293
294
/**
 * Opens a handle for one requested address. Called by the acceptor worker for every
 * address of a bind request; if it throws, the handles already opened for the same
 * request are closed and the request fails with that exception.
 *
 * @param localAddress the address taken from the bind request
 * @return the handle that was opened
 * @throws Exception if the handle cannot be opened
 */
protected abstract H open(SocketAddress localAddress) throws Exception;
296
297
298
299
300
301
302
/**
 * Returns the address associated with a handle. The result is used as the key under
 * which the handle is stored once bound, and it is what {@code bindInternal} reports
 * back to its caller.
 *
 * @param handle the handle to inspect
 * @return the local address of {@code handle}
 * @throws Exception if the address cannot be determined
 */
protected abstract SocketAddress localAddress(H handle) throws Exception;
304
305
306
307
308
309
310
311
312
/**
 * Accepts a connection on a ready handle and wraps it in a session. The worker skips
 * the handle when this returns {@code null}; otherwise it initialises the session and
 * adds it to the session's processor.
 *
 * @param processor the processor held by this acceptor
 * @param handle    the ready handle
 * @return the new session, or {@code null} when there is nothing to accept
 * @throws Exception if accepting fails
 */
protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
314
315
316
317
318
319
/**
 * Closes a handle. Called when a handle is unbound and when a bind request fails after
 * some of its handles were already opened. Exceptions are reported to the
 * {@code ExceptionMonitor} by the callers in this class.
 *
 * @param handle the handle to close
 * @throws Exception if closing fails
 */
protected abstract void close(H handle) throws Exception;
321
322
323
324
/**
 * Disposal entry point: unbinds, then makes sure an acceptor worker exists and wakes
 * it up. The actual cleanup ({@code destroy()}, processor disposal, completion of the
 * disposal future) is performed by the worker at the end of {@code Acceptor.run()}.
 *
 * @throws Exception if unbinding or starting the worker fails
 */
@Override
protected void dispose0() throws Exception {
    unbind();

    // A worker must be running for the disposal sequence to be executed.
    startupAcceptor();
    wakeup();
}
332
333
334
335
336 @Override
337 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
338
339
340 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
341
342
343
344 registerQueue.add(request);
345
346
347
348 startupAcceptor();
349
350
351
352
353 try {
354 lock.acquire();
355
356
357 Thread.sleep(10);
358 wakeup();
359 } finally {
360 lock.release();
361 }
362
363
364 request.awaitUninterruptibly();
365
366 if (request.getException() != null) {
367 throw request.getException();
368 }
369
370
371
372
373 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
374
375 for (H handle : boundHandles.values()) {
376 newLocalAddresses.add(localAddress(handle));
377 }
378
379 return newLocalAddresses;
380 }
381
382
383
384
385
386
387
388
389
390 private void startupAcceptor() throws InterruptedException {
391
392
393 if (!selectable) {
394 registerQueue.clear();
395 cancelQueue.clear();
396 }
397
398
399 Acceptor acceptor = acceptorRef.get();
400
401 if (acceptor == null) {
402 lock.acquire();
403 acceptor = new Acceptor();
404
405 if (acceptorRef.compareAndSet(null, acceptor)) {
406 executeWorker(acceptor);
407 } else {
408 lock.release();
409 }
410 }
411 }
412
413
414
415
416 @Override
417 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
418 AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
419
420 cancelQueue.add(future);
421 startupAcceptor();
422 wakeup();
423
424 future.awaitUninterruptibly();
425 if (future.getException() != null) {
426 throw future.getException();
427 }
428 }
429
430
431
432
433
434
435
/**
 * Worker that owns the accept loop. On each iteration it polls via {@code select()},
 * services queued bind requests, accepts connections on ready handles and services
 * queued unbind requests. It stops when no handle is bound and no request is pending,
 * when the selector is closed, or when the acceptor is no longer selectable. When the
 * service is being disposed it also performs the final cleanup.
 */
private class Acceptor implements Runnable {
    /**
     * Runs the accept loop until one of the exit conditions described on the class is
     * met, then performs the disposal sequence if the service is disposing.
     */
    public void run() {
        assert (acceptorRef.get() == this);

        // Number of handles this worker considers bound.
        int nHandles = 0;

        // Releases the permit acquired by startupAcceptor() before this worker was submitted.
        lock.release();

        while (selectable) {
            try {
                // Poll for ready handles; the count is only used after the queues were serviced.
                int selected = select();

                // Service queued bind requests and account for the handles they opened.
                nHandles += registerHandles();

                // Nothing is bound: try to retire this worker.
                if (nHandles == 0) {
                    // Give up ownership first so a concurrent startupAcceptor() can install a new worker.
                    acceptorRef.set(null);

                    if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                        // No pending work: exit.
                        assert (acceptorRef.get() != this);
                        break;
                    }

                    // Work arrived in the meantime: reclaim ownership unless another worker took over.
                    if (!acceptorRef.compareAndSet(null, this)) {
                        assert (acceptorRef.get() != this);
                        break;
                    }

                    assert (acceptorRef.get() == this);
                }

                if (selected > 0) {
                    // Accept a connection on every handle reported ready.
                    processHandles(selectedHandles());
                }

                // Service queued unbind requests and account for the handles they closed.
                nHandles -= unregisterHandles();
            } catch (ClosedSelectorException cse) {
                // A closed selector cannot be polled again: report and leave the loop.
                ExceptionMonitor.getInstance().exceptionCaught(cse);
                break;
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);

                // Pause for one second before the next iteration after a failure.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                }
            }
        }

        // Disposal sequence: runs only when the loop ended while the service is being disposed.
        if (selectable && isDisposing()) {
            selectable = false;
            try {
                // Only a processor created by this acceptor is disposed here.
                if (createdProcessor) {
                    processor.dispose();
                }
            } finally {
                try {
                    synchronized (disposalLock) {
                        if (isDisposing()) {
                            destroy();
                        }
                    }
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                } finally {
                    // Signalled even when processor disposal or destroy() failed.
                    disposalFuture.setDone();
                }
            }
        }
    }

    /**
     * Accepts one connection per ready handle. Each handle is removed from the
     * iterator before it is processed; handles for which {@code accept(...)} returns
     * {@code null} are skipped. Every accepted session is initialised and added to
     * its processor.
     *
     * @param handles iterator over the ready handles; must support {@code remove()}
     * @throws Exception if accepting or initialising a session fails; remaining
     *                   handles are then left unprocessed by this call
     */
    @SuppressWarnings("unchecked")
    private void processHandles(Iterator<H> handles) throws Exception {
        while (handles.hasNext()) {
            H handle = handles.next();
            handles.remove();

            // Let the subclass turn the ready handle into a session.
            S session = accept(processor, handle);

            if (session == null) {
                continue;
            }

            initSession(session, null, null);

            // Hand the session over to its processor.
            session.getProcessor().add(session);
        }
    }
}
553
554
555
556
557
558
559
560
561
562
563 private int registerHandles() {
564 for (;;) {
565
566
567 AcceptorOperationFuture future = registerQueue.poll();
568
569 if (future == null) {
570 return 0;
571 }
572
573
574
575
576 Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
577 List<SocketAddress> localAddresses = future.getLocalAddresses();
578
579 try {
580
581 for (SocketAddress a : localAddresses) {
582 H handle = open(a);
583 newHandles.put(localAddress(handle), handle);
584 }
585
586
587
588 boundHandles.putAll(newHandles);
589
590
591 future.setDone();
592 return newHandles.size();
593 } catch (Exception e) {
594
595 future.setException(e);
596 } finally {
597
598 if (future.getException() != null) {
599 for (H handle : newHandles.values()) {
600 try {
601 close(handle);
602 } catch (Exception e) {
603 ExceptionMonitor.getInstance().exceptionCaught(e);
604 }
605 }
606
607
608 wakeup();
609 }
610 }
611 }
612 }
613
614
615
616
617
618
619
620 private int unregisterHandles() {
621 int cancelledHandles = 0;
622 for (;;) {
623 AcceptorOperationFuture future = cancelQueue.poll();
624 if (future == null) {
625 break;
626 }
627
628
629 for (SocketAddress a : future.getLocalAddresses()) {
630 H handle = boundHandles.remove(a);
631
632 if (handle == null) {
633 continue;
634 }
635
636 try {
637 close(handle);
638 wakeup();
639 } catch (Exception e) {
640 ExceptionMonitor.getInstance().exceptionCaught(e);
641 } finally {
642 cancelledHandles++;
643 }
644 }
645
646 future.setDone();
647 }
648
649 return cancelledHandles;
650 }
651
652
653
654
/**
 * Not supported by this acceptor.
 *
 * @param remoteAddress ignored
 * @param localAddress  ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
    throw new UnsupportedOperationException();
}
658
659
660
661
/**
 * Returns the configured backlog value.
 *
 * @return the current value of the {@code backlog} field (50 unless changed)
 */
public int getBacklog() {
    return backlog;
}
665
666
667
668
669
670
671
/**
 * Sets the backlog value. The change is made while holding the bind lock and is
 * rejected when the acceptor is active.
 *
 * @param backlog the new backlog value
 * @throws IllegalStateException if the acceptor is currently active
 */
public void setBacklog(int backlog) {
    synchronized (bindLock) {
        // Refuse to change the setting once the acceptor is active.
        if (isActive()) {
            throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
        }

        this.backlog = backlog;
    }
}
681
682
683
684
/**
 * Returns the configured reuse-address flag.
 *
 * @return the current value of the {@code reuseAddress} field ({@code false} unless changed)
 */
public boolean isReuseAddress() {
    return reuseAddress;
}
688
689
690
691
692
693
694
695 public void setReuseAddress(boolean reuseAddress) {
696 synchronized (bindLock) {
697 if (isActive()) {
698 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
699 }
700
701 this.reuseAddress = reuseAddress;
702 }
703 }
704
705
706
707
/**
 * Returns the session configuration narrowed to {@link SocketSessionConfig}.
 *
 * @return the {@code sessionConfig} field cast to {@link SocketSessionConfig}
 * @throws ClassCastException if the stored configuration is not a {@link SocketSessionConfig}
 */
public SocketSessionConfig getSessionConfig() {
    return (SocketSessionConfig)sessionConfig;
}
711 }