1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.SocketChannel;
26 import java.util.Iterator;
27 import java.util.Set;
28
29 import org.apache.mina.common.ByteBuffer;
30 import org.apache.mina.common.ExceptionMonitor;
31 import org.apache.mina.common.IdleStatus;
32 import org.apache.mina.common.IoFilter.WriteRequest;
33 import org.apache.mina.common.WriteTimeoutException;
34 import org.apache.mina.util.NamePreservingRunnable;
35 import org.apache.mina.util.Queue;
36 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
37
38
39
40
41
42
43
44 class SocketIoProcessor {
45 private final Object lock = new Object();
46
47 private final String threadName;
48
49 private final Executor executor;
50
51
52
53
54 private Selector selector;
55
56 private final Queue newSessions = new Queue();
57
58 private final Queue removingSessions = new Queue();
59
60 private final Queue flushingSessions = new Queue();
61
62 private final Queue trafficControllingSessions = new Queue();
63
64 private Worker worker;
65
66 private long lastIdleCheckTime = System.currentTimeMillis();
67
/**
 * Creates a processor. Only the two arguments are stored here; the
 * selector and the worker are created later by {@code startupWorker()}.
 *
 * @param threadName name the worker sets on the thread that runs it
 * @param executor   executor the worker is submitted to
 */
SocketIoProcessor(String threadName, Executor executor) {
    this.executor = executor;
    this.threadName = threadName;
}
72
73 void addNew(SocketSessionImpl session) throws IOException {
74 synchronized (newSessions) {
75 newSessions.push(session);
76 }
77
78 startupWorker();
79 }
80
81 void remove(SocketSessionImpl session) throws IOException {
82 scheduleRemove(session);
83 startupWorker();
84 }
85
86 private Selector getSelector() {
87 synchronized (lock) {
88 return this.selector;
89 }
90 }
91
92 private void startupWorker() throws IOException {
93 synchronized (lock) {
94 if (worker == null) {
95 selector = Selector.open();
96 worker = new Worker();
97 executor.execute(new NamePreservingRunnable(worker));
98 }
99 selector.wakeup();
100 }
101 }
102
103 void flush(SocketSessionImpl session) {
104 scheduleFlush(session);
105 Selector selector = getSelector();
106 if (selector != null) {
107 selector.wakeup();
108 }
109 }
110
111 void updateTrafficMask(SocketSessionImpl session) {
112 scheduleTrafficControl(session);
113 Selector selector = getSelector();
114 if (selector != null) {
115 selector.wakeup();
116 }
117 }
118
119 private void scheduleRemove(SocketSessionImpl session) {
120 synchronized (removingSessions) {
121 removingSessions.push(session);
122 }
123 }
124
125 private void scheduleFlush(SocketSessionImpl session) {
126 synchronized (flushingSessions) {
127 flushingSessions.push(session);
128 }
129 }
130
131 private void scheduleTrafficControl(SocketSessionImpl session) {
132 synchronized (trafficControllingSessions) {
133 trafficControllingSessions.push(session);
134 }
135 }
136
137 private void doAddNew() {
138 if (newSessions.isEmpty())
139 return;
140
141 Selector selector = getSelector();
142 for (;;) {
143 SocketSessionImpl session;
144
145 synchronized (newSessions) {
146 session = (SocketSessionImpl) newSessions.pop();
147 }
148
149 if (session == null)
150 break;
151
152 SocketChannel ch = session.getChannel();
153 try {
154 ch.configureBlocking(false);
155 session.setSelectionKey(ch.register(selector,
156 SelectionKey.OP_READ, session));
157
158
159
160 session.getServiceListeners().fireSessionCreated(session);
161 } catch (IOException e) {
162
163
164 session.getFilterChain().fireExceptionCaught(session, e);
165 }
166 }
167 }
168
169 private void doRemove() {
170 if (removingSessions.isEmpty())
171 return;
172
173 for (;;) {
174 SocketSessionImpl session;
175
176 synchronized (removingSessions) {
177 session = (SocketSessionImpl) removingSessions.pop();
178 }
179
180 if (session == null)
181 break;
182
183 SocketChannel ch = session.getChannel();
184 SelectionKey key = session.getSelectionKey();
185
186
187 if (key == null) {
188 scheduleRemove(session);
189 break;
190 }
191
192 if (!key.isValid()) {
193 continue;
194 }
195
196 try {
197 key.cancel();
198 ch.close();
199 } catch (IOException e) {
200 session.getFilterChain().fireExceptionCaught(session, e);
201 } finally {
202 releaseWriteBuffers(session);
203 session.getServiceListeners().fireSessionDestroyed(session);
204 }
205 }
206 }
207
208 private void process(Set selectedKeys) {
209 Iterator it = selectedKeys.iterator();
210
211 while (it.hasNext()) {
212 SelectionKey key = (SelectionKey) it.next();
213 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
214
215 if (key.isReadable() && session.getTrafficMask().isReadable()) {
216 read(session);
217 }
218
219 if (key.isWritable() && session.getTrafficMask().isWritable()) {
220 scheduleFlush(session);
221 }
222 }
223
224 selectedKeys.clear();
225 }
226
227 private void read(SocketSessionImpl session) {
228 ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
229 SocketChannel ch = session.getChannel();
230
231 try {
232 int readBytes = 0;
233 int ret;
234
235 try {
236 while ((ret = ch.read(buf.buf())) > 0) {
237 readBytes += ret;
238 }
239 } finally {
240 buf.flip();
241 }
242
243 session.increaseReadBytes(readBytes);
244
245 if (readBytes > 0) {
246 session.getFilterChain().fireMessageReceived(session, buf);
247 buf = null;
248
249 if (readBytes * 2 < session.getReadBufferSize()) {
250 if (session.getReadBufferSize() > 64) {
251 session.setReadBufferSize(session.getReadBufferSize() >>> 1);
252 }
253 } else if (readBytes == session.getReadBufferSize()) {
254 session.setReadBufferSize(session.getReadBufferSize() << 1);
255 }
256 }
257 if (ret < 0) {
258 scheduleRemove(session);
259 }
260 } catch (Throwable e) {
261 if (e instanceof IOException)
262 scheduleRemove(session);
263 session.getFilterChain().fireExceptionCaught(session, e);
264 } finally {
265 if (buf != null)
266 buf.release();
267 }
268 }
269
270 private void notifyIdleness() {
271
272 long currentTime = System.currentTimeMillis();
273 if ((currentTime - lastIdleCheckTime) >= 1000) {
274 lastIdleCheckTime = currentTime;
275 Selector selector = getSelector();
276 Set keys = selector.keys();
277 if (keys != null) {
278 for (Iterator it = keys.iterator(); it.hasNext();) {
279 SelectionKey key = (SelectionKey) it.next();
280 SocketSessionImpl session = (SocketSessionImpl) key
281 .attachment();
282 notifyIdleness(session, currentTime);
283 }
284 }
285 }
286 }
287
288 private void notifyIdleness(SocketSessionImpl session, long currentTime) {
289 notifyIdleness0(session, currentTime, session
290 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
291 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
292 .getLastIdleTime(IdleStatus.BOTH_IDLE)));
293 notifyIdleness0(session, currentTime, session
294 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
295 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
296 session.getLastIdleTime(IdleStatus.READER_IDLE)));
297 notifyIdleness0(session, currentTime, session
298 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
299 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
300 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
301
302 notifyWriteTimeout(session, currentTime, session
303 .getWriteTimeoutInMillis(), session.getLastWriteTime());
304 }
305
306 private void notifyIdleness0(SocketSessionImpl session, long currentTime,
307 long idleTime, IdleStatus status, long lastIoTime) {
308 if (idleTime > 0 && lastIoTime != 0
309 && (currentTime - lastIoTime) >= idleTime) {
310 session.increaseIdleCount(status);
311 session.getFilterChain().fireSessionIdle(session, status);
312 }
313 }
314
315 private void notifyWriteTimeout(SocketSessionImpl session,
316 long currentTime, long writeTimeout, long lastIoTime) {
317 SelectionKey key = session.getSelectionKey();
318 if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
319 && key != null && key.isValid()
320 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
321 session.getFilterChain().fireExceptionCaught(session,
322 new WriteTimeoutException());
323 }
324 }
325
326 private void doFlush() {
327 if (flushingSessions.size() == 0)
328 return;
329
330 for (;;) {
331 SocketSessionImpl session;
332
333 synchronized (flushingSessions) {
334 session = (SocketSessionImpl) flushingSessions.pop();
335 }
336
337 if (session == null)
338 break;
339
340 if (!session.isConnected()) {
341 releaseWriteBuffers(session);
342 continue;
343 }
344
345 SelectionKey key = session.getSelectionKey();
346
347
348 if (key == null) {
349 scheduleFlush(session);
350 break;
351 }
352
353
354 if (!key.isValid()) {
355 continue;
356 }
357
358 try {
359 doFlush(session);
360 } catch (IOException e) {
361 scheduleRemove(session);
362 session.getFilterChain().fireExceptionCaught(session, e);
363 }
364 }
365 }
366
367 private void releaseWriteBuffers(SocketSessionImpl session) {
368 Queue writeRequestQueue = session.getWriteRequestQueue();
369 WriteRequest req;
370
371 if ((req = (WriteRequest) writeRequestQueue.pop()) != null) {
372 ByteBuffer buf = (ByteBuffer) req.getMessage();
373 try {
374 buf.release();
375 } catch (IllegalStateException e) {
376 session.getFilterChain().fireExceptionCaught(session, e);
377 } finally {
378
379
380 if (buf.hasRemaining()) {
381 req.getFuture().setWritten(false);
382 } else {
383 session.getFilterChain().fireMessageSent(session, req);
384 }
385 }
386
387
388 while ((req = (WriteRequest) writeRequestQueue.pop()) != null) {
389 try {
390 ((ByteBuffer) req.getMessage()).release();
391 } catch (IllegalStateException e) {
392 session.getFilterChain().fireExceptionCaught(session, e);
393 } finally {
394 req.getFuture().setWritten(false);
395 }
396 }
397 }
398 }
399
400 private void doFlush(SocketSessionImpl session) throws IOException {
401
402 SelectionKey key = session.getSelectionKey();
403 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
404
405 SocketChannel ch = session.getChannel();
406 Queue writeRequestQueue = session.getWriteRequestQueue();
407
408 for (;;) {
409 WriteRequest req;
410
411 synchronized (writeRequestQueue) {
412 req = (WriteRequest) writeRequestQueue.first();
413 }
414
415 if (req == null)
416 break;
417
418 ByteBuffer buf = (ByteBuffer) req.getMessage();
419 if (buf.remaining() == 0) {
420 synchronized (writeRequestQueue) {
421 writeRequestQueue.pop();
422 }
423
424 session.increaseWrittenMessages();
425
426 buf.reset();
427 session.getFilterChain().fireMessageSent(session, req);
428 continue;
429 }
430
431 if (key.isWritable()) {
432 int writtenBytes = ch.write(buf.buf());
433 if (writtenBytes > 0) {
434 session.increaseWrittenBytes(writtenBytes);
435 }
436 }
437
438 if (buf.hasRemaining()) {
439
440 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
441 break;
442 }
443 }
444 }
445
446 private void doUpdateTrafficMask() {
447 if (trafficControllingSessions.isEmpty())
448 return;
449
450 for (;;) {
451 SocketSessionImpl session;
452
453 synchronized (trafficControllingSessions) {
454 session = (SocketSessionImpl) trafficControllingSessions.pop();
455 }
456
457 if (session == null)
458 break;
459
460 SelectionKey key = session.getSelectionKey();
461
462
463
464 if (key == null) {
465 scheduleTrafficControl(session);
466 break;
467 }
468
469 if (!key.isValid()) {
470 continue;
471 }
472
473
474
475 int ops = SelectionKey.OP_READ;
476 Queue writeRequestQueue = session.getWriteRequestQueue();
477 synchronized (writeRequestQueue) {
478 if (!writeRequestQueue.isEmpty()) {
479 ops |= SelectionKey.OP_WRITE;
480 }
481 }
482
483
484 int mask = session.getTrafficMask().getInterestOps();
485 key.interestOps(ops & mask);
486 }
487 }
488
489 private class Worker implements Runnable {
490 public void run() {
491 Thread.currentThread().setName(SocketIoProcessor.this.threadName);
492
493 Selector selector = getSelector();
494 for (;;) {
495 try {
496 int nKeys = selector.select(1000);
497 doAddNew();
498 doUpdateTrafficMask();
499
500 if (nKeys > 0) {
501 process(selector.selectedKeys());
502 }
503
504 doFlush();
505 doRemove();
506 notifyIdleness();
507
508 if (selector.keys().isEmpty()) {
509 synchronized (lock) {
510 if (selector.keys().isEmpty()
511 && newSessions.isEmpty()) {
512 worker = null;
513
514 try {
515 selector.close();
516 } catch (IOException e) {
517 ExceptionMonitor.getInstance()
518 .exceptionCaught(e);
519 } finally {
520 selector = null;
521 }
522
523 break;
524 }
525 }
526 }
527 } catch (Throwable t) {
528 ExceptionMonitor.getInstance().exceptionCaught(t);
529
530 try {
531 Thread.sleep(1000);
532 } catch (InterruptedException e1) {
533 ExceptionMonitor.getInstance().exceptionCaught(e1);
534 }
535 }
536 }
537 }
538 }
539 }