1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedSelectorException;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Semaphore;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.mina.core.RuntimeIoException;
40 import org.apache.mina.core.filterchain.IoFilter;
41 import org.apache.mina.core.service.AbstractIoAcceptor;
42 import org.apache.mina.core.service.IoAcceptor;
43 import org.apache.mina.core.service.IoHandler;
44 import org.apache.mina.core.service.IoProcessor;
45 import org.apache.mina.core.service.SimpleIoProcessorPool;
46 import org.apache.mina.core.session.AbstractIoSession;
47 import org.apache.mina.core.session.IoSession;
48 import org.apache.mina.core.session.IoSessionConfig;
49 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
50 import org.apache.mina.util.ExceptionMonitor;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
69 extends AbstractIoAcceptor {
70
71 private final Semaphore lock = new Semaphore(1);
72
73 private final IoProcessor<S> processor;
74
75 private final boolean createdProcessor;
76
77 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
78
79 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
80
81 private final Map<SocketAddress, H> boundHandles = Collections
82 .synchronizedMap(new HashMap<SocketAddress, H>());
83
84 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
85
86
87 private volatile boolean selectable;
88
89
90 private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
91
92 protected boolean reuseAddress = false;
93
94
95
96
97
98 protected int backlog = 50;
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
114 Class<? extends IoProcessor<S>> processorClass) {
115 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass),
116 true);
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
134 Class<? extends IoProcessor<S>> processorClass, int processorCount) {
135 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass,
136 processorCount), true);
137 }
138
139
140
141
142
143
144
145
146
147
148
149
150
151 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
152 IoProcessor<S> processor) {
153 this(sessionConfig, null, processor, false);
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
173 Executor executor, IoProcessor<S> processor) {
174 this(sessionConfig, executor, processor, false);
175 }
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
197 Executor executor, IoProcessor<S> processor,
198 boolean createdProcessor) {
199 super(sessionConfig, executor);
200
201 if (processor == null) {
202 throw new IllegalArgumentException("processor");
203 }
204
205 this.processor = processor;
206 this.createdProcessor = createdProcessor;
207
208 try {
209
210 init();
211
212
213
214 selectable = true;
215 } catch (RuntimeException e) {
216 throw e;
217 } catch (Exception e) {
218 throw new RuntimeIoException("Failed to initialize.", e);
219 } finally {
220 if (!selectable) {
221 try {
222 destroy();
223 } catch (Exception e) {
224 ExceptionMonitor.getInstance().exceptionCaught(e);
225 }
226 }
227 }
228 }
229
230
231
232
233
234 protected abstract void init() throws Exception;
235
236
237
238
239
240
241 protected abstract void destroy() throws Exception;
242
243
244
245
246
247
248
249 protected abstract int select() throws Exception;
250
251
252
253
254 protected abstract void wakeup();
255
256
257
258
259
260
261 protected abstract Iterator<H> selectedHandles();
262
263
264
265
266
267
268
269 protected abstract H open(SocketAddress localAddress) throws Exception;
270
271
272
273
274
275
276
277 protected abstract SocketAddress localAddress(H handle) throws Exception;
278
279
280
281
282
283
284
285
286
287 protected abstract S accept(IoProcessor<S> processor, H handle)
288 throws Exception;
289
290
291
292
293
294
295 protected abstract void close(H handle) throws Exception;
296
297
298
299
300 @Override
301 protected void dispose0() throws Exception {
302 unbind();
303
304 startupAcceptor();
305 wakeup();
306 }
307
308
309
310
311 @Override
312 protected final Set<SocketAddress> bindInternal(
313 List<? extends SocketAddress> localAddresses) throws Exception {
314
315
316 AcceptorOperationFuture request = new AcceptorOperationFuture(
317 localAddresses);
318
319
320
321 registerQueue.add(request);
322
323
324
325 startupAcceptor();
326
327
328
329
330 try {
331 lock.acquire();
332
333
334 Thread.sleep( 10 );
335 wakeup();
336 }
337 finally
338 {
339 lock.release();
340 }
341
342
343 request.awaitUninterruptibly();
344
345 if (request.getException() != null) {
346 throw request.getException();
347 }
348
349
350
351
352 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
353
354 for (H handle:boundHandles.values()) {
355 newLocalAddresses.add(localAddress(handle));
356 }
357
358 return newLocalAddresses;
359 }
360
361
362
363
364
365
366
367
368
369 private void startupAcceptor() throws InterruptedException {
370
371
372 if (!selectable) {
373 registerQueue.clear();
374 cancelQueue.clear();
375 }
376
377
378 Acceptor acceptor = acceptorRef.get();
379
380 if (acceptor == null) {
381 lock.acquire();
382 acceptor = new Acceptor();
383
384 if (acceptorRef.compareAndSet(null, acceptor)) {
385 executeWorker(acceptor);
386 } else {
387 lock.release();
388 }
389 }
390 }
391
392
393
394
395 @Override
396 protected final void unbind0(List<? extends SocketAddress> localAddresses)
397 throws Exception {
398 AcceptorOperationFuture future = new AcceptorOperationFuture(
399 localAddresses);
400
401 cancelQueue.add(future);
402 startupAcceptor();
403 wakeup();
404
405 future.awaitUninterruptibly();
406 if (future.getException() != null) {
407 throw future.getException();
408 }
409 }
410
411
412
413
414
415
416
417 private class Acceptor implements Runnable {
418 public void run() {
419 assert (acceptorRef.get() == this);
420
421 int nHandles = 0;
422
423
424 lock.release();
425
426 while (selectable) {
427 try {
428
429
430
431
432 int selected = select();
433
434
435
436
437 nHandles += registerHandles();
438
439
440
441
442 if (nHandles == 0) {
443 acceptorRef.set(null);
444
445 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
446 assert (acceptorRef.get() != this);
447 break;
448 }
449
450 if (!acceptorRef.compareAndSet(null, this)) {
451 assert (acceptorRef.get() != this);
452 break;
453 }
454
455 assert (acceptorRef.get() == this);
456 }
457
458 if (selected > 0) {
459
460
461 processHandles(selectedHandles());
462 }
463
464
465 nHandles -= unregisterHandles();
466 } catch (ClosedSelectorException cse) {
467
468 break;
469 } catch (Throwable e) {
470 ExceptionMonitor.getInstance().exceptionCaught(e);
471
472 try {
473 Thread.sleep(1000);
474 } catch (InterruptedException e1) {
475 ExceptionMonitor.getInstance().exceptionCaught(e1);
476 }
477 }
478 }
479
480
481 if (selectable && isDisposing()) {
482 selectable = false;
483 try {
484 if (createdProcessor) {
485 processor.dispose();
486 }
487 } finally {
488 try {
489 synchronized (disposalLock) {
490 if (isDisposing()) {
491 destroy();
492 }
493 }
494 } catch (Exception e) {
495 ExceptionMonitor.getInstance().exceptionCaught(e);
496 } finally {
497 disposalFuture.setDone();
498 }
499 }
500 }
501 }
502
503
504
505
506
507
508
509
510
511
512 @SuppressWarnings("unchecked")
513 private void processHandles(Iterator<H> handles) throws Exception {
514 while (handles.hasNext()) {
515 H handle = handles.next();
516 handles.remove();
517
518
519
520 S session = accept(processor, handle);
521
522 if (session == null) {
523 continue;
524 }
525
526 initSession(session, null, null);
527
528
529 session.getProcessor().add(session);
530 }
531 }
532 }
533
534
535
536
537
538
539
540
541
542
543 private int registerHandles() {
544 for (;;) {
545
546
547 AcceptorOperationFuture future = registerQueue.poll();
548
549 if (future == null) {
550 return 0;
551 }
552
553
554
555
556 Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
557 List<SocketAddress> localAddresses = future.getLocalAddresses();
558
559 try {
560
561 for (SocketAddress a : localAddresses) {
562 H handle = open(a);
563 newHandles.put(localAddress(handle), handle);
564 }
565
566
567
568 boundHandles.putAll(newHandles);
569
570
571 future.setDone();
572 return newHandles.size();
573 } catch (Exception e) {
574
575 future.setException(e);
576 } finally {
577
578 if (future.getException() != null) {
579 for (H handle : newHandles.values()) {
580 try {
581 close(handle);
582 } catch (Exception e) {
583 ExceptionMonitor.getInstance().exceptionCaught(e);
584 }
585 }
586
587
588 wakeup();
589 }
590 }
591 }
592 }
593
594
595
596
597
598
599
600 private int unregisterHandles() {
601 int cancelledHandles = 0;
602 for (;;) {
603 AcceptorOperationFuture future = cancelQueue.poll();
604 if (future == null) {
605 break;
606 }
607
608
609 for (SocketAddress a : future.getLocalAddresses()) {
610 H handle = boundHandles.remove(a);
611
612 if (handle == null) {
613 continue;
614 }
615
616 try {
617 close(handle);
618 wakeup();
619 } catch (Throwable e) {
620 ExceptionMonitor.getInstance().exceptionCaught(e);
621 } finally {
622 cancelledHandles++;
623 }
624 }
625
626 future.setDone();
627 }
628
629 return cancelledHandles;
630 }
631
632
633
634
635 public final IoSession newSession(SocketAddress remoteAddress,
636 SocketAddress localAddress) {
637 throw new UnsupportedOperationException();
638 }
639
640
641
642
643 public int getBacklog() {
644 return backlog;
645 }
646
647
648
649
650 public void setBacklog(int backlog) {
651 synchronized (bindLock) {
652 if (isActive()) {
653 throw new IllegalStateException(
654 "backlog can't be set while the acceptor is bound.");
655 }
656
657 this.backlog = backlog;
658 }
659 }
660
661
662
663
664 public boolean isReuseAddress() {
665 return reuseAddress;
666 }
667
668
669
670
671 public void setReuseAddress(boolean reuseAddress) {
672 synchronized (bindLock) {
673 if (isActive()) {
674 throw new IllegalStateException(
675 "backlog can't be set while the acceptor is bound.");
676 }
677
678 this.reuseAddress = reuseAddress;
679 }
680 }
681 }