1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.ServerSocketChannel;
28 import java.nio.channels.SocketChannel;
29 import java.util.ArrayList;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Queue;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.atomic.AtomicInteger;
40
41 import org.apache.mina.common.ExceptionMonitor;
42 import org.apache.mina.common.IoAcceptor;
43 import org.apache.mina.common.IoHandler;
44 import org.apache.mina.common.IoServiceConfig;
45 import org.apache.mina.common.support.BaseIoAcceptor;
46 import org.apache.mina.util.NamePreservingRunnable;
47 import org.apache.mina.util.NewThreadExecutor;
48
49
50
51
52
53
54
55 public class SocketAcceptor extends BaseIoAcceptor {
56 private static final AtomicInteger nextId = new AtomicInteger();
57
58 private final Executor executor;
59
60 private final Object lock = new Object();
61
62 private final int id = nextId.getAndIncrement();
63
64 private final String threadName = "SocketAcceptor-" + id;
65
66 private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
67
68 private final Map<SocketAddress, ServerSocketChannel> channels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
69
70 private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
71
72 private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
73
74 private final SocketIoProcessor[] ioProcessors;
75
76 private final int processorCount;
77
78 private Selector selector;
79
80 private Worker worker;
81
82 private int processorDistributor = 0;
83
84
85
86
87 public SocketAcceptor() {
88 this(1, new NewThreadExecutor());
89 }
90
91
92
93
94
95
96
97 public SocketAcceptor(int processorCount, Executor executor) {
98 if (processorCount < 1) {
99 throw new IllegalArgumentException(
100 "Must have at least one processor");
101 }
102
103
104 defaultConfig.getSessionConfig().setReuseAddress(true);
105
106 this.executor = executor;
107 this.processorCount = processorCount;
108 ioProcessors = new SocketIoProcessor[processorCount];
109
110 for (int i = 0; i < processorCount; i++) {
111 ioProcessors[i] = new SocketIoProcessor(
112 "SocketAcceptorIoProcessor-" + id + "." + i, executor);
113 }
114 }
115
116
117
118
119
120
121
122 public void bind(SocketAddress address, IoHandler handler,
123 IoServiceConfig config) throws IOException {
124 if (handler == null) {
125 throw new NullPointerException("handler");
126 }
127
128 if (address != null && !(address instanceof InetSocketAddress)) {
129 throw new IllegalArgumentException("Unexpected address type: "
130 + address.getClass());
131 }
132
133 if (config == null) {
134 config = getDefaultConfig();
135 }
136
137 RegistrationRequest request = new RegistrationRequest(address, handler,
138 config);
139
140 registerQueue.add(request);
141
142 startupWorker();
143
144 selector.wakeup();
145
146 try {
147 request.done.await();
148 } catch (InterruptedException e) {
149 ExceptionMonitor.getInstance().exceptionCaught(e);
150 }
151
152 if (request.exception != null) {
153 throw request.exception;
154 }
155 }
156
157 private synchronized void startupWorker() throws IOException {
158 synchronized (lock) {
159 if (worker == null) {
160 selector = Selector.open();
161 worker = new Worker();
162
163 executor.execute(new NamePreservingRunnable(worker));
164 }
165 }
166 }
167
168 public void unbind(SocketAddress address) {
169 if (address == null) {
170 throw new NullPointerException("address");
171 }
172
173 CancellationRequest request = new CancellationRequest(address);
174
175 try {
176 startupWorker();
177 } catch (IOException e) {
178
179
180
181
182 throw new IllegalArgumentException("Address not bound: " + address);
183 }
184
185 cancelQueue.add(request);
186
187 selector.wakeup();
188
189 try {
190 request.done.await();
191 } catch (InterruptedException e) {
192 ExceptionMonitor.getInstance().exceptionCaught(e);
193 }
194
195 if (request.exception != null) {
196 request.exception.fillInStackTrace();
197
198 throw request.exception;
199 }
200 }
201
202 public void unbindAll() {
203 List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels
204 .keySet());
205
206 for (SocketAddress address : addresses) {
207 unbind(address);
208 }
209 }
210
211 private class Worker implements Runnable {
212 public void run() {
213 Thread.currentThread().setName(SocketAcceptor.this.threadName);
214
215 for (;;) {
216 try {
217 int nKeys = selector.select();
218
219 registerNew();
220
221 if (nKeys > 0) {
222 processSessions(selector.selectedKeys());
223 }
224
225 cancelKeys();
226
227 if (selector.keys().isEmpty()) {
228 synchronized (lock) {
229 if (selector.keys().isEmpty()
230 && registerQueue.isEmpty()
231 && cancelQueue.isEmpty()) {
232 worker = null;
233 try {
234 selector.close();
235 } catch (IOException e) {
236 ExceptionMonitor.getInstance()
237 .exceptionCaught(e);
238 } finally {
239 selector = null;
240 }
241 break;
242 }
243 }
244 }
245 } catch (IOException e) {
246 ExceptionMonitor.getInstance().exceptionCaught(e);
247
248 try {
249 Thread.sleep(1000);
250 } catch (InterruptedException e1) {
251 ExceptionMonitor.getInstance().exceptionCaught(e1);
252 }
253 }
254 }
255 }
256
257 private void processSessions(Set<SelectionKey> keys) throws IOException {
258 Iterator<SelectionKey> it = keys.iterator();
259 while (it.hasNext()) {
260 SelectionKey key = it.next();
261
262 it.remove();
263
264 if (!key.isAcceptable()) {
265 continue;
266 }
267
268 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
269
270 SocketChannel ch = ssc.accept();
271
272 if (ch == null) {
273 continue;
274 }
275
276 boolean success = false;
277 try {
278 RegistrationRequest req = (RegistrationRequest) key
279 .attachment();
280 SocketSessionImpl session = new SocketSessionImpl(
281 SocketAcceptor.this, nextProcessor(),
282 getListeners(), req.config, ch, req.handler,
283 req.address);
284 getFilterChainBuilder().buildFilterChain(
285 session.getFilterChain());
286 req.config.getFilterChainBuilder().buildFilterChain(
287 session.getFilterChain());
288 req.config.getThreadModel().buildFilterChain(
289 session.getFilterChain());
290 session.getIoProcessor().addNew(session);
291 success = true;
292 } catch (Throwable t) {
293 ExceptionMonitor.getInstance().exceptionCaught(t);
294 } finally {
295 if (!success) {
296 ch.close();
297 }
298 }
299 }
300 }
301 }
302
303 private SocketIoProcessor nextProcessor() {
304 if (this.processorDistributor == Integer.MAX_VALUE) {
305 this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
306 }
307
308 return ioProcessors[processorDistributor++ % processorCount];
309 }
310
311 public SocketAcceptorConfig getDefaultConfig() {
312 return defaultConfig;
313 }
314
315
316
317
318
319
320
321 public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {
322 if (defaultConfig == null) {
323 throw new NullPointerException("defaultConfig");
324 }
325 this.defaultConfig = defaultConfig;
326 }
327
328 private void registerNew() {
329 if (registerQueue.isEmpty()) {
330 return;
331 }
332
333 for (;;) {
334 RegistrationRequest req = registerQueue.poll();
335
336 if (req == null) {
337 break;
338 }
339
340 ServerSocketChannel ssc = null;
341
342 try {
343 ssc = ServerSocketChannel.open();
344 ssc.configureBlocking(false);
345
346
347 SocketAcceptorConfig cfg;
348 if (req.config instanceof SocketAcceptorConfig) {
349 cfg = (SocketAcceptorConfig) req.config;
350 } else {
351 cfg = getDefaultConfig();
352 }
353
354 ssc.socket().setReuseAddress(cfg.isReuseAddress());
355 ssc.socket().setReceiveBufferSize(
356 cfg.getSessionConfig().getReceiveBufferSize());
357
358
359 ssc.socket().bind(req.address, cfg.getBacklog());
360 if (req.address == null || req.address.getPort() == 0) {
361 req.address = (InetSocketAddress) ssc.socket()
362 .getLocalSocketAddress();
363 }
364 ssc.register(selector, SelectionKey.OP_ACCEPT, req);
365
366 channels.put(req.address, ssc);
367
368 getListeners().fireServiceActivated(this, req.address,
369 req.handler, req.config);
370 } catch (IOException e) {
371 req.exception = e;
372 } finally {
373 req.done.countDown();
374
375 if (ssc != null && req.exception != null) {
376 try {
377 ssc.close();
378 } catch (IOException e) {
379 ExceptionMonitor.getInstance().exceptionCaught(e);
380 }
381 }
382 }
383 }
384 }
385
386 private void cancelKeys() {
387 if (cancelQueue.isEmpty()) {
388 return;
389 }
390
391 for (;;) {
392 CancellationRequest request = cancelQueue.poll();
393
394 if (request == null) {
395 break;
396 }
397
398 ServerSocketChannel ssc = channels.remove(request.address);
399
400
401 try {
402 if (ssc == null) {
403 request.exception = new IllegalArgumentException(
404 "Address not bound: " + request.address);
405 } else {
406 SelectionKey key = ssc.keyFor(selector);
407 request.registrationRequest = (RegistrationRequest) key
408 .attachment();
409 key.cancel();
410
411 selector.wakeup();
412
413 ssc.close();
414 }
415 } catch (IOException e) {
416 ExceptionMonitor.getInstance().exceptionCaught(e);
417 } finally {
418 request.done.countDown();
419
420 if (request.exception == null) {
421 getListeners().fireServiceDeactivated(this,
422 request.address,
423 request.registrationRequest.handler,
424 request.registrationRequest.config);
425 }
426 }
427 }
428 }
429
430 private static class RegistrationRequest {
431 private InetSocketAddress address;
432
433 private final IoHandler handler;
434
435 private final IoServiceConfig config;
436
437 private final CountDownLatch done = new CountDownLatch(1);
438
439 private volatile IOException exception;
440
441 private RegistrationRequest(SocketAddress address, IoHandler handler,
442 IoServiceConfig config) {
443 this.address = (InetSocketAddress) address;
444 this.handler = handler;
445 this.config = config;
446 }
447 }
448
449 private static class CancellationRequest {
450 private final SocketAddress address;
451
452 private final CountDownLatch done = new CountDownLatch(1);
453
454 private RegistrationRequest registrationRequest;
455
456 private volatile RuntimeException exception;
457
458 private CancellationRequest(SocketAddress address) {
459 this.address = address;
460 }
461 }
462 }