1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.ServerSocketChannel;
28 import java.nio.channels.SocketChannel;
29 import java.util.ArrayList;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35
36 import org.apache.mina.common.ExceptionMonitor;
37 import org.apache.mina.common.IoAcceptor;
38 import org.apache.mina.common.IoHandler;
39 import org.apache.mina.common.IoServiceConfig;
40 import org.apache.mina.common.support.BaseIoAcceptor;
41 import org.apache.mina.util.NamePreservingRunnable;
42 import org.apache.mina.util.NewThreadExecutor;
43 import org.apache.mina.util.Queue;
44
45 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
46
47
48
49
50
51
52
53 public class SocketAcceptor extends BaseIoAcceptor {
54
55
56
57 private static volatile int nextId = 0;
58
59 private final Executor executor;
60
61 private final Object lock = new Object();
62
63 private final int id = nextId++;
64
65 private final String threadName = "SocketAcceptor-" + id;
66
67 private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
68
69 private final Map channels = new HashMap();
70
71 private final Queue registerQueue = new Queue();
72
73 private final Queue cancelQueue = new Queue();
74
75 private final SocketIoProcessor[] ioProcessors;
76
77 private final int processorCount;
78
79
80
81
82 private Selector selector;
83
84 private Worker worker;
85
86 private int processorDistributor = 0;
87
88
89
90
91 public SocketAcceptor() {
92 this(1, new NewThreadExecutor());
93 }
94
95
96
97
98
99
100
101 public SocketAcceptor(int processorCount, Executor executor) {
102 if (processorCount < 1) {
103 throw new IllegalArgumentException(
104 "Must have at least one processor");
105 }
106
107
108 ((SocketSessionConfig) defaultConfig.getSessionConfig())
109 .setReuseAddress(true);
110
111 this.executor = executor;
112 this.processorCount = processorCount;
113 ioProcessors = new SocketIoProcessor[processorCount];
114
115 for (int i = 0; i < processorCount; i++) {
116 ioProcessors[i] = new SocketIoProcessor(
117 "SocketAcceptorIoProcessor-" + id + "." + i, executor);
118 }
119 }
120
121
122
123
124
125
126
127 public void bind(SocketAddress address, IoHandler handler,
128 IoServiceConfig config) throws IOException {
129 if (handler == null) {
130 throw new NullPointerException("handler");
131 }
132
133 if (address != null && !(address instanceof InetSocketAddress)) {
134 throw new IllegalArgumentException("Unexpected address type: "
135 + address.getClass());
136 }
137
138 if (config == null) {
139 config = getDefaultConfig();
140 }
141
142 RegistrationRequest request = new RegistrationRequest(address, handler,
143 config);
144
145 synchronized (registerQueue) {
146 registerQueue.push(request);
147 }
148
149 startupWorker();
150
151 selector.wakeup();
152
153 synchronized (request) {
154 while (!request.done) {
155 try {
156 request.wait();
157 } catch (InterruptedException e) {
158 ExceptionMonitor.getInstance().exceptionCaught(e);
159 }
160 }
161 }
162
163 if (request.exception != null) {
164 throw request.exception;
165 }
166 }
167
168 private synchronized void startupWorker() throws IOException {
169 synchronized (lock) {
170 if (worker == null) {
171 selector = Selector.open();
172 worker = new Worker();
173
174 executor.execute(new NamePreservingRunnable(worker));
175 }
176 }
177 }
178
179 public void unbind(SocketAddress address) {
180 if (address == null) {
181 throw new NullPointerException("address");
182 }
183
184 CancellationRequest request = new CancellationRequest(address);
185
186 try {
187 startupWorker();
188 } catch (IOException e) {
189
190
191
192
193 throw new IllegalArgumentException("Address not bound: " + address);
194 }
195
196 synchronized (cancelQueue) {
197 cancelQueue.push(request);
198 }
199
200 selector.wakeup();
201
202 synchronized (request) {
203 while (!request.done) {
204 try {
205 request.wait();
206 } catch (InterruptedException e) {
207 ExceptionMonitor.getInstance().exceptionCaught(e);
208 }
209 }
210 }
211
212 if (request.exception != null) {
213 request.exception.fillInStackTrace();
214
215 throw request.exception;
216 }
217 }
218
219 public void unbindAll() {
220 List addresses;
221 synchronized (channels) {
222 addresses = new ArrayList(channels.keySet());
223 }
224
225 for (Iterator i = addresses.iterator(); i.hasNext();) {
226 unbind((SocketAddress) i.next());
227 }
228 }
229
230 private class Worker implements Runnable {
231 public void run() {
232 Thread.currentThread().setName(SocketAcceptor.this.threadName);
233
234 for (;;) {
235 try {
236 int nKeys = selector.select();
237
238 registerNew();
239
240 if (nKeys > 0) {
241 processSessions(selector.selectedKeys());
242 }
243
244 cancelKeys();
245
246 if (selector.keys().isEmpty()) {
247 synchronized (lock) {
248 if (selector.keys().isEmpty()
249 && registerQueue.isEmpty()
250 && cancelQueue.isEmpty()) {
251 worker = null;
252 try {
253 selector.close();
254 } catch (IOException e) {
255 ExceptionMonitor.getInstance()
256 .exceptionCaught(e);
257 } finally {
258 selector = null;
259 }
260 break;
261 }
262 }
263 }
264 } catch (IOException e) {
265 ExceptionMonitor.getInstance().exceptionCaught(e);
266
267 try {
268 Thread.sleep(1000);
269 } catch (InterruptedException e1) {
270 ExceptionMonitor.getInstance().exceptionCaught(e1);
271 }
272 }
273 }
274 }
275
276 private void processSessions(Set keys) throws IOException {
277 Iterator it = keys.iterator();
278 while (it.hasNext()) {
279 SelectionKey key = (SelectionKey) it.next();
280
281 it.remove();
282
283 if (!key.isAcceptable()) {
284 continue;
285 }
286
287 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
288
289 SocketChannel ch = ssc.accept();
290
291 if (ch == null) {
292 continue;
293 }
294
295 boolean success = false;
296 try {
297 RegistrationRequest req = (RegistrationRequest) key
298 .attachment();
299 SocketSessionImpl session = new SocketSessionImpl(
300 SocketAcceptor.this, nextProcessor(),
301 getListeners(), req.config, ch, req.handler,
302 req.address);
303 getFilterChainBuilder().buildFilterChain(
304 session.getFilterChain());
305 req.config.getFilterChainBuilder().buildFilterChain(
306 session.getFilterChain());
307 req.config.getThreadModel().buildFilterChain(
308 session.getFilterChain());
309 session.getIoProcessor().addNew(session);
310 success = true;
311 } catch (Throwable t) {
312 ExceptionMonitor.getInstance().exceptionCaught(t);
313 } finally {
314 if (!success) {
315 ch.close();
316 }
317 }
318 }
319 }
320 }
321
322 private SocketIoProcessor nextProcessor() {
323 if (this.processorDistributor == Integer.MAX_VALUE) {
324 this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
325 }
326
327 return ioProcessors[processorDistributor++ % processorCount];
328 }
329
330 public IoServiceConfig getDefaultConfig() {
331 return defaultConfig;
332 }
333
334
335
336
337
338
339
340 public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {
341 if (defaultConfig == null) {
342 throw new NullPointerException("defaultConfig");
343 }
344 this.defaultConfig = defaultConfig;
345 }
346
347 private void registerNew() {
348 if (registerQueue.isEmpty()) {
349 return;
350 }
351
352 for (;;) {
353 RegistrationRequest req;
354
355 synchronized (registerQueue) {
356 req = (RegistrationRequest) registerQueue.pop();
357 }
358
359 if (req == null) {
360 break;
361 }
362
363 ServerSocketChannel ssc = null;
364
365 try {
366 ssc = ServerSocketChannel.open();
367 ssc.configureBlocking(false);
368
369
370 SocketAcceptorConfig cfg;
371 if (req.config instanceof SocketAcceptorConfig) {
372 cfg = (SocketAcceptorConfig) req.config;
373 } else {
374 cfg = (SocketAcceptorConfig) getDefaultConfig();
375 }
376
377 ssc.socket().setReuseAddress(cfg.isReuseAddress());
378 ssc.socket().setReceiveBufferSize(
379 ((SocketSessionConfig) cfg.getSessionConfig())
380 .getReceiveBufferSize());
381
382
383 ssc.socket().bind(req.address, cfg.getBacklog());
384 if (req.address == null || req.address.getPort() == 0) {
385 req.address = (InetSocketAddress) ssc.socket()
386 .getLocalSocketAddress();
387 }
388 ssc.register(selector, SelectionKey.OP_ACCEPT, req);
389
390 synchronized (channels) {
391 channels.put(req.address, ssc);
392 }
393
394 getListeners().fireServiceActivated(this, req.address,
395 req.handler, req.config);
396 } catch (IOException e) {
397 req.exception = e;
398 } finally {
399 synchronized (req) {
400 req.done = true;
401
402 req.notifyAll();
403 }
404
405 if (ssc != null && req.exception != null) {
406 try {
407 ssc.close();
408 } catch (IOException e) {
409 ExceptionMonitor.getInstance().exceptionCaught(e);
410 }
411 }
412 }
413 }
414 }
415
416 private void cancelKeys() {
417 if (cancelQueue.isEmpty()) {
418 return;
419 }
420
421 for (;;) {
422 CancellationRequest request;
423
424 synchronized (cancelQueue) {
425 request = (CancellationRequest) cancelQueue.pop();
426 }
427
428 if (request == null) {
429 break;
430 }
431
432 ServerSocketChannel ssc;
433 synchronized (channels) {
434 ssc = (ServerSocketChannel) channels.remove(request.address);
435 }
436
437
438 try {
439 if (ssc == null) {
440 request.exception = new IllegalArgumentException(
441 "Address not bound: " + request.address);
442 } else {
443 SelectionKey key = ssc.keyFor(selector);
444 request.registrationRequest = (RegistrationRequest) key
445 .attachment();
446 key.cancel();
447
448 selector.wakeup();
449
450 ssc.close();
451 }
452 } catch (IOException e) {
453 ExceptionMonitor.getInstance().exceptionCaught(e);
454 } finally {
455 synchronized (request) {
456 request.done = true;
457 request.notifyAll();
458 }
459
460 if (request.exception == null) {
461 getListeners().fireServiceDeactivated(this,
462 request.address,
463 request.registrationRequest.handler,
464 request.registrationRequest.config);
465 }
466 }
467 }
468 }
469
470 private static class RegistrationRequest {
471 private InetSocketAddress address;
472
473 private final IoHandler handler;
474
475 private final IoServiceConfig config;
476
477 private IOException exception;
478
479 private boolean done;
480
481 private RegistrationRequest(SocketAddress address, IoHandler handler,
482 IoServiceConfig config) {
483 this.address = (InetSocketAddress) address;
484 this.handler = handler;
485 this.config = config;
486 }
487 }
488
489 private static class CancellationRequest {
490 private final SocketAddress address;
491
492 private boolean done;
493
494 private RegistrationRequest registrationRequest;
495
496 private RuntimeException exception;
497
498 private CancellationRequest(SocketAddress address) {
499 this.address = address;
500 }
501 }
502 }