1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.SocketChannel;
26 import java.util.Iterator;
27 import java.util.Set;
28
29 import org.apache.mina.common.ByteBuffer;
30 import org.apache.mina.common.ExceptionMonitor;
31 import org.apache.mina.common.IdleStatus;
32 import org.apache.mina.common.IoFilter.WriteRequest;
33 import org.apache.mina.common.WriteTimeoutException;
34 import org.apache.mina.util.NamePreservingRunnable;
35 import org.apache.mina.util.Queue;
36 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
37
38
39
40
41
42
43
44 class SocketIoProcessor {
45 private final Object lock = new Object();
46
47 private final String threadName;
48
49 private final Executor executor;
50
51
52
53
54 private Selector selector;
55
56 private final Queue newSessions = new Queue();
57
58 private final Queue removingSessions = new Queue();
59
60 private final Queue flushingSessions = new Queue();
61
62 private final Queue trafficControllingSessions = new Queue();
63
64 private Worker worker;
65
66 private long lastIdleCheckTime = System.currentTimeMillis();
67
68 SocketIoProcessor(String threadName, Executor executor) {
69 this.threadName = threadName;
70 this.executor = executor;
71 }
72
73 void addNew(SocketSessionImpl session) throws IOException {
74 synchronized (newSessions) {
75 newSessions.push(session);
76 }
77
78 startupWorker();
79 }
80
81 void remove(SocketSessionImpl session) throws IOException {
82 scheduleRemove(session);
83 startupWorker();
84 }
85
86 private void startupWorker() throws IOException {
87 synchronized (lock) {
88 if (worker == null) {
89 selector = Selector.open();
90 worker = new Worker();
91 executor.execute(new NamePreservingRunnable(worker));
92 }
93 selector.wakeup();
94 }
95 }
96
97 void flush(SocketSessionImpl session) {
98 scheduleFlush(session);
99 Selector selector = this.selector;
100 if (selector != null) {
101 selector.wakeup();
102 }
103 }
104
105 void updateTrafficMask(SocketSessionImpl session) {
106 scheduleTrafficControl(session);
107 Selector selector = this.selector;
108 if (selector != null) {
109 selector.wakeup();
110 }
111 }
112
113 private void scheduleRemove(SocketSessionImpl session) {
114 synchronized (removingSessions) {
115 removingSessions.push(session);
116 }
117 }
118
119 private void scheduleFlush(SocketSessionImpl session) {
120 synchronized (flushingSessions) {
121 flushingSessions.push(session);
122 }
123 }
124
125 private void scheduleTrafficControl(SocketSessionImpl session) {
126 synchronized (trafficControllingSessions) {
127 trafficControllingSessions.push(session);
128 }
129 }
130
131 private void doAddNew() {
132 if (newSessions.isEmpty())
133 return;
134
135 for (;;) {
136 SocketSessionImpl session;
137
138 synchronized (newSessions) {
139 session = (SocketSessionImpl) newSessions.pop();
140 }
141
142 if (session == null)
143 break;
144
145 SocketChannel ch = session.getChannel();
146 try {
147 ch.configureBlocking(false);
148 session.setSelectionKey(ch.register(selector,
149 SelectionKey.OP_READ, session));
150
151
152
153 session.getServiceListeners().fireSessionCreated(session);
154 } catch (IOException e) {
155
156
157 session.getFilterChain().fireExceptionCaught(session, e);
158 }
159 }
160 }
161
162 private void doRemove() {
163 if (removingSessions.isEmpty())
164 return;
165
166 for (;;) {
167 SocketSessionImpl session;
168
169 synchronized (removingSessions) {
170 session = (SocketSessionImpl) removingSessions.pop();
171 }
172
173 if (session == null)
174 break;
175
176 SocketChannel ch = session.getChannel();
177 SelectionKey key = session.getSelectionKey();
178
179
180 if (key == null) {
181 scheduleRemove(session);
182 break;
183 }
184
185 if (!key.isValid()) {
186 continue;
187 }
188
189 try {
190 key.cancel();
191 ch.close();
192 } catch (IOException e) {
193 session.getFilterChain().fireExceptionCaught(session, e);
194 } finally {
195 releaseWriteBuffers(session);
196 session.getServiceListeners().fireSessionDestroyed(session);
197 }
198 }
199 }
200
201 private void process(Set selectedKeys) {
202 Iterator it = selectedKeys.iterator();
203
204 while (it.hasNext()) {
205 SelectionKey key = (SelectionKey) it.next();
206 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
207
208 if (key.isReadable() && session.getTrafficMask().isReadable()) {
209 read(session);
210 }
211
212 if (key.isWritable() && session.getTrafficMask().isWritable()) {
213 scheduleFlush(session);
214 }
215 }
216
217 selectedKeys.clear();
218 }
219
220 private void read(SocketSessionImpl session) {
221 ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
222 SocketChannel ch = session.getChannel();
223
224 try {
225 int readBytes = 0;
226 int ret;
227
228 try {
229 while ((ret = ch.read(buf.buf())) > 0) {
230 readBytes += ret;
231 }
232 } finally {
233 buf.flip();
234 }
235
236 session.increaseReadBytes(readBytes);
237
238 if (readBytes > 0) {
239 session.getFilterChain().fireMessageReceived(session, buf);
240 buf = null;
241
242 if (readBytes * 2 < session.getReadBufferSize()) {
243 if (session.getReadBufferSize() > 64) {
244 session.setReadBufferSize(session.getReadBufferSize() >>> 1);
245 }
246 } else if (readBytes == session.getReadBufferSize()) {
247 session.setReadBufferSize(session.getReadBufferSize() << 1);
248 }
249 }
250 if (ret < 0) {
251 scheduleRemove(session);
252 }
253 } catch (Throwable e) {
254 if (e instanceof IOException)
255 scheduleRemove(session);
256 session.getFilterChain().fireExceptionCaught(session, e);
257 } finally {
258 if (buf != null)
259 buf.release();
260 }
261 }
262
263 private void notifyIdleness() {
264
265 long currentTime = System.currentTimeMillis();
266 if ((currentTime - lastIdleCheckTime) >= 1000) {
267 lastIdleCheckTime = currentTime;
268 Set keys = selector.keys();
269 if (keys != null) {
270 for (Iterator it = keys.iterator(); it.hasNext();) {
271 SelectionKey key = (SelectionKey) it.next();
272 SocketSessionImpl session = (SocketSessionImpl) key
273 .attachment();
274 notifyIdleness(session, currentTime);
275 }
276 }
277 }
278 }
279
280 private void notifyIdleness(SocketSessionImpl session, long currentTime) {
281 notifyIdleness0(session, currentTime, session
282 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
283 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
284 .getLastIdleTime(IdleStatus.BOTH_IDLE)));
285 notifyIdleness0(session, currentTime, session
286 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
287 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
288 session.getLastIdleTime(IdleStatus.READER_IDLE)));
289 notifyIdleness0(session, currentTime, session
290 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
291 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
292 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
293
294 notifyWriteTimeout(session, currentTime, session
295 .getWriteTimeoutInMillis(), session.getLastWriteTime());
296 }
297
298 private void notifyIdleness0(SocketSessionImpl session, long currentTime,
299 long idleTime, IdleStatus status, long lastIoTime) {
300 if (idleTime > 0 && lastIoTime != 0
301 && (currentTime - lastIoTime) >= idleTime) {
302 session.increaseIdleCount(status);
303 session.getFilterChain().fireSessionIdle(session, status);
304 }
305 }
306
307 private void notifyWriteTimeout(SocketSessionImpl session,
308 long currentTime, long writeTimeout, long lastIoTime) {
309 SelectionKey key = session.getSelectionKey();
310 if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
311 && key != null && key.isValid()
312 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
313 session.getFilterChain().fireExceptionCaught(session,
314 new WriteTimeoutException());
315 }
316 }
317
318 private void doFlush() {
319 if (flushingSessions.size() == 0)
320 return;
321
322 for (;;) {
323 SocketSessionImpl session;
324
325 synchronized (flushingSessions) {
326 session = (SocketSessionImpl) flushingSessions.pop();
327 }
328
329 if (session == null)
330 break;
331
332 if (!session.isConnected()) {
333 releaseWriteBuffers(session);
334 continue;
335 }
336
337 SelectionKey key = session.getSelectionKey();
338
339
340 if (key == null) {
341 scheduleFlush(session);
342 break;
343 }
344
345
346 if (!key.isValid()) {
347 continue;
348 }
349
350 try {
351 doFlush(session);
352 } catch (IOException e) {
353 scheduleRemove(session);
354 session.getFilterChain().fireExceptionCaught(session, e);
355 }
356 }
357 }
358
359 private void releaseWriteBuffers(SocketSessionImpl session) {
360 Queue writeRequestQueue = session.getWriteRequestQueue();
361 WriteRequest req;
362
363 while ((req = (WriteRequest) writeRequestQueue.pop()) != null) {
364 try {
365 ((ByteBuffer) req.getMessage()).release();
366 } catch (IllegalStateException e) {
367 session.getFilterChain().fireExceptionCaught(session, e);
368 } finally {
369 req.getFuture().setWritten(false);
370 }
371 }
372 }
373
374 private void doFlush(SocketSessionImpl session) throws IOException {
375
376 SelectionKey key = session.getSelectionKey();
377 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
378
379 SocketChannel ch = session.getChannel();
380 Queue writeRequestQueue = session.getWriteRequestQueue();
381
382 for (;;) {
383 WriteRequest req;
384
385 synchronized (writeRequestQueue) {
386 req = (WriteRequest) writeRequestQueue.first();
387 }
388
389 if (req == null)
390 break;
391
392 ByteBuffer buf = (ByteBuffer) req.getMessage();
393 if (buf.remaining() == 0) {
394 synchronized (writeRequestQueue) {
395 writeRequestQueue.pop();
396 }
397
398 session.increaseWrittenMessages();
399
400 buf.reset();
401 session.getFilterChain().fireMessageSent(session, req);
402 continue;
403 }
404
405 if (key.isWritable()) {
406 int writtenBytes = ch.write(buf.buf());
407 if (writtenBytes > 0) {
408 session.increaseWrittenBytes(writtenBytes);
409 }
410 }
411
412 if (buf.hasRemaining()) {
413
414 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
415 break;
416 }
417 }
418 }
419
420 private void doUpdateTrafficMask() {
421 if (trafficControllingSessions.isEmpty())
422 return;
423
424 for (;;) {
425 SocketSessionImpl session;
426
427 synchronized (trafficControllingSessions) {
428 session = (SocketSessionImpl) trafficControllingSessions.pop();
429 }
430
431 if (session == null)
432 break;
433
434 SelectionKey key = session.getSelectionKey();
435
436
437
438 if (key == null) {
439 scheduleTrafficControl(session);
440 break;
441 }
442
443 if (!key.isValid()) {
444 continue;
445 }
446
447
448
449 int ops = SelectionKey.OP_READ;
450 Queue writeRequestQueue = session.getWriteRequestQueue();
451 synchronized (writeRequestQueue) {
452 if (!writeRequestQueue.isEmpty()) {
453 ops |= SelectionKey.OP_WRITE;
454 }
455 }
456
457
458 int mask = session.getTrafficMask().getInterestOps();
459 key.interestOps(ops & mask);
460 }
461 }
462
463 private class Worker implements Runnable {
464 public void run() {
465 Thread.currentThread().setName(SocketIoProcessor.this.threadName);
466
467 for (;;) {
468 try {
469 int nKeys = selector.select(1000);
470 doAddNew();
471 doUpdateTrafficMask();
472
473 if (nKeys > 0) {
474 process(selector.selectedKeys());
475 }
476
477 doFlush();
478 doRemove();
479 notifyIdleness();
480
481 if (selector.keys().isEmpty()) {
482 synchronized (lock) {
483 if (selector.keys().isEmpty()
484 && newSessions.isEmpty()) {
485 worker = null;
486
487 try {
488 selector.close();
489 } catch (IOException e) {
490 ExceptionMonitor.getInstance()
491 .exceptionCaught(e);
492 } finally {
493 selector = null;
494 }
495
496 break;
497 }
498 }
499 }
500 } catch (Throwable t) {
501 ExceptionMonitor.getInstance().exceptionCaught(t);
502
503 try {
504 Thread.sleep(1000);
505 } catch (InterruptedException e1) {
506 ExceptionMonitor.getInstance().exceptionCaught(e1);
507 }
508 }
509 }
510 }
511 }
512
513 }