1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.SocketChannel;
26 import java.util.Iterator;
27 import java.util.Set;
28
29 import org.apache.mina.common.ByteBuffer;
30 import org.apache.mina.common.ExceptionMonitor;
31 import org.apache.mina.common.IdleStatus;
32 import org.apache.mina.common.IoFilter.WriteRequest;
33 import org.apache.mina.common.WriteTimeoutException;
34 import org.apache.mina.util.NamePreservingRunnable;
35 import org.apache.mina.util.Queue;
36 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
37
38
39
40
41
42
43
44 class SocketIoProcessor {
45 private final Object lock = new Object();
46
47 private final String threadName;
48
49 private final Executor executor;
50
51
52
53
54 private Selector selector;
55
56 private final Queue newSessions = new Queue();
57
58 private final Queue removingSessions = new Queue();
59
60 private final Queue flushingSessions = new Queue();
61
62 private final Queue trafficControllingSessions = new Queue();
63
64 private Worker worker;
65
66 private long lastIdleCheckTime = System.currentTimeMillis();
67
68 SocketIoProcessor(String threadName, Executor executor) {
69 this.threadName = threadName;
70 this.executor = executor;
71 }
72
73 void addNew(SocketSessionImpl session) throws IOException {
74 synchronized (newSessions) {
75 newSessions.push(session);
76 }
77
78 startupWorker();
79 }
80
81 void remove(SocketSessionImpl session) throws IOException {
82 scheduleRemove(session);
83 startupWorker();
84 }
85
86 private Selector getSelector() {
87 synchronized (lock) {
88 return this.selector;
89 }
90 }
91
92 private void startupWorker() throws IOException {
93 synchronized (lock) {
94 if (worker == null) {
95 selector = Selector.open();
96 worker = new Worker();
97 executor.execute(new NamePreservingRunnable(worker));
98 }
99 selector.wakeup();
100 }
101 }
102
103 void flush(SocketSessionImpl session) {
104 if (scheduleFlush(session)) {
105 Selector selector = getSelector();
106 if (selector != null) {
107 selector.wakeup();
108 }
109 }
110 }
111
112 void updateTrafficMask(SocketSessionImpl session) {
113 scheduleTrafficControl(session);
114 Selector selector = getSelector();
115 if (selector != null) {
116 selector.wakeup();
117 }
118 }
119
120 private void scheduleRemove(SocketSessionImpl session) {
121 synchronized (removingSessions) {
122 removingSessions.push(session);
123 }
124 }
125
126 private boolean scheduleFlush(SocketSessionImpl session) {
127 if (session.setScheduledForFlush(true)) {
128 synchronized (flushingSessions) {
129 flushingSessions.push(session);
130 }
131
132 return true;
133 }
134
135 return false;
136 }
137
138 private void scheduleTrafficControl(SocketSessionImpl session) {
139 synchronized (trafficControllingSessions) {
140 trafficControllingSessions.push(session);
141 }
142 }
143
144 private void doAddNew() {
145 if (newSessions.isEmpty())
146 return;
147
148 Selector selector = getSelector();
149 for (;;) {
150 SocketSessionImpl session;
151
152 synchronized (newSessions) {
153 session = (SocketSessionImpl) newSessions.pop();
154 }
155
156 if (session == null)
157 break;
158
159 SocketChannel ch = session.getChannel();
160 try {
161 ch.configureBlocking(false);
162 session.setSelectionKey(ch.register(selector,
163 SelectionKey.OP_READ, session));
164
165
166
167 session.getServiceListeners().fireSessionCreated(session);
168 } catch (IOException e) {
169
170
171 session.getFilterChain().fireExceptionCaught(session, e);
172 }
173 }
174 }
175
176 private void doRemove() {
177 if (removingSessions.isEmpty())
178 return;
179
180 for (;;) {
181 SocketSessionImpl session;
182
183 synchronized (removingSessions) {
184 session = (SocketSessionImpl) removingSessions.pop();
185 }
186
187 if (session == null)
188 break;
189
190 SocketChannel ch = session.getChannel();
191 SelectionKey key = session.getSelectionKey();
192
193
194 if (key == null) {
195 scheduleRemove(session);
196 break;
197 }
198
199 if (!key.isValid()) {
200 continue;
201 }
202
203 try {
204 key.cancel();
205 ch.close();
206 } catch (IOException e) {
207 session.getFilterChain().fireExceptionCaught(session, e);
208 } finally {
209 releaseWriteBuffers(session);
210 session.getServiceListeners().fireSessionDestroyed(session);
211 }
212 }
213 }
214
215 private void process(Set selectedKeys) {
216 Iterator it = selectedKeys.iterator();
217
218 while (it.hasNext()) {
219 SelectionKey key = (SelectionKey) it.next();
220 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
221
222 if (key.isReadable() && session.getTrafficMask().isReadable()) {
223 read(session);
224 }
225
226 if (key.isWritable() && session.getTrafficMask().isWritable()) {
227 scheduleFlush(session);
228 }
229 }
230
231 selectedKeys.clear();
232 }
233
234 private void read(SocketSessionImpl session) {
235 ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
236 SocketChannel ch = session.getChannel();
237
238 try {
239 int readBytes = 0;
240 int ret;
241
242 try {
243 while ((ret = ch.read(buf.buf())) > 0) {
244 readBytes += ret;
245 }
246 } finally {
247 buf.flip();
248 }
249
250 session.increaseReadBytes(readBytes);
251
252 if (readBytes > 0) {
253 session.getFilterChain().fireMessageReceived(session, buf);
254 buf = null;
255
256 if (readBytes * 2 < session.getReadBufferSize()) {
257 if (session.getReadBufferSize() > 64) {
258 session.setReadBufferSize(session.getReadBufferSize() >>> 1);
259 }
260 } else if (readBytes == session.getReadBufferSize()) {
261 int newReadBufferSize = session.getReadBufferSize() << 1;
262 if (newReadBufferSize <= (((SocketSessionConfig) session.getConfig()).getReceiveBufferSize() << 1)) {
263
264
265
266 session.setReadBufferSize(newReadBufferSize);
267 }
268 }
269 }
270 if (ret < 0) {
271 scheduleRemove(session);
272 }
273 } catch (Throwable e) {
274 if (e instanceof IOException)
275 scheduleRemove(session);
276 session.getFilterChain().fireExceptionCaught(session, e);
277 } finally {
278 if (buf != null)
279 buf.release();
280 }
281 }
282
283 private void notifyIdleness() {
284
285 long currentTime = System.currentTimeMillis();
286 if ((currentTime - lastIdleCheckTime) >= 1000) {
287 lastIdleCheckTime = currentTime;
288 Selector selector = getSelector();
289 Set keys = selector.keys();
290 if (keys != null) {
291 for (Iterator it = keys.iterator(); it.hasNext();) {
292 SelectionKey key = (SelectionKey) it.next();
293 SocketSessionImpl session = (SocketSessionImpl) key
294 .attachment();
295 notifyIdleness(session, currentTime);
296 }
297 }
298 }
299 }
300
301 private void notifyIdleness(SocketSessionImpl session, long currentTime) {
302 notifyIdleness0(session, currentTime, session
303 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
304 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
305 .getLastIdleTime(IdleStatus.BOTH_IDLE)));
306 notifyIdleness0(session, currentTime, session
307 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
308 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
309 session.getLastIdleTime(IdleStatus.READER_IDLE)));
310 notifyIdleness0(session, currentTime, session
311 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
312 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
313 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
314
315 notifyWriteTimeout(session, currentTime, session
316 .getWriteTimeoutInMillis(), session.getLastWriteTime());
317 }
318
319 private void notifyIdleness0(SocketSessionImpl session, long currentTime,
320 long idleTime, IdleStatus status, long lastIoTime) {
321 if (idleTime > 0 && lastIoTime != 0
322 && (currentTime - lastIoTime) >= idleTime) {
323 session.increaseIdleCount(status);
324 session.getFilterChain().fireSessionIdle(session, status);
325 }
326 }
327
328 private void notifyWriteTimeout(SocketSessionImpl session,
329 long currentTime, long writeTimeout, long lastIoTime) {
330 SelectionKey key = session.getSelectionKey();
331 if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
332 && key != null && key.isValid()
333 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
334 session.getFilterChain().fireExceptionCaught(session,
335 new WriteTimeoutException());
336 }
337 }
338
339 private void doFlush() {
340 if (flushingSessions.size() == 0)
341 return;
342
343 for (;;) {
344 SocketSessionImpl session;
345
346 synchronized (flushingSessions) {
347 session = (SocketSessionImpl) flushingSessions.pop();
348 }
349
350 if (session == null)
351 break;
352
353 session.setScheduledForFlush(false);
354
355 if (!session.isConnected()) {
356 releaseWriteBuffers(session);
357 continue;
358 }
359
360 SelectionKey key = session.getSelectionKey();
361
362
363 if (key == null) {
364 scheduleFlush(session);
365 break;
366 }
367
368
369 if (!key.isValid()) {
370 continue;
371 }
372
373 try {
374 boolean flushedAll = doFlush(session);
375 if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
376 scheduleFlush(session);
377 }
378 } catch (IOException e) {
379 scheduleRemove(session);
380 session.getFilterChain().fireExceptionCaught(session, e);
381 }
382 }
383 }
384
385 private void releaseWriteBuffers(SocketSessionImpl session) {
386 Queue writeRequestQueue = session.getWriteRequestQueue();
387 WriteRequest req;
388
389 if ((req = (WriteRequest) writeRequestQueue.pop()) != null) {
390 ByteBuffer buf = (ByteBuffer) req.getMessage();
391 try {
392 buf.release();
393 } catch (IllegalStateException e) {
394 session.getFilterChain().fireExceptionCaught(session, e);
395 } finally {
396
397
398 if (buf.hasRemaining()) {
399 req.getFuture().setWritten(false);
400 } else {
401 session.getFilterChain().fireMessageSent(session, req);
402 }
403 }
404
405
406 while ((req = (WriteRequest) writeRequestQueue.pop()) != null) {
407 try {
408 ((ByteBuffer) req.getMessage()).release();
409 } catch (IllegalStateException e) {
410 session.getFilterChain().fireExceptionCaught(session, e);
411 } finally {
412 req.getFuture().setWritten(false);
413 }
414 }
415 }
416 }
417
418 private boolean doFlush(SocketSessionImpl session) throws IOException {
419
420 SelectionKey key = session.getSelectionKey();
421 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
422
423 SocketChannel ch = session.getChannel();
424 Queue writeRequestQueue = session.getWriteRequestQueue();
425
426 int writtenBytes = 0;
427 int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
428 try {
429 for (;;) {
430 WriteRequest req;
431
432 synchronized (writeRequestQueue) {
433 req = (WriteRequest) writeRequestQueue.first();
434 }
435
436 if (req == null)
437 break;
438
439 ByteBuffer buf = (ByteBuffer) req.getMessage();
440 if (buf.remaining() == 0) {
441 synchronized (writeRequestQueue) {
442 writeRequestQueue.pop();
443 }
444
445 session.increaseWrittenMessages();
446
447 buf.reset();
448 session.getFilterChain().fireMessageSent(session, req);
449 continue;
450 }
451
452 if (key.isWritable()) {
453 writtenBytes += ch.write(buf.buf());
454 }
455
456 if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
457
458 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
459 return false;
460 }
461 }
462 } finally {
463 session.increaseWrittenBytes(writtenBytes);
464 }
465
466 return true;
467 }
468
469 private void doUpdateTrafficMask() {
470 if (trafficControllingSessions.isEmpty())
471 return;
472
473 for (;;) {
474 SocketSessionImpl session;
475
476 synchronized (trafficControllingSessions) {
477 session = (SocketSessionImpl) trafficControllingSessions.pop();
478 }
479
480 if (session == null)
481 break;
482
483 SelectionKey key = session.getSelectionKey();
484
485
486
487 if (key == null) {
488 scheduleTrafficControl(session);
489 break;
490 }
491
492 if (!key.isValid()) {
493 continue;
494 }
495
496
497
498 int ops = SelectionKey.OP_READ;
499 Queue writeRequestQueue = session.getWriteRequestQueue();
500 synchronized (writeRequestQueue) {
501 if (!writeRequestQueue.isEmpty()) {
502 ops |= SelectionKey.OP_WRITE;
503 }
504 }
505
506
507 int mask = session.getTrafficMask().getInterestOps();
508 key.interestOps(ops & mask);
509 }
510 }
511
512 private class Worker implements Runnable {
513 public void run() {
514 Thread.currentThread().setName(SocketIoProcessor.this.threadName);
515
516 Selector selector = getSelector();
517 for (;;) {
518 try {
519 int nKeys = selector.select(1000);
520 doAddNew();
521 doUpdateTrafficMask();
522
523 if (nKeys > 0) {
524 process(selector.selectedKeys());
525 }
526
527 doFlush();
528 doRemove();
529 notifyIdleness();
530
531 if (selector.keys().isEmpty()) {
532 synchronized (lock) {
533 if (selector.keys().isEmpty()
534 && newSessions.isEmpty()) {
535 worker = null;
536
537 try {
538 selector.close();
539 } catch (IOException e) {
540 ExceptionMonitor.getInstance()
541 .exceptionCaught(e);
542 } finally {
543 selector = null;
544 }
545
546 break;
547 }
548 }
549 }
550 } catch (Throwable t) {
551 ExceptionMonitor.getInstance().exceptionCaught(t);
552
553 try {
554 Thread.sleep(1000);
555 } catch (InterruptedException e1) {
556 ExceptionMonitor.getInstance().exceptionCaught(e1);
557 }
558 }
559 }
560 }
561 }
562 }