1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.ServerSocketChannel;
28 import java.nio.channels.SocketChannel;
29 import java.util.ArrayList;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Queue;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.atomic.AtomicInteger;
40
41 import org.apache.mina.common.ExceptionMonitor;
42 import org.apache.mina.common.IoAcceptor;
43 import org.apache.mina.common.IoHandler;
44 import org.apache.mina.common.IoServiceConfig;
45 import org.apache.mina.common.support.BaseIoAcceptor;
46 import org.apache.mina.util.NamePreservingRunnable;
47 import org.apache.mina.util.NewThreadExecutor;
48
49
50
51
52
53
54
55 public class SocketAcceptor extends BaseIoAcceptor {
56 private static final AtomicInteger nextId = new AtomicInteger();
57
58 private final Executor executor;
59
60 private final Object lock = new Object();
61
62 private final int id = nextId.getAndIncrement();
63
64 private final String threadName = "SocketAcceptor-" + id;
65
66 private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
67
68 private final Map<SocketAddress, ServerSocketChannel> channels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
69
70 private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
71
72 private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
73
74 private final SocketIoProcessor[] ioProcessors;
75
76 private final int processorCount;
77
78 private volatile Selector selector;
79
80 private Worker worker;
81
82 private int processorDistributor = 0;
83
84
85
86
87 public SocketAcceptor() {
88 this(1, new NewThreadExecutor());
89 }
90
91
92
93
94
95
96
97 public SocketAcceptor(int processorCount, Executor executor) {
98 if (processorCount < 1) {
99 throw new IllegalArgumentException(
100 "Must have at least one processor");
101 }
102
103
104 defaultConfig.getSessionConfig().setReuseAddress(true);
105
106 this.executor = executor;
107 this.processorCount = processorCount;
108 ioProcessors = new SocketIoProcessor[processorCount];
109
110 for (int i = 0; i < processorCount; i++) {
111 ioProcessors[i] = new SocketIoProcessor(
112 "SocketAcceptorIoProcessor-" + id + "." + i, executor);
113 }
114 }
115
116
117
118
119
120
121
122 public void bind(SocketAddress address, IoHandler handler,
123 IoServiceConfig config) throws IOException {
124 if (handler == null) {
125 throw new NullPointerException("handler");
126 }
127
128 if (address != null && !(address instanceof InetSocketAddress)) {
129 throw new IllegalArgumentException("Unexpected address type: "
130 + address.getClass());
131 }
132
133 if (config == null) {
134 config = getDefaultConfig();
135 }
136
137 RegistrationRequest request = new RegistrationRequest(address, handler,
138 config);
139
140 synchronized (lock) {
141 startupWorker();
142
143 registerQueue.add(request);
144
145 selector.wakeup();
146 }
147
148 try {
149 request.done.await();
150 } catch (InterruptedException e) {
151 ExceptionMonitor.getInstance().exceptionCaught(e);
152 }
153
154 if (request.exception != null) {
155 throw request.exception;
156 }
157 }
158
159 private void startupWorker() throws IOException {
160 synchronized (lock) {
161 if (worker == null) {
162 selector = Selector.open();
163 worker = new Worker();
164
165 executor.execute(new NamePreservingRunnable(worker));
166 }
167 }
168 }
169
170 public void unbind(SocketAddress address) {
171 if (address == null) {
172 throw new NullPointerException("address");
173 }
174
175 CancellationRequest request = new CancellationRequest(address);
176
177 synchronized (lock) {
178 try {
179 startupWorker();
180 } catch (IOException e) {
181
182
183
184
185 throw new IllegalArgumentException("Address not bound: " + address);
186 }
187
188 cancelQueue.add(request);
189
190 selector.wakeup();
191 }
192
193 try {
194 request.done.await();
195 } catch (InterruptedException e) {
196 ExceptionMonitor.getInstance().exceptionCaught(e);
197 }
198
199 if (request.exception != null) {
200 request.exception.fillInStackTrace();
201
202 throw request.exception;
203 }
204 }
205
206 public void unbindAll() {
207 List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels
208 .keySet());
209
210 for (SocketAddress address : addresses) {
211 unbind(address);
212 }
213 }
214
215 private class Worker implements Runnable {
216 public void run() {
217 Thread.currentThread().setName(SocketAcceptor.this.threadName);
218
219 Selector selector = SocketAcceptor.this.selector;
220 for (;;) {
221 try {
222 int nKeys = selector.select();
223
224 registerNew();
225
226 if (nKeys > 0) {
227 processSessions(selector.selectedKeys());
228 }
229
230 cancelKeys();
231
232 if (selector.keys().isEmpty()) {
233 synchronized (lock) {
234 if (selector.keys().isEmpty()
235 && registerQueue.isEmpty()
236 && cancelQueue.isEmpty()) {
237 worker = null;
238 try {
239 selector.close();
240 } catch (IOException e) {
241 ExceptionMonitor.getInstance()
242 .exceptionCaught(e);
243 } finally {
244 SocketAcceptor.this.selector = null;
245 }
246 break;
247 }
248 }
249 }
250 } catch (IOException e) {
251 ExceptionMonitor.getInstance().exceptionCaught(e);
252
253 try {
254 Thread.sleep(1000);
255 } catch (InterruptedException e1) {
256 ExceptionMonitor.getInstance().exceptionCaught(e1);
257 }
258 }
259 }
260 }
261
262 private void processSessions(Set<SelectionKey> keys) throws IOException {
263 Iterator<SelectionKey> it = keys.iterator();
264 while (it.hasNext()) {
265 SelectionKey key = it.next();
266
267 it.remove();
268
269 if (!key.isAcceptable()) {
270 continue;
271 }
272
273 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
274
275 SocketChannel ch = ssc.accept();
276
277 if (ch == null) {
278 continue;
279 }
280
281 boolean success = false;
282 try {
283 RegistrationRequest req = (RegistrationRequest) key
284 .attachment();
285 SocketSessionImpl session = new SocketSessionImpl(
286 SocketAcceptor.this, nextProcessor(),
287 getListeners(), req.config, ch, req.handler,
288 req.address);
289 getFilterChainBuilder().buildFilterChain(
290 session.getFilterChain());
291 req.config.getFilterChainBuilder().buildFilterChain(
292 session.getFilterChain());
293 req.config.getThreadModel().buildFilterChain(
294 session.getFilterChain());
295 session.getIoProcessor().addNew(session);
296 success = true;
297 } catch (Throwable t) {
298 ExceptionMonitor.getInstance().exceptionCaught(t);
299 } finally {
300 if (!success) {
301 ch.close();
302 }
303 }
304 }
305 }
306 }
307
308 private SocketIoProcessor nextProcessor() {
309 if (this.processorDistributor == Integer.MAX_VALUE) {
310 this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
311 }
312
313 return ioProcessors[processorDistributor++ % processorCount];
314 }
315
316 public SocketAcceptorConfig getDefaultConfig() {
317 return defaultConfig;
318 }
319
320
321
322
323
324
325
326 public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {
327 if (defaultConfig == null) {
328 throw new NullPointerException("defaultConfig");
329 }
330 this.defaultConfig = defaultConfig;
331 }
332
333 private void registerNew() {
334 if (registerQueue.isEmpty()) {
335 return;
336 }
337
338 Selector selector = this.selector;
339 for (;;) {
340 RegistrationRequest req = registerQueue.poll();
341
342 if (req == null) {
343 break;
344 }
345
346 ServerSocketChannel ssc = null;
347
348 try {
349 ssc = ServerSocketChannel.open();
350 ssc.configureBlocking(false);
351
352
353 SocketAcceptorConfig cfg;
354 if (req.config instanceof SocketAcceptorConfig) {
355 cfg = (SocketAcceptorConfig) req.config;
356 } else {
357 cfg = getDefaultConfig();
358 }
359
360 ssc.socket().setReuseAddress(cfg.isReuseAddress());
361 ssc.socket().setReceiveBufferSize(
362 cfg.getSessionConfig().getReceiveBufferSize());
363
364
365 ssc.socket().bind(req.address, cfg.getBacklog());
366 if (req.address == null || req.address.getPort() == 0) {
367 req.address = (InetSocketAddress) ssc.socket()
368 .getLocalSocketAddress();
369 }
370 ssc.register(selector, SelectionKey.OP_ACCEPT, req);
371
372 channels.put(req.address, ssc);
373
374 getListeners().fireServiceActivated(this, req.address,
375 req.handler, req.config);
376 } catch (IOException e) {
377 req.exception = e;
378 } finally {
379 req.done.countDown();
380
381 if (ssc != null && req.exception != null) {
382 try {
383 ssc.close();
384 } catch (IOException e) {
385 ExceptionMonitor.getInstance().exceptionCaught(e);
386 }
387 }
388 }
389 }
390 }
391
392 private void cancelKeys() {
393 if (cancelQueue.isEmpty()) {
394 return;
395 }
396
397 Selector selector = this.selector;
398 for (;;) {
399 CancellationRequest request = cancelQueue.poll();
400
401 if (request == null) {
402 break;
403 }
404
405 ServerSocketChannel ssc = channels.remove(request.address);
406
407
408 try {
409 if (ssc == null) {
410 request.exception = new IllegalArgumentException(
411 "Address not bound: " + request.address);
412 } else {
413 SelectionKey key = ssc.keyFor(selector);
414 request.registrationRequest = (RegistrationRequest) key
415 .attachment();
416 key.cancel();
417
418 selector.wakeup();
419
420 ssc.close();
421 }
422 } catch (IOException e) {
423 ExceptionMonitor.getInstance().exceptionCaught(e);
424 } finally {
425 request.done.countDown();
426
427 if (request.exception == null) {
428 getListeners().fireServiceDeactivated(this,
429 request.address,
430 request.registrationRequest.handler,
431 request.registrationRequest.config);
432 }
433 }
434 }
435 }
436
437 private static class RegistrationRequest {
438 private InetSocketAddress address;
439
440 private final IoHandler handler;
441
442 private final IoServiceConfig config;
443
444 private final CountDownLatch done = new CountDownLatch(1);
445
446 private volatile IOException exception;
447
448 private RegistrationRequest(SocketAddress address, IoHandler handler,
449 IoServiceConfig config) {
450 this.address = (InetSocketAddress) address;
451 this.handler = handler;
452 this.config = config;
453 }
454 }
455
456 private static class CancellationRequest {
457 private final SocketAddress address;
458
459 private final CountDownLatch done = new CountDownLatch(1);
460
461 private RegistrationRequest registrationRequest;
462
463 private volatile RuntimeException exception;
464
465 private CancellationRequest(SocketAddress address) {
466 this.address = address;
467 }
468 }
469 }