1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.SocketChannel;
26 import java.util.Queue;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.common.ByteBuffer;
32 import org.apache.mina.common.ExceptionMonitor;
33 import org.apache.mina.common.IdleStatus;
34 import org.apache.mina.common.IoFilter.WriteRequest;
35 import org.apache.mina.common.WriteTimeoutException;
36 import org.apache.mina.util.NamePreservingRunnable;
37
38
39
40
41
42
43
44 class SocketIoProcessor {
45 private final Object lock = new Object();
46
47 private final String threadName;
48
49 private final Executor executor;
50
51 private volatile Selector selector;
52
53 private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
54
55 private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
56
57 private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
58
59 private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
60
61 private Worker worker;
62
63 private long lastIdleCheckTime = System.currentTimeMillis();
64
/**
 * Creates a processor. Neither a selector nor a worker is created here;
 * both are set up the first time a session is added or removed.
 *
 * @param threadName name passed to the runnable that wraps the worker
 * @param executor   executor on which the worker loop is run
 */
SocketIoProcessor(String threadName, Executor executor) {
    this.executor = executor;
    this.threadName = threadName;
}
69
70 void addNew(SocketSessionImpl session) throws IOException {
71 newSessions.add(session);
72 startupWorker();
73 }
74
75 void remove(SocketSessionImpl session) throws IOException {
76 scheduleRemove(session);
77 startupWorker();
78 }
79
80 private void startupWorker() throws IOException {
81 synchronized (lock) {
82 if (worker == null) {
83 selector = Selector.open();
84 worker = new Worker();
85 executor.execute(new NamePreservingRunnable(worker, threadName));
86 }
87 selector.wakeup();
88 }
89 }
90
91 void flush(SocketSessionImpl session) {
92 if ( scheduleFlush(session) ) {
93 Selector selector = this.selector;
94 if (selector != null) {
95 selector.wakeup();
96 }
97 }
98 }
99
100 void updateTrafficMask(SocketSessionImpl session) {
101 scheduleTrafficControl(session);
102 Selector selector = this.selector;
103 if (selector != null) {
104 selector.wakeup();
105 }
106 }
107
108 private void scheduleRemove(SocketSessionImpl session) {
109 removingSessions.add(session);
110 }
111
112 private boolean scheduleFlush(SocketSessionImpl session) {
113 if (session.setScheduledForFlush(true)) {
114 flushingSessions.add(session);
115
116 return true;
117 }
118
119 return false;
120 }
121
122 private void scheduleTrafficControl(SocketSessionImpl session) {
123 trafficControllingSessions.add(session);
124 }
125
126 private void doAddNew() {
127 Selector selector = this.selector;
128 for (;;) {
129 SocketSessionImpl session = newSessions.poll();
130
131 if (session == null)
132 break;
133
134 SocketChannel ch = session.getChannel();
135 try {
136 ch.configureBlocking(false);
137 session.setSelectionKey(ch.register(selector,
138 SelectionKey.OP_READ, session));
139
140
141
142 session.getServiceListeners().fireSessionCreated(session);
143 } catch (IOException e) {
144
145
146 session.getFilterChain().fireExceptionCaught(session, e);
147 }
148 }
149 }
150
151 private void doRemove() {
152 for (;;) {
153 SocketSessionImpl session = removingSessions.poll();
154
155 if (session == null)
156 break;
157
158 SocketChannel ch = session.getChannel();
159 SelectionKey key = session.getSelectionKey();
160
161
162 if (key == null) {
163 scheduleRemove(session);
164 break;
165 }
166
167 if (!key.isValid()) {
168 continue;
169 }
170
171 try {
172 key.cancel();
173 ch.close();
174 } catch (IOException e) {
175 session.getFilterChain().fireExceptionCaught(session, e);
176 } finally {
177 releaseWriteBuffers(session);
178 session.getServiceListeners().fireSessionDestroyed(session);
179 }
180 }
181 }
182
183 private void process(Set<SelectionKey> selectedKeys) {
184 for (SelectionKey key : selectedKeys) {
185 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
186
187 if (key.isReadable() && session.getTrafficMask().isReadable()) {
188 read(session);
189 }
190
191 if (key.isWritable() && session.getTrafficMask().isWritable()) {
192 scheduleFlush(session);
193 }
194 }
195
196 selectedKeys.clear();
197 }
198
199 private void read(SocketSessionImpl session) {
200 ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
201 SocketChannel ch = session.getChannel();
202
203 try {
204 int readBytes = 0;
205 int ret;
206
207 try {
208 while ((ret = ch.read(buf.buf())) > 0) {
209 readBytes += ret;
210 }
211 } finally {
212 buf.flip();
213 }
214
215 session.increaseReadBytes(readBytes);
216
217 if (readBytes > 0) {
218 session.getFilterChain().fireMessageReceived(session, buf);
219 buf = null;
220
221 if (readBytes * 2 < session.getReadBufferSize()) {
222 session.decreaseReadBufferSize();
223 } else if (readBytes == session.getReadBufferSize()) {
224 session.increaseReadBufferSize();
225 }
226 }
227 if (ret < 0) {
228 scheduleRemove(session);
229 }
230 } catch (Throwable e) {
231 if (e instanceof IOException)
232 scheduleRemove(session);
233 session.getFilterChain().fireExceptionCaught(session, e);
234 } finally {
235 if (buf != null)
236 buf.release();
237 }
238 }
239
240 private void notifyIdleness() {
241
242 long currentTime = System.currentTimeMillis();
243 if ((currentTime - lastIdleCheckTime) >= 1000) {
244 lastIdleCheckTime = currentTime;
245 Set<SelectionKey> keys = selector.keys();
246 if (keys != null) {
247 for (SelectionKey key : keys) {
248 SocketSessionImpl session = (SocketSessionImpl) key
249 .attachment();
250 notifyIdleness(session, currentTime);
251 }
252 }
253 }
254 }
255
256 private void notifyIdleness(SocketSessionImpl session, long currentTime) {
257 notifyIdleness0(session, currentTime, session
258 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
259 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
260 .getLastIdleTime(IdleStatus.BOTH_IDLE)));
261 notifyIdleness0(session, currentTime, session
262 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
263 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
264 session.getLastIdleTime(IdleStatus.READER_IDLE)));
265 notifyIdleness0(session, currentTime, session
266 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
267 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
268 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
269
270 notifyWriteTimeout(session, currentTime, session
271 .getWriteTimeoutInMillis(), session.getLastWriteTime());
272 }
273
274 private void notifyIdleness0(SocketSessionImpl session, long currentTime,
275 long idleTime, IdleStatus status, long lastIoTime) {
276 if (idleTime > 0 && lastIoTime != 0
277 && (currentTime - lastIoTime) >= idleTime) {
278 session.increaseIdleCount(status);
279 session.getFilterChain().fireSessionIdle(session, status);
280 }
281 }
282
283 private void notifyWriteTimeout(SocketSessionImpl session,
284 long currentTime, long writeTimeout, long lastIoTime) {
285 SelectionKey key = session.getSelectionKey();
286 if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
287 && key != null && key.isValid()
288 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
289 session.getFilterChain().fireExceptionCaught(session,
290 new WriteTimeoutException());
291 }
292 }
293
294 private void doFlush() {
295 for (;;) {
296 SocketSessionImpl session = flushingSessions.poll();
297
298 if (session == null)
299 break;
300
301 session.setScheduledForFlush(false);
302
303 if (!session.isConnected()) {
304 releaseWriteBuffers(session);
305 continue;
306 }
307
308 SelectionKey key = session.getSelectionKey();
309
310
311 if (key == null) {
312 scheduleFlush(session);
313 break;
314 }
315
316
317 if (!key.isValid()) {
318 continue;
319 }
320
321 try {
322 boolean flushedAll = doFlush(session);
323 if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
324 scheduleFlush( session );
325 }
326 } catch (IOException e) {
327 scheduleRemove(session);
328 session.getFilterChain().fireExceptionCaught(session, e);
329 }
330 }
331 }
332
333 private void releaseWriteBuffers(SocketSessionImpl session) {
334 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
335 WriteRequest req;
336
337 if ((req = writeRequestQueue.poll()) != null) {
338 ByteBuffer buf = (ByteBuffer) req.getMessage();
339 try {
340 buf.release();
341 } catch (IllegalStateException e) {
342 session.getFilterChain().fireExceptionCaught(session, e);
343 } finally {
344
345
346 if (buf.hasRemaining()) {
347 req.getFuture().setWritten(false);
348 } else {
349 session.getFilterChain().fireMessageSent(session, req);
350 }
351 }
352
353
354 while ((req = writeRequestQueue.poll()) != null) {
355 try {
356 ((ByteBuffer) req.getMessage()).release();
357 } catch (IllegalStateException e) {
358 session.getFilterChain().fireExceptionCaught(session, e);
359 } finally {
360 req.getFuture().setWritten(false);
361 }
362 }
363 }
364 }
365
366 private boolean doFlush(SocketSessionImpl session) throws IOException {
367
368 SelectionKey key = session.getSelectionKey();
369 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
370
371 SocketChannel ch = session.getChannel();
372 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
373
374 int writtenBytes = 0;
375 int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
376 try {
377 for (;;) {
378 WriteRequest req = writeRequestQueue.peek();
379
380 if (req == null)
381 break;
382
383 ByteBuffer buf = (ByteBuffer) req.getMessage();
384 if (buf.remaining() == 0) {
385 writeRequestQueue.poll();
386
387 buf.reset();
388
389 if (!buf.hasRemaining()) {
390 session.increaseWrittenMessages();
391 }
392
393 session.getFilterChain().fireMessageSent(session, req);
394 continue;
395 }
396
397 if (key.isWritable()) {
398 writtenBytes += ch.write(buf.buf());
399 }
400
401 if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
402
403 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
404 return false;
405 }
406 }
407 } finally {
408 session.increaseWrittenBytes(writtenBytes);
409 }
410
411 return true;
412 }
413
414 private void doUpdateTrafficMask() {
415 if (trafficControllingSessions.isEmpty())
416 return;
417
418 for (;;) {
419 SocketSessionImpl session = trafficControllingSessions.poll();
420
421 if (session == null)
422 break;
423
424 SelectionKey key = session.getSelectionKey();
425
426
427
428 if (key == null) {
429 scheduleTrafficControl(session);
430 break;
431 }
432
433 if (!key.isValid()) {
434 continue;
435 }
436
437
438
439 int ops = SelectionKey.OP_READ;
440 Queue<WriteRequest> writeRequestQueue = session
441 .getWriteRequestQueue();
442 synchronized (writeRequestQueue) {
443 if (!writeRequestQueue.isEmpty()) {
444 ops |= SelectionKey.OP_WRITE;
445 }
446 }
447
448
449 int mask = session.getTrafficMask().getInterestOps();
450 key.interestOps(ops & mask);
451 }
452 }
453
454 private class Worker implements Runnable {
455 public void run() {
456 Selector selector = SocketIoProcessor.this.selector;
457 for (;;) {
458 try {
459 int nKeys = selector.select(1000);
460 doAddNew();
461 doUpdateTrafficMask();
462
463 if (nKeys > 0) {
464 process(selector.selectedKeys());
465 }
466
467 doFlush();
468 doRemove();
469 notifyIdleness();
470
471 if (selector.keys().isEmpty()) {
472 synchronized (lock) {
473 if (selector.keys().isEmpty()
474 && newSessions.isEmpty()) {
475 worker = null;
476
477 try {
478 selector.close();
479 } catch (IOException e) {
480 ExceptionMonitor.getInstance()
481 .exceptionCaught(e);
482 } finally {
483 selector = null;
484 }
485
486 break;
487 }
488 }
489 }
490 } catch (Throwable t) {
491 ExceptionMonitor.getInstance().exceptionCaught(t);
492
493 try {
494 Thread.sleep(1000);
495 } catch (InterruptedException e1) {
496 ExceptionMonitor.getInstance().exceptionCaught(e1);
497 }
498 }
499 }
500 }
501 }
502 }