1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.InetSocketAddress;
25 import java.net.SocketAddress;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.util.Iterator;
30 import java.util.Set;
31
32 import org.apache.mina.common.ConnectFuture;
33 import org.apache.mina.common.ExceptionMonitor;
34 import org.apache.mina.common.IoConnector;
35 import org.apache.mina.common.IoConnectorConfig;
36 import org.apache.mina.common.IoHandler;
37 import org.apache.mina.common.IoServiceConfig;
38 import org.apache.mina.common.support.AbstractIoFilterChain;
39 import org.apache.mina.common.support.BaseIoConnector;
40 import org.apache.mina.common.support.DefaultConnectFuture;
41 import org.apache.mina.util.Queue;
42 import org.apache.mina.util.NewThreadExecutor;
43 import org.apache.mina.util.NamePreservingRunnable;
44 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
45
46
47
48
49
50
51
52 public class SocketConnector extends BaseIoConnector {
53
54
55
56 private static volatile int nextId = 0;
57
58 private final Object lock = new Object();
59
60 private final int id = nextId++;
61
62 private final String threadName = "SocketConnector-" + id;
63
64 private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
65
66 private final Queue connectQueue = new Queue();
67
68 private final SocketIoProcessor[] ioProcessors;
69
70 private final int processorCount;
71
72 private final Executor executor;
73
74
75
76
77 private Selector selector;
78
79 private Worker worker;
80
81 private int processorDistributor = 0;
82
83 private int workerTimeout = 60;
84
85
86
87
88 public SocketConnector() {
89 this(1, new NewThreadExecutor());
90 }
91
92
93
94
95
96
97
98 public SocketConnector(int processorCount, Executor executor) {
99 if (processorCount < 1) {
100 throw new IllegalArgumentException(
101 "Must have at least one processor");
102 }
103
104 this.executor = executor;
105 this.processorCount = processorCount;
106 ioProcessors = new SocketIoProcessor[processorCount];
107
108 for (int i = 0; i < processorCount; i++) {
109 ioProcessors[i] = new SocketIoProcessor(
110 "SocketConnectorIoProcessor-" + id + "." + i, executor);
111 }
112 }
113
114
115
116
117
118
119 public int getWorkerTimeout() {
120 return workerTimeout;
121 }
122
123
124
125
126
127
128 public void setWorkerTimeout(int workerTimeout) {
129 if (workerTimeout < 0) {
130 throw new IllegalArgumentException("Must be >= 0");
131 }
132 this.workerTimeout = workerTimeout;
133 }
134
135 public ConnectFuture connect(SocketAddress address, IoHandler handler,
136 IoServiceConfig config) {
137 return connect(address, null, handler, config);
138 }
139
140 public ConnectFuture connect(SocketAddress address,
141 SocketAddress localAddress, IoHandler handler,
142 IoServiceConfig config) {
143 if (address == null)
144 throw new NullPointerException("address");
145 if (handler == null)
146 throw new NullPointerException("handler");
147
148 if (!(address instanceof InetSocketAddress))
149 throw new IllegalArgumentException("Unexpected address type: "
150 + address.getClass());
151
152 if (localAddress != null
153 && !(localAddress instanceof InetSocketAddress))
154 throw new IllegalArgumentException(
155 "Unexpected local address type: " + localAddress.getClass());
156
157 if (config == null) {
158 config = getDefaultConfig();
159 }
160
161 SocketChannel ch = null;
162 boolean success = false;
163 try {
164 ch = SocketChannel.open();
165 ch.socket().setReuseAddress(true);
166 if (localAddress != null) {
167 ch.socket().bind(localAddress);
168 }
169
170 ch.configureBlocking(false);
171
172 if (ch.connect(address)) {
173 DefaultConnectFuture future = new DefaultConnectFuture();
174 newSession(ch, handler, config, future);
175 success = true;
176 return future;
177 }
178
179 success = true;
180 } catch (IOException e) {
181 return DefaultConnectFuture.newFailedFuture(e);
182 } finally {
183 if (!success && ch != null) {
184 try {
185 ch.close();
186 } catch (IOException e) {
187 ExceptionMonitor.getInstance().exceptionCaught(e);
188 }
189 }
190 }
191
192 ConnectionRequest request = new ConnectionRequest(ch, handler, config);
193 synchronized (lock) {
194 try {
195 startupWorker();
196 } catch (IOException e) {
197 try {
198 ch.close();
199 } catch (IOException e2) {
200 ExceptionMonitor.getInstance().exceptionCaught(e2);
201 }
202
203 return DefaultConnectFuture.newFailedFuture(e);
204 }
205 }
206
207 synchronized (connectQueue) {
208 connectQueue.push(request);
209 }
210 selector.wakeup();
211
212 return request;
213 }
214
215 public IoServiceConfig getDefaultConfig() {
216 return defaultConfig;
217 }
218
219
220
221
222
223
224
225 public void setDefaultConfig(SocketConnectorConfig defaultConfig) {
226 if (defaultConfig == null) {
227 throw new NullPointerException("defaultConfig");
228 }
229 this.defaultConfig = defaultConfig;
230 }
231
232 private synchronized void startupWorker() throws IOException {
233 if (worker == null) {
234 selector = Selector.open();
235 worker = new Worker();
236 executor.execute(new NamePreservingRunnable(worker));
237 }
238 }
239
240 private void registerNew() {
241 if (connectQueue.isEmpty())
242 return;
243
244 for (;;) {
245 ConnectionRequest req;
246 synchronized (connectQueue) {
247 req = (ConnectionRequest) connectQueue.pop();
248 }
249
250 if (req == null)
251 break;
252
253 SocketChannel ch = req.channel;
254 try {
255 ch.register(selector, SelectionKey.OP_CONNECT, req);
256 } catch (IOException e) {
257 req.setException(e);
258 }
259 }
260 }
261
262 private void processSessions(Set keys) {
263 Iterator it = keys.iterator();
264
265 while (it.hasNext()) {
266 SelectionKey key = (SelectionKey) it.next();
267
268 if (!key.isConnectable())
269 continue;
270
271 SocketChannel ch = (SocketChannel) key.channel();
272 ConnectionRequest entry = (ConnectionRequest) key.attachment();
273
274 boolean success = false;
275 try {
276 ch.finishConnect();
277 newSession(ch, entry.handler, entry.config, entry);
278 success = true;
279 } catch (Throwable e) {
280 entry.setException(e);
281 } finally {
282 key.cancel();
283 if (!success) {
284 try {
285 ch.close();
286 } catch (IOException e) {
287 ExceptionMonitor.getInstance().exceptionCaught(e);
288 }
289 }
290 }
291 }
292
293 keys.clear();
294 }
295
296 private void processTimedOutSessions(Set keys) {
297 long currentTime = System.currentTimeMillis();
298 Iterator it = keys.iterator();
299
300 while (it.hasNext()) {
301 SelectionKey key = (SelectionKey) it.next();
302
303 if (!key.isValid())
304 continue;
305
306 ConnectionRequest entry = (ConnectionRequest) key.attachment();
307
308 if (currentTime >= entry.deadline) {
309 entry.setException(new ConnectException());
310 try {
311 key.channel().close();
312 } catch (IOException e) {
313 ExceptionMonitor.getInstance().exceptionCaught(e);
314 } finally {
315 key.cancel();
316 }
317 }
318 }
319 }
320
321 private void newSession(SocketChannel ch, IoHandler handler,
322 IoServiceConfig config, ConnectFuture connectFuture)
323 throws IOException {
324 SocketSessionImpl session = new SocketSessionImpl(this,
325 nextProcessor(), getListeners(), config, ch, handler, ch
326 .socket().getRemoteSocketAddress());
327 try {
328 getFilterChainBuilder().buildFilterChain(session.getFilterChain());
329 config.getFilterChainBuilder().buildFilterChain(
330 session.getFilterChain());
331 config.getThreadModel().buildFilterChain(session.getFilterChain());
332 } catch (Throwable e) {
333 throw (IOException) new IOException("Failed to create a session.")
334 .initCause(e);
335 }
336
337
338
339 session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
340 connectFuture);
341
342
343 session.getIoProcessor().addNew(session);
344 }
345
346 private SocketIoProcessor nextProcessor() {
347 if (this.processorDistributor == Integer.MAX_VALUE) {
348 this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
349 }
350
351 return ioProcessors[processorDistributor++ % processorCount];
352 }
353
354 private class Worker implements Runnable {
355 private long lastActive = System.currentTimeMillis();
356
357 public void run() {
358 Thread.currentThread().setName(SocketConnector.this.threadName);
359
360 for (;;) {
361 try {
362 int nKeys = selector.select(1000);
363
364 registerNew();
365
366 if (nKeys > 0) {
367 processSessions(selector.selectedKeys());
368 }
369
370 processTimedOutSessions(selector.keys());
371
372 if (selector.keys().isEmpty()) {
373 if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) {
374 synchronized (lock) {
375 if (selector.keys().isEmpty()
376 && connectQueue.isEmpty()) {
377 worker = null;
378 try {
379 selector.close();
380 } catch (IOException e) {
381 ExceptionMonitor.getInstance()
382 .exceptionCaught(e);
383 } finally {
384 selector = null;
385 }
386 break;
387 }
388 }
389 }
390 } else {
391 lastActive = System.currentTimeMillis();
392 }
393 } catch (IOException e) {
394 ExceptionMonitor.getInstance().exceptionCaught(e);
395
396 try {
397 Thread.sleep(1000);
398 } catch (InterruptedException e1) {
399 ExceptionMonitor.getInstance().exceptionCaught(e1);
400 }
401 }
402 }
403 }
404 }
405
406 private class ConnectionRequest extends DefaultConnectFuture {
407 private final SocketChannel channel;
408
409 private final long deadline;
410
411 private final IoHandler handler;
412
413 private final IoServiceConfig config;
414
415 private ConnectionRequest(SocketChannel channel, IoHandler handler,
416 IoServiceConfig config) {
417 this.channel = channel;
418 long timeout;
419 if (config instanceof IoConnectorConfig) {
420 timeout = ((IoConnectorConfig) config)
421 .getConnectTimeoutMillis();
422 } else {
423 timeout = ((IoConnectorConfig) getDefaultConfig())
424 .getConnectTimeoutMillis();
425 }
426 this.deadline = System.currentTimeMillis() + timeout;
427 this.handler = handler;
428 this.config = config;
429 }
430 }
431 }