1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.socket;
20
21 import java.io.IOException;
22 import java.nio.channels.CancelledKeyException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.SocketChannel;
26 import java.util.Iterator;
27 import java.util.Set;
28
29 import org.apache.mina.common.ByteBuffer;
30 import org.apache.mina.common.IdleStatus;
31 import org.apache.mina.common.SessionConfig;
32 import org.apache.mina.io.WriteTimeoutException;
33 import org.apache.mina.util.Queue;
34
/**
 * Performs all I/O operations for sockets which are connected or bound.
 * This class is used by MINA internally.
 *
 * @author Trustin Lee (trustin@apache.org)
 * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $,
 */
42 class SocketIoProcessor
43 {
44 private static final SocketIoProcessor instance;
45
46 static
47 {
48 SocketIoProcessor tmp;
49
50 try
51 {
52 tmp = new SocketIoProcessor();
53 }
54 catch( IOException e )
55 {
56 InternalError error = new InternalError(
57 "Failed to open selector." );
58 error.initCause( e );
59 throw error;
60 }
61
62 instance = tmp;
63 }
64
65 private final Selector selector;
66
67 private final Queue newSessions = new Queue();
68
69 private final Queue removingSessions = new Queue();
70
71 private final Queue flushingSessions = new Queue();
72
73 private final Queue readableSessions = new Queue();
74
75 private Worker worker;
76
77 private long lastIdleCheckTime = System.currentTimeMillis();
78
79 private SocketIoProcessor() throws IOException
80 {
81 selector = Selector.open();
82 }
83
84 static SocketIoProcessor getInstance()
85 {
86 return instance;
87 }
88
89 void addSession( SocketSession session )
90 {
91 synchronized( this )
92 {
93 synchronized( newSessions )
94 {
95 newSessions.push( session );
96 }
97 startupWorker();
98 }
99
100 selector.wakeup();
101 }
102
103 void removeSession( SocketSession session )
104 {
105 scheduleRemove( session );
106 startupWorker();
107 selector.wakeup();
108 }
109
110 private synchronized void startupWorker()
111 {
112 if( worker == null )
113 {
114 worker = new Worker();
115 worker.start();
116 }
117 }
118
119 void flushSession( SocketSession session )
120 {
121 scheduleFlush( session );
122 selector.wakeup();
123 }
124
125 void addReadableSession( SocketSession session )
126 {
127 synchronized( readableSessions )
128 {
129 readableSessions.push( session );
130 }
131 selector.wakeup();
132 }
133
134 private void addSessions()
135 {
136 if( newSessions.isEmpty() )
137 return;
138
139 SocketSession session;
140
141 for( ;; )
142 {
143 synchronized( newSessions )
144 {
145 session = ( SocketSession ) newSessions.pop();
146 }
147
148 if( session == null )
149 break;
150
151 SocketChannel ch = session.getChannel();
152 boolean registered;
153
154 try
155 {
156 ch.configureBlocking( false );
157 session.setSelectionKey( ch.register( selector,
158 SelectionKey.OP_READ,
159 session ) );
160 registered = true;
161 }
162 catch( IOException e )
163 {
164 registered = false;
165 session.getManagerFilterChain().exceptionCaught( session, e );
166 }
167
168 if( registered )
169 {
170 session.getManagerFilterChain().sessionOpened( session );
171 }
172 }
173 }
174
175 private void removeSessions()
176 {
177 if( removingSessions.isEmpty() )
178 return;
179
180 for( ;; )
181 {
182 SocketSession session;
183
184 synchronized( removingSessions )
185 {
186 session = ( SocketSession ) removingSessions.pop();
187 }
188
189 if( session == null )
190 break;
191
192 SocketChannel ch = session.getChannel();
193 SelectionKey key = session.getSelectionKey();
194
195
196 if( key == null )
197 {
198 scheduleRemove( session );
199 break;
200 }
201
202
203 if( !key.isValid() )
204 {
205 continue;
206 }
207
208 try
209 {
210 key.cancel();
211 ch.close();
212 }
213 catch( IOException e )
214 {
215 session.getManagerFilterChain().exceptionCaught( session, e );
216 }
217 finally
218 {
219 releaseWriteBuffers( session );
220
221 session.getManagerFilterChain().sessionClosed( session );
222 session.notifyClose();
223 }
224 }
225 }
226
227 private void processSessions( Set selectedKeys )
228 {
229 Iterator it = selectedKeys.iterator();
230
231 while( it.hasNext() )
232 {
233 SelectionKey key = ( SelectionKey ) it.next();
234 SocketSession session = ( SocketSession ) key.attachment();
235
236 if( key.isReadable() )
237 {
238 read( session );
239 }
240
241 if( key.isWritable() )
242 {
243 scheduleFlush( session );
244 }
245 }
246
247 selectedKeys.clear();
248 }
249
250 private void read( SocketSession session )
251 {
252 ByteBuffer buf = ByteBuffer.allocate(
253 (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() );
254 SocketChannel ch = session.getChannel();
255
256 try
257 {
258 int readBytes = 0;
259 int ret;
260
261 buf.clear();
262
263 try
264 {
265 while( ( ret = ch.read( buf.buf() ) ) > 0 )
266 {
267 readBytes += ret;
268 }
269 }
270 finally
271 {
272 buf.flip();
273 }
274
275 session.increaseReadBytes( readBytes );
276 session.setIdle( IdleStatus.BOTH_IDLE, false );
277 session.setIdle( IdleStatus.READER_IDLE, false );
278
279 if( readBytes > 0 )
280 {
281 ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
282 newBuf.put( buf );
283 newBuf.flip();
284 session.getManagerFilterChain().dataRead( session, newBuf );
285 }
286 if( ret < 0 )
287 {
288 scheduleRemove( session );
289 }
290 }
291 catch( Throwable e )
292 {
293 if( e instanceof IOException )
294 scheduleRemove( session );
295 session.getManagerFilterChain().exceptionCaught( session, e );
296 }
297 finally
298 {
299 buf.release();
300 }
301 }
302
303 private void scheduleRemove( SocketSession session )
304 {
305 synchronized( removingSessions )
306 {
307 removingSessions.push( session );
308 }
309 }
310
311 private void scheduleFlush( SocketSession session )
312 {
313 synchronized( flushingSessions )
314 {
315 flushingSessions.push( session );
316 }
317 }
318
319 private void notifyIdleSessions()
320 {
321 Set keys = selector.keys();
322 Iterator it;
323 SocketSession session;
324
325
326 long currentTime = System.currentTimeMillis();
327
328 if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
329 {
330 lastIdleCheckTime = currentTime;
331 it = keys.iterator();
332
333 while( it.hasNext() )
334 {
335 SelectionKey key = ( SelectionKey ) it.next();
336 session = ( SocketSession ) key.attachment();
337
338 notifyIdleSession( session, currentTime );
339 }
340 }
341 }
342
343 private void notifyIdleSession( SocketSession session, long currentTime )
344 {
345 SessionConfig config = session.getConfig();
346
347 notifyIdleSession0( session, currentTime, config
348 .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
349 IdleStatus.BOTH_IDLE, session.getLastIoTime() );
350 notifyIdleSession0( session, currentTime, config
351 .getIdleTimeInMillis( IdleStatus.READER_IDLE ),
352 IdleStatus.READER_IDLE, session.getLastReadTime() );
353 notifyIdleSession0( session, currentTime, config
354 .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
355 IdleStatus.WRITER_IDLE, session.getLastWriteTime() );
356
357 notifyWriteTimeoutSession( session, currentTime, config
358 .getWriteTimeoutInMillis(), session.getLastWriteTime() );
359 }
360
361 private void notifyIdleSession0( SocketSession session, long currentTime,
362 long idleTime, IdleStatus status,
363 long lastIoTime )
364 {
365 if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0
366 && ( currentTime - lastIoTime ) >= idleTime )
367 {
368 session.setIdle( status, true );
369 session.getManagerFilterChain().sessionIdle( session, status );
370 }
371 }
372
373 private void notifyWriteTimeoutSession( SocketSession session,
374 long currentTime,
375 long writeTimeout, long lastIoTime )
376 {
377 if( writeTimeout > 0
378 && ( currentTime - lastIoTime ) >= writeTimeout
379 && session.getSelectionKey() != null
380 && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 )
381 {
382 session
383 .getManagerFilterChain()
384 .exceptionCaught( session, new WriteTimeoutException() );
385 }
386 }
387
388 private void flushSessions()
389 {
390 if( flushingSessions.size() == 0 )
391 return;
392
393 for( ;; )
394 {
395 SocketSession session;
396
397 synchronized( flushingSessions )
398 {
399 session = ( SocketSession ) flushingSessions.pop();
400 }
401
402 if( session == null )
403 break;
404
405 if( !session.isConnected() )
406 {
407 releaseWriteBuffers( session );
408 continue;
409 }
410
411
412
413 if( session.getSelectionKey() == null )
414 {
415
416 scheduleFlush( session );
417 break;
418 }
419 else
420 {
421 try
422 {
423 flush( session );
424 }
425 catch( CancelledKeyException e )
426 {
427
428 scheduleRemove( session );
429 }
430 catch( IOException e )
431 {
432 scheduleRemove( session );
433 session.getManagerFilterChain().exceptionCaught( session, e );
434 }
435 }
436 }
437 }
438
439 private void releaseWriteBuffers( SocketSession session )
440 {
441 Queue writeBufferQueue = session.getWriteBufferQueue();
442 session.getWriteMarkerQueue().clear();
443 ByteBuffer buf;
444
445 while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
446 {
447 try
448 {
449 buf.release();
450 }
451 catch( IllegalStateException e )
452 {
453 session.getManagerFilterChain().exceptionCaught( session, e );
454 }
455 }
456 }
457
458 private void flush( SocketSession session ) throws IOException
459 {
460 SocketChannel ch = session.getChannel();
461
462 Queue writeBufferQueue = session.getWriteBufferQueue();
463 Queue writeMarkerQueue = session.getWriteMarkerQueue();
464
465 ByteBuffer buf;
466 Object marker;
467 for( ;; )
468 {
469 synchronized( writeBufferQueue )
470 {
471 buf = ( ByteBuffer ) writeBufferQueue.first();
472 marker = writeMarkerQueue.first();
473 }
474
475 if( buf == null )
476 break;
477
478 if( buf.remaining() == 0 )
479 {
480 synchronized( writeBufferQueue )
481 {
482 writeBufferQueue.pop();
483 writeMarkerQueue.pop();
484 }
485 try
486 {
487 buf.release();
488 }
489 catch( IllegalStateException e )
490 {
491 session.getManagerFilterChain().exceptionCaught( session, e );
492 }
493
494 session.increaseWrittenWriteRequests();
495 session.getManagerFilterChain().dataWritten( session, marker );
496 continue;
497 }
498
499 int writtenBytes = 0;
500 try
501 {
502 writtenBytes = ch.write( buf.buf() );
503 }
504 finally
505 {
506 if( writtenBytes > 0 )
507 {
508 session.increaseWrittenBytes( writtenBytes );
509 session.setIdle( IdleStatus.BOTH_IDLE, false );
510 session.setIdle( IdleStatus.WRITER_IDLE, false );
511 }
512
513 SelectionKey key = session.getSelectionKey();
514 if( buf.hasRemaining() )
515 {
516
517 key
518 .interestOps( key.interestOps()
519 | SelectionKey.OP_WRITE );
520 break;
521 }
522 else
523 {
524 key.interestOps( key.interestOps()
525 & ( ~SelectionKey.OP_WRITE ) );
526 }
527 }
528 }
529 }
530
531 private class Worker extends Thread
532 {
533 public Worker()
534 {
535 super( "SocketIoProcessor" );
536 }
537
538 public void run()
539 {
540 for( ;; )
541 {
542 try
543 {
544 int nKeys = selector.select( 1000 );
545 addSessions();
546
547 if( nKeys > 0 )
548 {
549 processSessions( selector.selectedKeys() );
550 }
551
552 flushSessions();
553 removeSessions();
554 notifyIdleSessions();
555
556 if( selector.keys().isEmpty() )
557 {
558 synchronized( SocketIoProcessor.this )
559 {
560 if( selector.keys().isEmpty() &&
561 newSessions.isEmpty() )
562 {
563 worker = null;
564 break;
565 }
566 }
567 }
568 }
569 catch( IOException e )
570 {
571 e.printStackTrace();
572
573 try
574 {
575 Thread.sleep( 1000 );
576 }
577 catch( InterruptedException e1 )
578 {
579 }
580 }
581 }
582 }
583 }
584 }