1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.common;
21
22 import java.net.SocketAddress;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40 import org.apache.mina.util.NamePreservingRunnable;
41
42
43
44
45
46 public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
47 extends AbstractIoAcceptor {
48
49 private static final AtomicInteger id = new AtomicInteger();
50
51 private final Executor executor;
52 private final boolean createdExecutor;
53 private final String threadName;
54 private final IoProcessor<T> processor;
55 private final boolean createdProcessor;
56
57 private final Object lock = new Object();
58
59 private final Queue<AcceptorOperationFuture> registerQueue =
60 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
61 private final Queue<AcceptorOperationFuture> cancelQueue =
62 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
63
64 private final Map<SocketAddress, H> boundHandles =
65 Collections.synchronizedMap(new HashMap<SocketAddress, H>());
66
67 private final ServiceOperationFuture disposalFuture =
68 new ServiceOperationFuture();
69 private volatile boolean selectable;
70 private Worker worker;
71
72
73
74
75 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
76 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
77 }
78
79 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
80 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
81 }
82
83 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
84 this(sessionConfig, null, processor, false);
85 }
86
87 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
88 this(sessionConfig, executor, processor, false);
89 }
90
91 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
92 super(sessionConfig);
93
94 if (processor == null) {
95 throw new NullPointerException("processor");
96 }
97
98 if (executor == null) {
99 this.executor = new ThreadPoolExecutor(
100 1, 1, 1L, TimeUnit.SECONDS,
101 new LinkedBlockingQueue<Runnable>());
102 this.createdExecutor = true;
103 } else {
104 this.executor = executor;
105 this.createdExecutor = false;
106 }
107
108 this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
109 this.processor = processor;
110 this.createdProcessor = createdProcessor;
111
112 try {
113 init();
114 selectable = true;
115 } catch (RuntimeException e){
116 throw e;
117 } catch (Exception e) {
118 throw new RuntimeIoException("Failed to initialize.", e);
119 } finally {
120 if (!selectable) {
121 try {
122 destroy();
123 } catch (Exception e) {
124 ExceptionMonitor.getInstance().exceptionCaught(e);
125 }
126 }
127 }
128 }
129
130 protected abstract void init() throws Exception;
131 protected abstract void destroy() throws Exception;
132 protected abstract boolean select() throws Exception;
133 protected abstract void wakeup();
134 protected abstract Iterator<H> selectedHandles();
135 protected abstract H open(SocketAddress localAddress) throws Exception;
136 protected abstract SocketAddress localAddress(H handle) throws Exception;
137 protected abstract T accept(IoProcessor<T> processor, H handle) throws Exception;
138 protected abstract void close(H handle) throws Exception;
139
140 @Override
141 protected IoFuture dispose0() throws Exception {
142 unbind();
143 if (!disposalFuture.isDone()) {
144 try {
145 startupWorker();
146 wakeup();
147 } catch (RejectedExecutionException e) {
148 if (createdExecutor) {
149
150 } else {
151 throw e;
152 }
153 }
154 }
155 return disposalFuture;
156 }
157
158 @Override
159 protected final Set<SocketAddress> bind0(List<? extends SocketAddress> localAddresses) throws Exception {
160 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
161
162
163
164 registerQueue.add(request);
165
166
167
168 startupWorker();
169 wakeup();
170 request.awaitUninterruptibly();
171
172 if (request.getException() != null) {
173 throw request.getException();
174 }
175
176
177
178
179 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
180 for (H handle: boundHandles.values()) {
181 newLocalAddresses.add(localAddress(handle));
182 }
183
184 return newLocalAddresses;
185 }
186
187
188
189
190
191
192
193
194
195
196 private void startupWorker() {
197 if (!selectable) {
198 registerQueue.clear();
199 cancelQueue.clear();
200 }
201
202 synchronized (lock) {
203 if (worker == null) {
204 worker = new Worker();
205 executor.execute(new NamePreservingRunnable(worker, threadName));
206 }
207 }
208 }
209
210 @Override
211 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
212 AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
213
214 cancelQueue.add(future);
215 startupWorker();
216 wakeup();
217
218 future.awaitUninterruptibly();
219 if (future.getException() != null) {
220 throw future.getException();
221 }
222 }
223
224
225
226
227
228 private class Worker implements Runnable {
229 public void run() {
230 int nHandles = 0;
231
232 while (selectable) {
233 try {
234
235 boolean selected = select();
236
237
238
239
240 nHandles += registerHandles();
241
242 if (selected) {
243 processHandles(selectedHandles());
244 }
245
246
247 nHandles -= unregisterHandles();
248
249 if (nHandles == 0) {
250 synchronized (lock) {
251 if (registerQueue.isEmpty() &&
252 cancelQueue.isEmpty()) {
253 worker = null;
254 break;
255 }
256 }
257 }
258 } catch (Throwable e) {
259 ExceptionMonitor.getInstance().exceptionCaught(e);
260
261 try {
262 Thread.sleep(1000);
263 } catch (InterruptedException e1) {
264 ExceptionMonitor.getInstance().exceptionCaught(e1);
265 }
266 }
267 }
268
269 if (selectable && isDisposing()) {
270 selectable = false;
271 try {
272 if (createdProcessor) {
273 processor.dispose();
274 }
275 } finally {
276 try {
277 destroy();
278 } catch (Exception e) {
279 ExceptionMonitor.getInstance().exceptionCaught(e);
280 } finally {
281 disposalFuture.setDone();
282 if (createdExecutor) {
283 ((ExecutorService) executor).shutdown();
284 }
285 }
286 }
287 }
288 }
289
290
291
292
293
294
295
296
297
298
299 @SuppressWarnings("unchecked")
300 private void processHandles(Iterator<H> handles) throws Exception {
301 while (handles.hasNext()) {
302 H handle = handles.next();
303 handles.remove();
304
305 T session = accept(processor, handle);
306 if (session == null) {
307 break;
308 }
309
310 finishSessionInitialization(session, null, null);
311
312
313 session.getProcessor().add(session);
314 }
315 }
316 }
317
318
319
320
321
322
323
324
325
326
327 private int registerHandles() {
328 for (;;) {
329 AcceptorOperationFuture future = registerQueue.poll();
330 if (future == null) {
331 break;
332 }
333
334 Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
335 List<SocketAddress> localAddresses = future.getLocalAddresses();
336
337 try {
338 for (SocketAddress a: localAddresses) {
339 H handle = open(a);
340 newHandles.put(localAddress(handle), handle);
341 }
342
343 boundHandles.putAll(newHandles);
344
345
346 future.setDone();
347 return newHandles.size();
348 } catch (Exception e) {
349 future.setException(e);
350 } finally {
351
352 if (future.getException() != null) {
353 for (H handle: newHandles.values()) {
354 try {
355 close(handle);
356 } catch (Exception e) {
357 ExceptionMonitor.getInstance().exceptionCaught(e);
358 }
359 }
360 wakeup();
361 }
362 }
363 }
364
365 return 0;
366 }
367
368
369
370
371
372
373
374 private int unregisterHandles() {
375 int cancelledHandles = 0;
376 for (; ;) {
377 AcceptorOperationFuture future = cancelQueue.poll();
378 if (future == null) {
379 break;
380 }
381
382
383 for (SocketAddress a: future.getLocalAddresses()) {
384 H handle = boundHandles.remove(a);
385 if (handle == null) {
386 continue;
387 }
388
389 try {
390 close(handle);
391 wakeup();
392 } catch (Throwable e) {
393 ExceptionMonitor.getInstance().exceptionCaught(e);
394 } finally {
395 cancelledHandles ++;
396 }
397 }
398
399 future.setDone();
400 }
401
402 return cancelledHandles;
403 }
404
405 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
406 throw new UnsupportedOperationException();
407 }
408 }