1 | /* |
2 | * @(#) $Id: SocketIoProcessor.java 179047 2005-05-30 11:46:50Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.io.socket; |
20 | |
21 | import java.io.IOException; |
22 | import java.nio.channels.SelectionKey; |
23 | import java.nio.channels.Selector; |
24 | import java.nio.channels.SocketChannel; |
25 | import java.util.Iterator; |
26 | import java.util.Set; |
27 | |
28 | import org.apache.mina.common.ByteBuffer; |
29 | import org.apache.mina.common.IdleStatus; |
30 | import org.apache.mina.common.SessionConfig; |
31 | import org.apache.mina.io.WriteTimeoutException; |
32 | import org.apache.mina.util.Queue; |
33 | |
34 | /** |
 * Performs all I/O operations for sockets which are connected or bound.
36 | * This class is used by MINA internally. |
37 | * |
38 | * @author Trustin Lee (trustin@apache.org) |
39 | * @version $Rev: 179047 $, $Date: 2005-05-30 20:46:50 +0900 $, |
40 | */ |
41 | class SocketIoProcessor |
42 | { |
/** The single shared processor, built once when this class is initialized. */
private static final SocketIoProcessor instance;

static
{
    try
    {
        instance = new SocketIoProcessor();
    }
    catch( IOException e )
    {
        // Surface the failure as an InternalError, keeping the
        // original IOException attached as its cause.
        InternalError failure = new InternalError( "Failed to open selector." );
        failure.initCause( e );
        throw failure;
    }
}
63 | |
/** Selector opened by the constructor; session channels are registered with it in addSessions(). */
private final Selector selector;

/** Sessions pushed by addSession() that still await registration in addSessions(). */
private final Queue newSessions = new Queue();

/** Sessions queued by scheduleRemove() and closed in removeSessions(). */
private final Queue removingSessions = new Queue();

/** Sessions queued by scheduleFlush() and written out in flushSessions(). */
private final Queue flushingSessions = new Queue();

/**
 * Sessions pushed by addReadableSession().
 * NOTE(review): nothing visible in this class pops from this queue -
 * confirm whether a consumer is missing or the queue is obsolete.
 */
private final Queue readableSessions = new Queue();

/**
 * The running worker thread, or null when there is none. Created in
 * startupWorker() and reset to null by the worker itself before it exits.
 */
private Worker worker;

/** Timestamp (milliseconds) of the last idle check done by notifyIdleSessions(). */
private long lastIdleCheckTime = System.currentTimeMillis();
77 | |
/**
 * Creates the processor and opens the selector it works with.
 *
 * @throws IOException if the selector cannot be opened
 */
private SocketIoProcessor() throws IOException
{
    this.selector = Selector.open();
}
82 | |
83 | static SocketIoProcessor getInstance() |
84 | { |
85 | return instance; |
86 | } |
87 | |
88 | void addSession( SocketSession session ) |
89 | { |
90 | synchronized( this ) |
91 | { |
92 | synchronized( newSessions ) |
93 | { |
94 | newSessions.push( session ); |
95 | } |
96 | startupWorker(); |
97 | } |
98 | |
99 | selector.wakeup(); |
100 | } |
101 | |
102 | void removeSession( SocketSession session ) |
103 | { |
104 | scheduleRemove( session ); |
105 | startupWorker(); |
106 | selector.wakeup(); |
107 | } |
108 | |
109 | private synchronized void startupWorker() |
110 | { |
111 | if( worker == null ) |
112 | { |
113 | worker = new Worker(); |
114 | worker.start(); |
115 | } |
116 | } |
117 | |
118 | void flushSession( SocketSession session ) |
119 | { |
120 | scheduleFlush( session ); |
121 | selector.wakeup(); |
122 | } |
123 | |
124 | void addReadableSession( SocketSession session ) |
125 | { |
126 | synchronized( readableSessions ) |
127 | { |
128 | readableSessions.push( session ); |
129 | } |
130 | selector.wakeup(); |
131 | } |
132 | |
133 | private void addSessions() |
134 | { |
135 | if( newSessions.isEmpty() ) |
136 | return; |
137 | |
138 | SocketSession session; |
139 | |
140 | for( ;; ) |
141 | { |
142 | synchronized( newSessions ) |
143 | { |
144 | session = ( SocketSession ) newSessions.pop(); |
145 | } |
146 | |
147 | if( session == null ) |
148 | break; |
149 | |
150 | SocketChannel ch = session.getChannel(); |
151 | boolean registered; |
152 | |
153 | try |
154 | { |
155 | ch.configureBlocking( false ); |
156 | session.setSelectionKey( ch.register( selector, |
157 | SelectionKey.OP_READ, |
158 | session ) ); |
159 | registered = true; |
160 | } |
161 | catch( IOException e ) |
162 | { |
163 | registered = false; |
164 | session.getManagerFilterChain().exceptionCaught( session, e ); |
165 | } |
166 | |
167 | if( registered ) |
168 | { |
169 | session.getManagerFilterChain().sessionOpened( session ); |
170 | } |
171 | } |
172 | } |
173 | |
174 | private void removeSessions() |
175 | { |
176 | if( removingSessions.isEmpty() ) |
177 | return; |
178 | |
179 | for( ;; ) |
180 | { |
181 | SocketSession session; |
182 | |
183 | synchronized( removingSessions ) |
184 | { |
185 | session = ( SocketSession ) removingSessions.pop(); |
186 | } |
187 | |
188 | if( session == null ) |
189 | break; |
190 | |
191 | SocketChannel ch = session.getChannel(); |
192 | SelectionKey key = session.getSelectionKey(); |
193 | // Retry later if session is not yet fully initialized. |
194 | // (In case that Session.close() is called before addSession() is processed) |
195 | if( key == null ) |
196 | { |
197 | scheduleRemove( session ); |
198 | break; |
199 | } |
200 | |
201 | // skip if channel is already closed |
202 | if( !key.isValid() ) |
203 | { |
204 | continue; |
205 | } |
206 | |
207 | try |
208 | { |
209 | key.cancel(); |
210 | ch.close(); |
211 | } |
212 | catch( IOException e ) |
213 | { |
214 | session.getManagerFilterChain().exceptionCaught( session, e ); |
215 | } |
216 | finally |
217 | { |
218 | releaseWriteBuffers( session ); |
219 | |
220 | session.getManagerFilterChain().sessionClosed( session ); |
221 | session.notifyClose(); |
222 | } |
223 | } |
224 | } |
225 | |
226 | private void processSessions( Set selectedKeys ) |
227 | { |
228 | Iterator it = selectedKeys.iterator(); |
229 | |
230 | while( it.hasNext() ) |
231 | { |
232 | SelectionKey key = ( SelectionKey ) it.next(); |
233 | SocketSession session = ( SocketSession ) key.attachment(); |
234 | |
235 | if( key.isReadable() ) |
236 | { |
237 | read( session ); |
238 | } |
239 | |
240 | if( key.isWritable() ) |
241 | { |
242 | scheduleFlush( session ); |
243 | } |
244 | } |
245 | |
246 | selectedKeys.clear(); |
247 | } |
248 | |
249 | private void read( SocketSession session ) |
250 | { |
251 | ByteBuffer buf = ByteBuffer.allocate( |
252 | (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); |
253 | SocketChannel ch = session.getChannel(); |
254 | |
255 | try |
256 | { |
257 | int readBytes = 0; |
258 | int ret; |
259 | |
260 | buf.clear(); |
261 | |
262 | try |
263 | { |
264 | while( ( ret = ch.read( buf.buf() ) ) > 0 ) |
265 | { |
266 | readBytes += ret; |
267 | } |
268 | } |
269 | finally |
270 | { |
271 | buf.flip(); |
272 | } |
273 | |
274 | session.increaseReadBytes( readBytes ); |
275 | session.setIdle( IdleStatus.BOTH_IDLE, false ); |
276 | session.setIdle( IdleStatus.READER_IDLE, false ); |
277 | |
278 | if( readBytes > 0 ) |
279 | { |
280 | ByteBuffer newBuf = ByteBuffer.allocate( readBytes ); |
281 | newBuf.put( buf ); |
282 | newBuf.flip(); |
283 | session.getManagerFilterChain().dataRead( session, newBuf ); |
284 | } |
285 | if( ret < 0 ) |
286 | { |
287 | scheduleRemove( session ); |
288 | } |
289 | } |
290 | catch( Throwable e ) |
291 | { |
292 | if( e instanceof IOException ) |
293 | scheduleRemove( session ); |
294 | session.getManagerFilterChain().exceptionCaught( session, e ); |
295 | } |
296 | finally |
297 | { |
298 | buf.release(); |
299 | } |
300 | } |
301 | |
302 | private void scheduleRemove( SocketSession session ) |
303 | { |
304 | synchronized( removingSessions ) |
305 | { |
306 | removingSessions.push( session ); |
307 | } |
308 | } |
309 | |
310 | private void scheduleFlush( SocketSession session ) |
311 | { |
312 | synchronized( flushingSessions ) |
313 | { |
314 | flushingSessions.push( session ); |
315 | } |
316 | } |
317 | |
318 | private void notifyIdleSessions() |
319 | { |
320 | Set keys = selector.keys(); |
321 | Iterator it; |
322 | SocketSession session; |
323 | |
324 | // process idle sessions |
325 | long currentTime = System.currentTimeMillis(); |
326 | |
327 | if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) ) |
328 | { |
329 | lastIdleCheckTime = currentTime; |
330 | it = keys.iterator(); |
331 | |
332 | while( it.hasNext() ) |
333 | { |
334 | SelectionKey key = ( SelectionKey ) it.next(); |
335 | session = ( SocketSession ) key.attachment(); |
336 | |
337 | notifyIdleSession( session, currentTime ); |
338 | } |
339 | } |
340 | } |
341 | |
342 | private void notifyIdleSession( SocketSession session, long currentTime ) |
343 | { |
344 | SessionConfig config = session.getConfig(); |
345 | |
346 | notifyIdleSession0( session, currentTime, config |
347 | .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), |
348 | IdleStatus.BOTH_IDLE, session.getLastIoTime() ); |
349 | notifyIdleSession0( session, currentTime, config |
350 | .getIdleTimeInMillis( IdleStatus.READER_IDLE ), |
351 | IdleStatus.READER_IDLE, session.getLastReadTime() ); |
352 | notifyIdleSession0( session, currentTime, config |
353 | .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), |
354 | IdleStatus.WRITER_IDLE, session.getLastWriteTime() ); |
355 | |
356 | notifyWriteTimeoutSession( session, currentTime, config |
357 | .getWriteTimeoutInMillis(), session.getLastWriteTime() ); |
358 | } |
359 | |
360 | private void notifyIdleSession0( SocketSession session, long currentTime, |
361 | long idleTime, IdleStatus status, |
362 | long lastIoTime ) |
363 | { |
364 | if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0 |
365 | && ( currentTime - lastIoTime ) >= idleTime ) |
366 | { |
367 | session.setIdle( status, true ); |
368 | session.getManagerFilterChain().sessionIdle( session, status ); |
369 | } |
370 | } |
371 | |
372 | private void notifyWriteTimeoutSession( SocketSession session, |
373 | long currentTime, |
374 | long writeTimeout, long lastIoTime ) |
375 | { |
376 | if( writeTimeout > 0 |
377 | && ( currentTime - lastIoTime ) >= writeTimeout |
378 | && session.getSelectionKey() != null |
379 | && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 ) |
380 | { |
381 | session |
382 | .getManagerFilterChain() |
383 | .exceptionCaught( session, new WriteTimeoutException() ); |
384 | } |
385 | } |
386 | |
387 | private void flushSessions() |
388 | { |
389 | if( flushingSessions.size() == 0 ) |
390 | return; |
391 | |
392 | for( ;; ) |
393 | { |
394 | SocketSession session; |
395 | |
396 | synchronized( flushingSessions ) |
397 | { |
398 | session = ( SocketSession ) flushingSessions.pop(); |
399 | } |
400 | |
401 | if( session == null ) |
402 | break; |
403 | |
404 | if( !session.isConnected() ) |
405 | { |
406 | releaseWriteBuffers( session ); |
407 | continue; |
408 | } |
409 | |
410 | // If encountered write request before session is initialized, |
411 | // (In case that Session.write() is called before addSession() is processed) |
412 | if( session.getSelectionKey() == null ) |
413 | { |
414 | // Reschedule for later write |
415 | scheduleFlush( session ); |
416 | break; |
417 | } |
418 | else |
419 | { |
420 | try |
421 | { |
422 | flush( session ); |
423 | } |
424 | catch( IOException e ) |
425 | { |
426 | scheduleRemove( session ); |
427 | session.getManagerFilterChain().exceptionCaught( session, e ); |
428 | } |
429 | } |
430 | } |
431 | } |
432 | |
433 | private void releaseWriteBuffers( SocketSession session ) |
434 | { |
435 | Queue writeBufferQueue = session.getWriteBufferQueue(); |
436 | session.getWriteMarkerQueue().clear(); |
437 | ByteBuffer buf; |
438 | |
439 | while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null ) |
440 | { |
441 | try |
442 | { |
443 | buf.release(); |
444 | } |
445 | catch( IllegalStateException e ) |
446 | { |
447 | session.getManagerFilterChain().exceptionCaught( session, e ); |
448 | } |
449 | } |
450 | } |
451 | |
452 | private void flush( SocketSession session ) throws IOException |
453 | { |
454 | SocketChannel ch = session.getChannel(); |
455 | |
456 | Queue writeBufferQueue = session.getWriteBufferQueue(); |
457 | Queue writeMarkerQueue = session.getWriteMarkerQueue(); |
458 | |
459 | ByteBuffer buf; |
460 | Object marker; |
461 | for( ;; ) |
462 | { |
463 | synchronized( writeBufferQueue ) |
464 | { |
465 | buf = ( ByteBuffer ) writeBufferQueue.first(); |
466 | marker = writeMarkerQueue.first(); |
467 | } |
468 | |
469 | if( buf == null ) |
470 | break; |
471 | |
472 | if( buf.remaining() == 0 ) |
473 | { |
474 | synchronized( writeBufferQueue ) |
475 | { |
476 | writeBufferQueue.pop(); |
477 | writeMarkerQueue.pop(); |
478 | } |
479 | try |
480 | { |
481 | buf.release(); |
482 | } |
483 | catch( IllegalStateException e ) |
484 | { |
485 | session.getManagerFilterChain().exceptionCaught( session, e ); |
486 | } |
487 | |
488 | session.getManagerFilterChain().dataWritten( session, marker ); |
489 | continue; |
490 | } |
491 | |
492 | int writtenBytes = 0; |
493 | try |
494 | { |
495 | writtenBytes = ch.write( buf.buf() ); |
496 | } |
497 | finally |
498 | { |
499 | if( writtenBytes > 0 ) |
500 | { |
501 | session.increaseWrittenBytes( writtenBytes ); |
502 | session.setIdle( IdleStatus.BOTH_IDLE, false ); |
503 | session.setIdle( IdleStatus.WRITER_IDLE, false ); |
504 | } |
505 | |
506 | SelectionKey key = session.getSelectionKey(); |
507 | if( buf.hasRemaining() ) |
508 | { |
509 | // Kernel buffer is full |
510 | key |
511 | .interestOps( key.interestOps() |
512 | | SelectionKey.OP_WRITE ); |
513 | break; |
514 | } |
515 | else |
516 | { |
517 | key.interestOps( key.interestOps() |
518 | & ( ~SelectionKey.OP_WRITE ) ); |
519 | } |
520 | } |
521 | } |
522 | } |
523 | |
524 | private class Worker extends Thread |
525 | { |
526 | public Worker() |
527 | { |
528 | super( "SocketIoProcessor" ); |
529 | } |
530 | |
531 | public void run() |
532 | { |
533 | for( ;; ) |
534 | { |
535 | try |
536 | { |
537 | int nKeys = selector.select( 1000 ); |
538 | addSessions(); |
539 | |
540 | if( nKeys > 0 ) |
541 | { |
542 | processSessions( selector.selectedKeys() ); |
543 | } |
544 | |
545 | flushSessions(); |
546 | removeSessions(); |
547 | notifyIdleSessions(); |
548 | |
549 | if( selector.keys().isEmpty() ) |
550 | { |
551 | synchronized( SocketIoProcessor.this ) |
552 | { |
553 | if( selector.keys().isEmpty() && |
554 | newSessions.isEmpty() ) |
555 | { |
556 | worker = null; |
557 | break; |
558 | } |
559 | } |
560 | } |
561 | } |
562 | catch( IOException e ) |
563 | { |
564 | e.printStackTrace(); |
565 | |
566 | try |
567 | { |
568 | Thread.sleep( 1000 ); |
569 | } |
570 | catch( InterruptedException e1 ) |
571 | { |
572 | } |
573 | } |
574 | } |
575 | } |
576 | } |
577 | } |