1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.InetSocketAddress;
25 import java.net.SocketAddress;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.util.Iterator;
30 import java.util.Set;
31
32 import org.apache.mina.common.ConnectFuture;
33 import org.apache.mina.common.ExceptionMonitor;
34 import org.apache.mina.common.IoConnector;
35 import org.apache.mina.common.IoConnectorConfig;
36 import org.apache.mina.common.IoHandler;
37 import org.apache.mina.common.IoServiceConfig;
38 import org.apache.mina.common.support.AbstractIoFilterChain;
39 import org.apache.mina.common.support.BaseIoConnector;
40 import org.apache.mina.common.support.DefaultConnectFuture;
41 import org.apache.mina.util.NamePreservingRunnable;
42 import org.apache.mina.util.NewThreadExecutor;
43 import org.apache.mina.util.Queue;
44
45 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
46
47
48
49
50
51
52
53 public class SocketConnector extends BaseIoConnector {
54
55
56
57 private static volatile int nextId = 0;
58
59 private final Object lock = new Object();
60
61 private final int id = nextId++;
62
63 private final String threadName = "SocketConnector-" + id;
64
65 private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
66
67 private final Queue connectQueue = new Queue();
68
69 private final SocketIoProcessor[] ioProcessors;
70
71 private final int processorCount;
72
73 private final Executor executor;
74
75
76
77
78 private Selector selector;
79
80 private Worker worker;
81
82 private int processorDistributor = 0;
83
84 private int workerTimeout = 60;
85
86
87
88
89 public SocketConnector() {
90 this(1, new NewThreadExecutor());
91 }
92
93
94
95
96
97
98
99 public SocketConnector(int processorCount, Executor executor) {
100 if (processorCount < 1) {
101 throw new IllegalArgumentException(
102 "Must have at least one processor");
103 }
104
105 this.executor = executor;
106 this.processorCount = processorCount;
107 ioProcessors = new SocketIoProcessor[processorCount];
108
109 for (int i = 0; i < processorCount; i++) {
110 ioProcessors[i] = new SocketIoProcessor(
111 "SocketConnectorIoProcessor-" + id + "." + i, executor);
112 }
113 }
114
115
116
117
118
119
120
121
122 public int getWorkerTimeout() {
123 return workerTimeout;
124 }
125
126
127
128
129
130
131
132
133
134 public void setWorkerTimeout(int workerTimeout) {
135 if (workerTimeout < 0) {
136 throw new IllegalArgumentException("Must be >= 0");
137 }
138 this.workerTimeout = workerTimeout;
139 }
140
141 public ConnectFuture connect(SocketAddress address, IoHandler handler,
142 IoServiceConfig config) {
143 return connect(address, null, handler, config);
144 }
145
146 public ConnectFuture connect(SocketAddress address,
147 SocketAddress localAddress, IoHandler handler,
148 IoServiceConfig config) {
149 if (address == null)
150 throw new NullPointerException("address");
151 if (handler == null)
152 throw new NullPointerException("handler");
153
154 if (!(address instanceof InetSocketAddress))
155 throw new IllegalArgumentException("Unexpected address type: "
156 + address.getClass());
157
158 if (localAddress != null
159 && !(localAddress instanceof InetSocketAddress))
160 throw new IllegalArgumentException(
161 "Unexpected local address type: " + localAddress.getClass());
162
163 if (config == null) {
164 config = getDefaultConfig();
165 }
166
167 SocketChannel ch = null;
168 boolean success = false;
169 try {
170 ch = SocketChannel.open();
171 ch.socket().setReuseAddress(true);
172 if (localAddress != null) {
173 ch.socket().bind(localAddress);
174 }
175
176 ch.configureBlocking(false);
177
178 if (ch.connect(address)) {
179 DefaultConnectFuture future = new DefaultConnectFuture();
180 newSession(ch, handler, config, future);
181 success = true;
182 return future;
183 }
184
185 success = true;
186 } catch (IOException e) {
187 return DefaultConnectFuture.newFailedFuture(e);
188 } finally {
189 if (!success && ch != null) {
190 try {
191 ch.close();
192 } catch (IOException e) {
193 ExceptionMonitor.getInstance().exceptionCaught(e);
194 }
195 }
196 }
197
198 ConnectionRequest request = new ConnectionRequest(ch, handler, config);
199 synchronized (lock) {
200 try {
201 startupWorker();
202 } catch (IOException e) {
203 try {
204 ch.close();
205 } catch (IOException e2) {
206 ExceptionMonitor.getInstance().exceptionCaught(e2);
207 }
208
209 return DefaultConnectFuture.newFailedFuture(e);
210 }
211
212 synchronized (connectQueue) {
213 connectQueue.push(request);
214 }
215 selector.wakeup();
216 }
217
218 return request;
219 }
220
221 public IoServiceConfig getDefaultConfig() {
222 return defaultConfig;
223 }
224
225
226
227
228
229
230
231 public void setDefaultConfig(SocketConnectorConfig defaultConfig) {
232 if (defaultConfig == null) {
233 throw new NullPointerException("defaultConfig");
234 }
235 this.defaultConfig = defaultConfig;
236 }
237
238 private Selector getSelector() {
239 synchronized (lock) {
240 return this.selector;
241 }
242 }
243
244 private void startupWorker() throws IOException {
245 synchronized (lock) {
246 if (worker == null) {
247 selector = Selector.open();
248 worker = new Worker();
249 executor.execute(new NamePreservingRunnable(worker, threadName));
250 }
251 }
252 }
253
254 private void registerNew() {
255 if (connectQueue.isEmpty())
256 return;
257
258 Selector selector = getSelector();
259 for (;;) {
260 ConnectionRequest req;
261 synchronized (connectQueue) {
262 req = (ConnectionRequest) connectQueue.pop();
263 }
264
265 if (req == null)
266 break;
267
268 SocketChannel ch = req.channel;
269 try {
270 ch.register(selector, SelectionKey.OP_CONNECT, req);
271 } catch (IOException e) {
272 req.setException(e);
273 try {
274 ch.close();
275 } catch (IOException e2) {
276 ExceptionMonitor.getInstance().exceptionCaught(e2);
277 }
278 }
279 }
280 }
281
282 private void processSessions(Set keys) {
283 Iterator it = keys.iterator();
284
285 while (it.hasNext()) {
286 SelectionKey key = (SelectionKey) it.next();
287
288 if (!key.isConnectable())
289 continue;
290
291 SocketChannel ch = (SocketChannel) key.channel();
292 ConnectionRequest entry = (ConnectionRequest) key.attachment();
293
294 boolean success = false;
295 try {
296 if (ch.finishConnect()) {
297 key.cancel();
298 newSession(ch, entry.handler, entry.config, entry);
299 }
300 success = true;
301 } catch (Throwable e) {
302 entry.setException(e);
303 } finally {
304 if (!success) {
305 key.cancel();
306 try {
307 ch.close();
308 } catch (IOException e) {
309 ExceptionMonitor.getInstance().exceptionCaught(e);
310 }
311 }
312 }
313 }
314
315 keys.clear();
316 }
317
318 private void processTimedOutSessions(Set keys) {
319 long currentTime = System.currentTimeMillis();
320 Iterator it = keys.iterator();
321
322 while (it.hasNext()) {
323 SelectionKey key = (SelectionKey) it.next();
324
325 if (!key.isValid())
326 continue;
327
328 ConnectionRequest entry = (ConnectionRequest) key.attachment();
329
330 if (currentTime >= entry.deadline) {
331 entry.setException(new ConnectException());
332 try {
333 key.channel().close();
334 } catch (IOException e) {
335 ExceptionMonitor.getInstance().exceptionCaught(e);
336 } finally {
337 key.cancel();
338 }
339 }
340 }
341 }
342
343 private void newSession(SocketChannel ch, IoHandler handler,
344 IoServiceConfig config, ConnectFuture connectFuture)
345 throws IOException {
346 SocketSessionImpl session = new SocketSessionImpl(this,
347 nextProcessor(), getListeners(), config, ch, handler, ch
348 .socket().getRemoteSocketAddress());
349 try {
350 getFilterChainBuilder().buildFilterChain(session.getFilterChain());
351 config.getFilterChainBuilder().buildFilterChain(
352 session.getFilterChain());
353 config.getThreadModel().buildFilterChain(session.getFilterChain());
354 } catch (Throwable e) {
355 throw (IOException) new IOException("Failed to create a session.")
356 .initCause(e);
357 }
358
359
360
361 session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
362 connectFuture);
363
364
365 session.getIoProcessor().addNew(session);
366 }
367
368 private SocketIoProcessor nextProcessor() {
369 if (this.processorDistributor == Integer.MAX_VALUE) {
370 this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
371 }
372
373 return ioProcessors[processorDistributor++ % processorCount];
374 }
375
376 private class Worker implements Runnable {
377 private long lastActive = System.currentTimeMillis();
378
379 public void run() {
380 Selector selector = getSelector();
381 for (;;) {
382 try {
383 int nKeys = selector.select(1000);
384
385 registerNew();
386
387 if (nKeys > 0) {
388 processSessions(selector.selectedKeys());
389 }
390
391 processTimedOutSessions(selector.keys());
392
393 if (selector.keys().isEmpty()) {
394 if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) {
395 synchronized (lock) {
396 if (selector.keys().isEmpty()
397 && connectQueue.isEmpty()) {
398 worker = null;
399 try {
400 selector.close();
401 } catch (IOException e) {
402 ExceptionMonitor.getInstance()
403 .exceptionCaught(e);
404 } finally {
405 SocketConnector.this.selector = null;
406 }
407 break;
408 }
409 }
410 }
411 } else {
412 lastActive = System.currentTimeMillis();
413 }
414 } catch (IOException e) {
415 ExceptionMonitor.getInstance().exceptionCaught(e);
416
417 try {
418 Thread.sleep(1000);
419 } catch (InterruptedException e1) {
420 ExceptionMonitor.getInstance().exceptionCaught(e1);
421 }
422 }
423 }
424 }
425 }
426
427 private class ConnectionRequest extends DefaultConnectFuture {
428 private final SocketChannel channel;
429
430 private final long deadline;
431
432 private final IoHandler handler;
433
434 private final IoServiceConfig config;
435
436 private ConnectionRequest(SocketChannel channel, IoHandler handler,
437 IoServiceConfig config) {
438 this.channel = channel;
439 long timeout;
440 if (config instanceof IoConnectorConfig) {
441 timeout = ((IoConnectorConfig) config)
442 .getConnectTimeoutMillis();
443 } else {
444 timeout = ((IoConnectorConfig) getDefaultConfig())
445 .getConnectTimeoutMillis();
446 }
447 this.deadline = System.currentTimeMillis() + timeout;
448 this.handler = handler;
449 this.config = config;
450 }
451 }
452 }