1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.ConnectException;
23 import java.net.SocketAddress;
24 import java.util.Iterator;
25 import java.util.Queue;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.Executors;
29
30 import org.apache.mina.core.RuntimeIoException;
31 import org.apache.mina.core.filterchain.IoFilter;
32 import org.apache.mina.core.future.ConnectFuture;
33 import org.apache.mina.core.future.DefaultConnectFuture;
34 import org.apache.mina.core.service.AbstractIoConnector;
35 import org.apache.mina.core.service.IoConnector;
36 import org.apache.mina.core.service.IoHandler;
37 import org.apache.mina.core.service.IoProcessor;
38 import org.apache.mina.core.service.SimpleIoProcessorPool;
39 import org.apache.mina.core.session.AbstractIoSession;
40 import org.apache.mina.core.session.IoSession;
41 import org.apache.mina.core.session.IoSessionConfig;
42 import org.apache.mina.core.session.IoSessionInitializer;
43 import org.apache.mina.transport.socket.nio.NioSocketConnector;
44 import org.apache.mina.util.ExceptionMonitor;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
63 extends AbstractIoConnector {
64
65 private final Object lock = new Object();
66 private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
67 private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
68 private final IoProcessor<T> processor;
69 private final boolean createdProcessor;
70
71 private final ServiceOperationFuture disposalFuture =
72 new ServiceOperationFuture();
73 private volatile boolean selectable;
74
75
76 private Connector connector;
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
92 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
110 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124
125 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
126 this(sessionConfig, null, processor, false);
127 }
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
146 this(sessionConfig, executor, processor, false);
147 }
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
167 super(sessionConfig, executor);
168
169 if (processor == null) {
170 throw new IllegalArgumentException("processor");
171 }
172
173 this.processor = processor;
174 this.createdProcessor = createdProcessor;
175
176 try {
177 init();
178 selectable = true;
179 } catch (RuntimeException e){
180 throw e;
181 } catch (Exception e) {
182 throw new RuntimeIoException("Failed to initialize.", e);
183 } finally {
184 if (!selectable) {
185 try {
186 destroy();
187 } catch (Exception e) {
188 ExceptionMonitor.getInstance().exceptionCaught(e);
189 }
190 }
191 }
192 }
193
194
195
196
197
198 protected abstract void init() throws Exception;
199
200
201
202
203
204
205 protected abstract void destroy() throws Exception;
206
207
208
209
210
211
212
213 protected abstract H newHandle(SocketAddress localAddress) throws Exception;
214
215
216
217
218
219
220
221
222
223
224
225 protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
226
227
228
229
230
231
232
233
234
235 protected abstract boolean finishConnect(H handle) throws Exception;
236
237
238
239
240
241
242
243
244
245
246 protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
247
248
249
250
251
252
253 protected abstract void close(H handle) throws Exception;
254
255
256
257
258 protected abstract void wakeup();
259
260
261
262
263
264
265
266
267 protected abstract int select(int timeout) throws Exception;
268
269
270
271
272
273
274 protected abstract Iterator<H> selectedHandles();
275
276
277
278
279
280 protected abstract Iterator<H> allHandles();
281
282
283
284
285
286
287
288 protected abstract void register(H handle, ConnectionRequest request) throws Exception;
289
290
291
292
293
294
295 protected abstract ConnectionRequest getConnectionRequest(H handle);
296
297
298
299
300 @Override
301 protected final void dispose0() throws Exception {
302 startupWorker();
303 wakeup();
304 }
305
306
307
308
309 @Override
310 @SuppressWarnings("unchecked")
311 protected final ConnectFuture connect0(
312 SocketAddress remoteAddress, SocketAddress localAddress,
313 IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
314 H handle = null;
315 boolean success = false;
316 try {
317 handle = newHandle(localAddress);
318 if (connect(handle, remoteAddress)) {
319 ConnectFuture future = new DefaultConnectFuture();
320 T session = newSession(processor, handle);
321 initSession(session, future, sessionInitializer);
322
323 session.getProcessor().add(session);
324 success = true;
325 return future;
326 }
327
328 success = true;
329 } catch (Exception e) {
330 return DefaultConnectFuture.newFailedFuture(e);
331 } finally {
332 if (!success && handle != null) {
333 try {
334 close(handle);
335 } catch (Exception e) {
336 ExceptionMonitor.getInstance().exceptionCaught(e);
337 }
338 }
339 }
340
341 ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
342 connectQueue.add(request);
343 startupWorker();
344 wakeup();
345
346 return request;
347 }
348
349 private void startupWorker() {
350 if (!selectable) {
351 connectQueue.clear();
352 cancelQueue.clear();
353 }
354
355 synchronized (lock) {
356 if (connector == null) {
357 connector = new Connector();
358 executeWorker(connector);
359 }
360 }
361 }
362
363 private int registerNew() {
364 int nHandles = 0;
365 for (; ;) {
366 ConnectionRequest req = connectQueue.poll();
367 if (req == null) {
368 break;
369 }
370
371 H handle = req.handle;
372 try {
373 register(handle, req);
374 nHandles ++;
375 } catch (Exception e) {
376 req.setException(e);
377 try {
378 close(handle);
379 } catch (Exception e2) {
380 ExceptionMonitor.getInstance().exceptionCaught(e2);
381 }
382 }
383 }
384 return nHandles;
385 }
386
387 private int cancelKeys() {
388 int nHandles = 0;
389 for (; ;) {
390 ConnectionRequest req = cancelQueue.poll();
391 if (req == null) {
392 break;
393 }
394
395 H handle = req.handle;
396 try {
397 close(handle);
398 } catch (Exception e) {
399 ExceptionMonitor.getInstance().exceptionCaught(e);
400 } finally {
401 nHandles ++;
402 }
403 }
404 return nHandles;
405 }
406
407
408
409
410
411 private int processConnections(Iterator<H> handlers) {
412 int nHandles = 0;
413
414
415 while (handlers.hasNext()) {
416 H handle = handlers.next();
417 handlers.remove();
418
419 ConnectionRequest connectionRequest = getConnectionRequest(handle);
420
421 if ( connectionRequest == null) {
422 continue;
423 }
424
425 boolean success = false;
426 try {
427 if (finishConnect(handle)) {
428 T session = newSession(processor, handle);
429 initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
430
431 session.getProcessor().add(session);
432 nHandles ++;
433 }
434 success = true;
435 } catch (Throwable e) {
436 connectionRequest.setException(e);
437 } finally {
438 if (!success) {
439
440 cancelQueue.offer(connectionRequest);
441 }
442 }
443 }
444 return nHandles;
445 }
446
447 private void processTimedOutSessions(Iterator<H> handles) {
448 long currentTime = System.currentTimeMillis();
449
450 while (handles.hasNext()) {
451 H handle = handles.next();
452 ConnectionRequest connectionRequest = getConnectionRequest(handle);
453
454 if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
455 connectionRequest.setException(
456 new ConnectException("Connection timed out."));
457 cancelQueue.offer(connectionRequest);
458 }
459 }
460 }
461
462 private class Connector implements Runnable {
463
464 public void run() {
465 int nHandles = 0;
466 while (selectable) {
467 try {
468
469
470 int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);
471 int selected = select(timeout);
472
473 nHandles += registerNew();
474
475 if (selected > 0) {
476 nHandles -= processConnections(selectedHandles());
477 }
478
479 processTimedOutSessions(allHandles());
480
481 nHandles -= cancelKeys();
482
483 if (nHandles == 0) {
484 synchronized (lock) {
485 if (connectQueue.isEmpty()) {
486 connector = null;
487 break;
488 }
489 }
490 }
491 } catch (Throwable e) {
492 ExceptionMonitor.getInstance().exceptionCaught(e);
493
494 try {
495 Thread.sleep(1000);
496 } catch (InterruptedException e1) {
497 ExceptionMonitor.getInstance().exceptionCaught(e1);
498 }
499 }
500 }
501
502 if (selectable && isDisposing()) {
503 selectable = false;
504 try {
505 if (createdProcessor) {
506 processor.dispose();
507 }
508 } finally {
509 try {
510 synchronized (disposalLock) {
511 if (isDisposing()) {
512 destroy();
513 }
514 }
515 } catch (Exception e) {
516 ExceptionMonitor.getInstance().exceptionCaught(e);
517 } finally {
518 disposalFuture.setDone();
519 }
520 }
521 }
522 }
523 }
524
525 public final class ConnectionRequest extends DefaultConnectFuture {
526 private final H handle;
527 private final long deadline;
528 private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
529
530 public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
531 this.handle = handle;
532 long timeout = getConnectTimeoutMillis();
533 if (timeout <= 0L) {
534 this.deadline = Long.MAX_VALUE;
535 } else {
536 this.deadline = System.currentTimeMillis() + timeout;
537 }
538 this.sessionInitializer = callback;
539 }
540
541 public H getHandle() {
542 return handle;
543 }
544
545 public long getDeadline() {
546 return deadline;
547 }
548
549 public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
550 return sessionInitializer;
551 }
552
553 @Override
554 public void cancel() {
555 if ( !isDone() ) {
556 super.cancel();
557 cancelQueue.add(this);
558 startupWorker();
559 wakeup();
560 }
561 }
562 }
563 }