1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.SocketChannel;
26 import java.util.Queue;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.common.ByteBuffer;
32 import org.apache.mina.common.ExceptionMonitor;
33 import org.apache.mina.common.IdleStatus;
34 import org.apache.mina.common.IoFilter.WriteRequest;
35 import org.apache.mina.common.WriteTimeoutException;
36 import org.apache.mina.util.NamePreservingRunnable;
37
38
39
40
41
42
43
44 class SocketIoProcessor {
45 private final Object lock = new Object();
46
47 private final String threadName;
48
49 private final Executor executor;
50
51 private volatile Selector selector;
52
53 private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
54
55 private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
56
57 private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
58
59 private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
60
61 private Worker worker;
62
63 private long lastIdleCheckTime = System.currentTimeMillis();
64
/**
 * Creates a processor; no selector is opened and no worker is started
 * until the first session is added or removed.
 *
 * @param threadName name applied to the worker thread while it runs
 * @param executor   executor the worker is submitted to
 */
SocketIoProcessor(String threadName, Executor executor) {
    this.executor = executor;
    this.threadName = threadName;
}
69
70 void addNew(SocketSessionImpl session) throws IOException {
71 newSessions.add(session);
72 startupWorker();
73 }
74
75 void remove(SocketSessionImpl session) throws IOException {
76 scheduleRemove(session);
77 startupWorker();
78 }
79
80 private void startupWorker() throws IOException {
81 synchronized (lock) {
82 if (worker == null) {
83 selector = Selector.open();
84 worker = new Worker();
85 executor.execute(new NamePreservingRunnable(worker));
86 }
87 selector.wakeup();
88 }
89 }
90
91 void flush(SocketSessionImpl session) {
92 scheduleFlush(session);
93 Selector selector = this.selector;
94 if (selector != null) {
95 selector.wakeup();
96 }
97 }
98
99 void updateTrafficMask(SocketSessionImpl session) {
100 scheduleTrafficControl(session);
101 Selector selector = this.selector;
102 if (selector != null) {
103 selector.wakeup();
104 }
105 }
106
107 private void scheduleRemove(SocketSessionImpl session) {
108 removingSessions.add(session);
109 }
110
111 private void scheduleFlush(SocketSessionImpl session) {
112 flushingSessions.add(session);
113 }
114
115 private void scheduleTrafficControl(SocketSessionImpl session) {
116 trafficControllingSessions.add(session);
117 }
118
119 private void doAddNew() {
120 Selector selector = this.selector;
121 for (;;) {
122 SocketSessionImpl session = newSessions.poll();
123
124 if (session == null)
125 break;
126
127 SocketChannel ch = session.getChannel();
128 try {
129 ch.configureBlocking(false);
130 session.setSelectionKey(ch.register(selector,
131 SelectionKey.OP_READ, session));
132
133
134
135 session.getServiceListeners().fireSessionCreated(session);
136 } catch (IOException e) {
137
138
139 session.getFilterChain().fireExceptionCaught(session, e);
140 }
141 }
142 }
143
144 private void doRemove() {
145 for (;;) {
146 SocketSessionImpl session = removingSessions.poll();
147
148 if (session == null)
149 break;
150
151 SocketChannel ch = session.getChannel();
152 SelectionKey key = session.getSelectionKey();
153
154
155 if (key == null) {
156 scheduleRemove(session);
157 break;
158 }
159
160 if (!key.isValid()) {
161 continue;
162 }
163
164 try {
165 key.cancel();
166 ch.close();
167 } catch (IOException e) {
168 session.getFilterChain().fireExceptionCaught(session, e);
169 } finally {
170 releaseWriteBuffers(session);
171 session.getServiceListeners().fireSessionDestroyed(session);
172 }
173 }
174 }
175
176 private void process(Set<SelectionKey> selectedKeys) {
177 for (SelectionKey key : selectedKeys) {
178 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
179
180 if (key.isReadable() && session.getTrafficMask().isReadable()) {
181 read(session);
182 }
183
184 if (key.isWritable() && session.getTrafficMask().isWritable()) {
185 scheduleFlush(session);
186 }
187 }
188
189 selectedKeys.clear();
190 }
191
192 private void read(SocketSessionImpl session) {
193 ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
194 SocketChannel ch = session.getChannel();
195
196 try {
197 int readBytes = 0;
198 int ret;
199
200 try {
201 while ((ret = ch.read(buf.buf())) > 0) {
202 readBytes += ret;
203 }
204 } finally {
205 buf.flip();
206 }
207
208 session.increaseReadBytes(readBytes);
209
210 if (readBytes > 0) {
211 session.getFilterChain().fireMessageReceived(session, buf);
212 buf = null;
213
214 if (readBytes * 2 < session.getReadBufferSize()) {
215 if (session.getReadBufferSize() > 64) {
216 session.setReadBufferSize(session.getReadBufferSize() >>> 1);
217 }
218 } else if (readBytes == session.getReadBufferSize()) {
219 session.setReadBufferSize(session.getReadBufferSize() << 1);
220 }
221 }
222 if (ret < 0) {
223 scheduleRemove(session);
224 }
225 } catch (Throwable e) {
226 if (e instanceof IOException)
227 scheduleRemove(session);
228 session.getFilterChain().fireExceptionCaught(session, e);
229 } finally {
230 if (buf != null)
231 buf.release();
232 }
233 }
234
235 private void notifyIdleness() {
236
237 long currentTime = System.currentTimeMillis();
238 if ((currentTime - lastIdleCheckTime) >= 1000) {
239 lastIdleCheckTime = currentTime;
240 Set<SelectionKey> keys = selector.keys();
241 if (keys != null) {
242 for (SelectionKey key : keys) {
243 SocketSessionImpl session = (SocketSessionImpl) key
244 .attachment();
245 notifyIdleness(session, currentTime);
246 }
247 }
248 }
249 }
250
251 private void notifyIdleness(SocketSessionImpl session, long currentTime) {
252 notifyIdleness0(session, currentTime, session
253 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
254 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
255 .getLastIdleTime(IdleStatus.BOTH_IDLE)));
256 notifyIdleness0(session, currentTime, session
257 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
258 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
259 session.getLastIdleTime(IdleStatus.READER_IDLE)));
260 notifyIdleness0(session, currentTime, session
261 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
262 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
263 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
264
265 notifyWriteTimeout(session, currentTime, session
266 .getWriteTimeoutInMillis(), session.getLastWriteTime());
267 }
268
269 private void notifyIdleness0(SocketSessionImpl session, long currentTime,
270 long idleTime, IdleStatus status, long lastIoTime) {
271 if (idleTime > 0 && lastIoTime != 0
272 && (currentTime - lastIoTime) >= idleTime) {
273 session.increaseIdleCount(status);
274 session.getFilterChain().fireSessionIdle(session, status);
275 }
276 }
277
278 private void notifyWriteTimeout(SocketSessionImpl session,
279 long currentTime, long writeTimeout, long lastIoTime) {
280 SelectionKey key = session.getSelectionKey();
281 if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
282 && key != null && key.isValid()
283 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
284 session.getFilterChain().fireExceptionCaught(session,
285 new WriteTimeoutException());
286 }
287 }
288
289 private void doFlush() {
290 for (;;) {
291 SocketSessionImpl session = flushingSessions.poll();
292
293 if (session == null)
294 break;
295
296 if (!session.isConnected()) {
297 releaseWriteBuffers(session);
298 continue;
299 }
300
301 SelectionKey key = session.getSelectionKey();
302
303
304 if (key == null) {
305 scheduleFlush(session);
306 break;
307 }
308
309
310 if (!key.isValid()) {
311 continue;
312 }
313
314 try {
315 doFlush(session);
316 } catch (IOException e) {
317 scheduleRemove(session);
318 session.getFilterChain().fireExceptionCaught(session, e);
319 }
320 }
321 }
322
323 private void releaseWriteBuffers(SocketSessionImpl session) {
324 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
325 WriteRequest req;
326
327 if ((req = writeRequestQueue.poll()) != null) {
328 ByteBuffer buf = (ByteBuffer) req.getMessage();
329 try {
330 buf.release();
331 } catch (IllegalStateException e) {
332 session.getFilterChain().fireExceptionCaught(session, e);
333 } finally {
334
335
336 if (buf.hasRemaining()) {
337 req.getFuture().setWritten(false);
338 } else {
339 session.getFilterChain().fireMessageSent(session, req);
340 }
341 }
342
343
344 while ((req = writeRequestQueue.poll()) != null) {
345 try {
346 ((ByteBuffer) req.getMessage()).release();
347 } catch (IllegalStateException e) {
348 session.getFilterChain().fireExceptionCaught(session, e);
349 } finally {
350 req.getFuture().setWritten(false);
351 }
352 }
353 }
354 }
355
356 private void doFlush(SocketSessionImpl session) throws IOException {
357
358 SelectionKey key = session.getSelectionKey();
359 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
360
361 SocketChannel ch = session.getChannel();
362 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
363
364 for (;;) {
365 WriteRequest req = writeRequestQueue.peek();
366
367 if (req == null)
368 break;
369
370 ByteBuffer buf = (ByteBuffer) req.getMessage();
371 if (buf.remaining() == 0) {
372 writeRequestQueue.poll();
373
374 session.increaseWrittenMessages();
375
376 buf.reset();
377 session.getFilterChain().fireMessageSent(session, req);
378 continue;
379 }
380
381 if (key.isWritable()) {
382 int writtenBytes = ch.write(buf.buf());
383 if (writtenBytes > 0) {
384 session.increaseWrittenBytes(writtenBytes);
385 }
386 }
387
388 if (buf.hasRemaining()) {
389
390 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
391 break;
392 }
393 }
394 }
395
396 private void doUpdateTrafficMask() {
397 if (trafficControllingSessions.isEmpty())
398 return;
399
400 for (;;) {
401 SocketSessionImpl session = trafficControllingSessions.poll();
402
403 if (session == null)
404 break;
405
406 SelectionKey key = session.getSelectionKey();
407
408
409
410 if (key == null) {
411 scheduleTrafficControl(session);
412 break;
413 }
414
415 if (!key.isValid()) {
416 continue;
417 }
418
419
420
421 int ops = SelectionKey.OP_READ;
422 Queue<WriteRequest> writeRequestQueue = session
423 .getWriteRequestQueue();
424 synchronized (writeRequestQueue) {
425 if (!writeRequestQueue.isEmpty()) {
426 ops |= SelectionKey.OP_WRITE;
427 }
428 }
429
430
431 int mask = session.getTrafficMask().getInterestOps();
432 key.interestOps(ops & mask);
433 }
434 }
435
436 private class Worker implements Runnable {
437 public void run() {
438 Thread.currentThread().setName(SocketIoProcessor.this.threadName);
439
440 Selector selector = SocketIoProcessor.this.selector;
441 for (;;) {
442 try {
443 int nKeys = selector.select(1000);
444 doAddNew();
445 doUpdateTrafficMask();
446
447 if (nKeys > 0) {
448 process(selector.selectedKeys());
449 }
450
451 doFlush();
452 doRemove();
453 notifyIdleness();
454
455 if (selector.keys().isEmpty()) {
456 synchronized (lock) {
457 if (selector.keys().isEmpty()
458 && newSessions.isEmpty()) {
459 worker = null;
460
461 try {
462 selector.close();
463 } catch (IOException e) {
464 ExceptionMonitor.getInstance()
465 .exceptionCaught(e);
466 } finally {
467 selector = null;
468 }
469
470 break;
471 }
472 }
473 }
474 } catch (Throwable t) {
475 ExceptionMonitor.getInstance().exceptionCaught(t);
476
477 try {
478 Thread.sleep(1000);
479 } catch (InterruptedException e1) {
480 ExceptionMonitor.getInstance().exceptionCaught(e1);
481 }
482 }
483 }
484 }
485 }
486 }