1 | /* |
2 | * @(#) $Id: SocketIoProcessor.java 332218 2005-11-10 03:52:42Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.io.socket; |
20 | |
21 | import java.io.IOException; |
22 | import java.nio.channels.SelectionKey; |
23 | import java.nio.channels.Selector; |
24 | import java.nio.channels.SocketChannel; |
25 | import java.util.Iterator; |
26 | import java.util.Set; |
27 | |
28 | import org.apache.mina.common.ByteBuffer; |
29 | import org.apache.mina.common.IdleStatus; |
30 | import org.apache.mina.common.SessionConfig; |
31 | import org.apache.mina.io.WriteTimeoutException; |
32 | import org.apache.mina.util.Queue; |
33 | import org.slf4j.Logger; |
34 | import org.slf4j.LoggerFactory; |
35 | |
36 | /** |
37 | * Performs all I/O operations for sockets which is connected or bound. |
38 | * This class is used by MINA internally. |
39 | * |
40 | * @author The Apache Directory Project (dev@directory.apache.org) |
41 | * @version $Rev: 332218 $, $Date: 2005-11-10 12:52:42 +0900 $, |
42 | */ |
43 | class SocketIoProcessor |
44 | { |
45 | private static final Logger log = LoggerFactory.getLogger( SocketIoProcessor.class ); |
46 | private static final SocketIoProcessor instance; |
47 | |
48 | static |
49 | { |
50 | SocketIoProcessor tmp; |
51 | |
52 | try |
53 | { |
54 | tmp = new SocketIoProcessor(); |
55 | } |
56 | catch( IOException e ) |
57 | { |
58 | InternalError error = new InternalError( |
59 | "Failed to open selector." ); |
60 | error.initCause( e ); |
61 | throw error; |
62 | } |
63 | |
64 | instance = tmp; |
65 | } |
66 | |
67 | private final Selector selector; |
68 | |
69 | private final Queue newSessions = new Queue(); |
70 | |
71 | private final Queue removingSessions = new Queue(); |
72 | |
73 | private final Queue flushingSessions = new Queue(); |
74 | |
75 | private final Queue readableSessions = new Queue(); |
76 | |
77 | private Worker worker; |
78 | |
79 | private long lastIdleCheckTime = System.currentTimeMillis(); |
80 | |
81 | private SocketIoProcessor() throws IOException |
82 | { |
83 | selector = Selector.open(); |
84 | } |
85 | |
86 | static SocketIoProcessor getInstance() |
87 | { |
88 | return instance; |
89 | } |
90 | |
91 | void addSession( SocketSession session ) |
92 | { |
93 | synchronized( this ) |
94 | { |
95 | synchronized( newSessions ) |
96 | { |
97 | newSessions.push( session ); |
98 | } |
99 | startupWorker(); |
100 | } |
101 | |
102 | selector.wakeup(); |
103 | } |
104 | |
105 | void removeSession( SocketSession session ) |
106 | { |
107 | scheduleRemove( session ); |
108 | startupWorker(); |
109 | selector.wakeup(); |
110 | } |
111 | |
112 | private synchronized void startupWorker() |
113 | { |
114 | if( worker == null ) |
115 | { |
116 | worker = new Worker(); |
117 | worker.start(); |
118 | } |
119 | } |
120 | |
121 | void flushSession( SocketSession session ) |
122 | { |
123 | scheduleFlush( session ); |
124 | selector.wakeup(); |
125 | } |
126 | |
127 | void addReadableSession( SocketSession session ) |
128 | { |
129 | synchronized( readableSessions ) |
130 | { |
131 | readableSessions.push( session ); |
132 | } |
133 | selector.wakeup(); |
134 | } |
135 | |
136 | private void addSessions() |
137 | { |
138 | if( newSessions.isEmpty() ) |
139 | return; |
140 | |
141 | SocketSession session; |
142 | |
143 | for( ;; ) |
144 | { |
145 | synchronized( newSessions ) |
146 | { |
147 | session = ( SocketSession ) newSessions.pop(); |
148 | } |
149 | |
150 | if( session == null ) |
151 | break; |
152 | |
153 | SocketChannel ch = session.getChannel(); |
154 | boolean registered; |
155 | |
156 | try |
157 | { |
158 | ch.configureBlocking( false ); |
159 | session.setSelectionKey( ch.register( selector, |
160 | SelectionKey.OP_READ, |
161 | session ) ); |
162 | registered = true; |
163 | } |
164 | catch( IOException e ) |
165 | { |
166 | registered = false; |
167 | session.getManagerFilterChain().exceptionCaught( session, e ); |
168 | } |
169 | |
170 | if( registered ) |
171 | { |
172 | session.getManagerFilterChain().sessionOpened( session ); |
173 | } |
174 | } |
175 | } |
176 | |
177 | private void removeSessions() |
178 | { |
179 | if( removingSessions.isEmpty() ) |
180 | return; |
181 | |
182 | for( ;; ) |
183 | { |
184 | SocketSession session; |
185 | |
186 | synchronized( removingSessions ) |
187 | { |
188 | session = ( SocketSession ) removingSessions.pop(); |
189 | } |
190 | |
191 | if( session == null ) |
192 | break; |
193 | |
194 | SocketChannel ch = session.getChannel(); |
195 | SelectionKey key = session.getSelectionKey(); |
196 | // Retry later if session is not yet fully initialized. |
197 | // (In case that Session.close() is called before addSession() is processed) |
198 | if( key == null ) |
199 | { |
200 | scheduleRemove( session ); |
201 | break; |
202 | } |
203 | |
204 | // skip if channel is already closed |
205 | if( !key.isValid() ) |
206 | { |
207 | continue; |
208 | } |
209 | |
210 | try |
211 | { |
212 | key.cancel(); |
213 | ch.close(); |
214 | } |
215 | catch( IOException e ) |
216 | { |
217 | session.getManagerFilterChain().exceptionCaught( session, e ); |
218 | } |
219 | finally |
220 | { |
221 | releaseWriteBuffers( session ); |
222 | |
223 | session.getManagerFilterChain().sessionClosed( session ); |
224 | session.notifyClose(); |
225 | } |
226 | } |
227 | } |
228 | |
229 | private void processSessions( Set selectedKeys ) |
230 | { |
231 | Iterator it = selectedKeys.iterator(); |
232 | |
233 | while( it.hasNext() ) |
234 | { |
235 | SelectionKey key = ( SelectionKey ) it.next(); |
236 | SocketSession session = ( SocketSession ) key.attachment(); |
237 | |
238 | if( key.isReadable() ) |
239 | { |
240 | read( session ); |
241 | } |
242 | |
243 | if( key.isWritable() ) |
244 | { |
245 | scheduleFlush( session ); |
246 | } |
247 | } |
248 | |
249 | selectedKeys.clear(); |
250 | } |
251 | |
252 | private void read( SocketSession session ) |
253 | { |
254 | ByteBuffer buf = ByteBuffer.allocate( |
255 | (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); |
256 | SocketChannel ch = session.getChannel(); |
257 | |
258 | try |
259 | { |
260 | int readBytes = 0; |
261 | int ret; |
262 | |
263 | buf.clear(); |
264 | |
265 | try |
266 | { |
267 | while( ( ret = ch.read( buf.buf() ) ) > 0 ) |
268 | { |
269 | readBytes += ret; |
270 | } |
271 | } |
272 | finally |
273 | { |
274 | buf.flip(); |
275 | } |
276 | |
277 | session.increaseReadBytes( readBytes ); |
278 | session.resetIdleCount( IdleStatus.BOTH_IDLE ); |
279 | session.resetIdleCount( IdleStatus.READER_IDLE ); |
280 | |
281 | if( readBytes > 0 ) |
282 | { |
283 | ByteBuffer newBuf = ByteBuffer.allocate( readBytes ); |
284 | newBuf.put( buf ); |
285 | newBuf.flip(); |
286 | session.getManagerFilterChain().dataRead( session, newBuf ); |
287 | } |
288 | if( ret < 0 ) |
289 | { |
290 | scheduleRemove( session ); |
291 | } |
292 | } |
293 | catch( Throwable e ) |
294 | { |
295 | if( e instanceof IOException ) |
296 | scheduleRemove( session ); |
297 | session.getManagerFilterChain().exceptionCaught( session, e ); |
298 | } |
299 | finally |
300 | { |
301 | buf.release(); |
302 | } |
303 | } |
304 | |
305 | private void scheduleRemove( SocketSession session ) |
306 | { |
307 | synchronized( removingSessions ) |
308 | { |
309 | removingSessions.push( session ); |
310 | } |
311 | } |
312 | |
313 | private void scheduleFlush( SocketSession session ) |
314 | { |
315 | synchronized( flushingSessions ) |
316 | { |
317 | flushingSessions.push( session ); |
318 | } |
319 | } |
320 | |
321 | private void notifyIdleSessions() |
322 | { |
323 | // process idle sessions |
324 | long currentTime = System.currentTimeMillis(); |
325 | if( ( currentTime - lastIdleCheckTime ) >= 1000 ) |
326 | { |
327 | lastIdleCheckTime = currentTime; |
328 | Set keys = selector.keys(); |
329 | if( keys != null ) |
330 | { |
331 | for( Iterator it = keys.iterator(); it.hasNext(); ) |
332 | { |
333 | SelectionKey key = ( SelectionKey ) it.next(); |
334 | SocketSession session = ( SocketSession ) key.attachment(); |
335 | notifyIdleSession( session, currentTime ); |
336 | } |
337 | } |
338 | } |
339 | } |
340 | |
341 | private void notifyIdleSession( SocketSession session, long currentTime ) |
342 | { |
343 | SessionConfig config = session.getConfig(); |
344 | |
345 | notifyIdleSession0( |
346 | session, currentTime, |
347 | config.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), |
348 | IdleStatus.BOTH_IDLE, |
349 | Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); |
350 | notifyIdleSession0( |
351 | session, currentTime, |
352 | config.getIdleTimeInMillis( IdleStatus.READER_IDLE ), |
353 | IdleStatus.READER_IDLE, |
354 | Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); |
355 | notifyIdleSession0( |
356 | session, currentTime, |
357 | config.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), |
358 | IdleStatus.WRITER_IDLE, |
359 | Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); |
360 | |
361 | notifyWriteTimeoutSession( session, currentTime, config |
362 | .getWriteTimeoutInMillis(), session.getLastWriteTime() ); |
363 | } |
364 | |
365 | private void notifyIdleSession0( SocketSession session, long currentTime, |
366 | long idleTime, IdleStatus status, |
367 | long lastIoTime ) |
368 | { |
369 | if( idleTime > 0 && lastIoTime != 0 |
370 | && ( currentTime - lastIoTime ) >= idleTime ) |
371 | { |
372 | session.increaseIdleCount( status ); |
373 | session.getManagerFilterChain().sessionIdle( session, status ); |
374 | } |
375 | } |
376 | |
377 | private void notifyWriteTimeoutSession( SocketSession session, |
378 | long currentTime, |
379 | long writeTimeout, long lastIoTime ) |
380 | { |
381 | SelectionKey key = session.getSelectionKey(); |
382 | if( writeTimeout > 0 |
383 | && ( currentTime - lastIoTime ) >= writeTimeout |
384 | && key != null && key.isValid() |
385 | && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) |
386 | { |
387 | session |
388 | .getManagerFilterChain() |
389 | .exceptionCaught( session, new WriteTimeoutException() ); |
390 | } |
391 | } |
392 | |
393 | private void flushSessions() |
394 | { |
395 | if( flushingSessions.size() == 0 ) |
396 | return; |
397 | |
398 | for( ;; ) |
399 | { |
400 | SocketSession session; |
401 | |
402 | synchronized( flushingSessions ) |
403 | { |
404 | session = ( SocketSession ) flushingSessions.pop(); |
405 | } |
406 | |
407 | if( session == null ) |
408 | break; |
409 | |
410 | if( !session.isConnected() ) |
411 | { |
412 | releaseWriteBuffers( session ); |
413 | continue; |
414 | } |
415 | |
416 | // If encountered write request before session is initialized, |
417 | // (In case that Session.write() is called before addSession() is processed) |
418 | SelectionKey key = session.getSelectionKey(); |
419 | if( key == null ) |
420 | { |
421 | // Reschedule for later write |
422 | scheduleFlush( session ); |
423 | break; |
424 | } |
425 | if( !key.isValid() ) |
426 | { |
427 | continue; |
428 | } |
429 | |
430 | try |
431 | { |
432 | flush( session ); |
433 | } |
434 | catch( IOException e ) |
435 | { |
436 | scheduleRemove( session ); |
437 | session.getManagerFilterChain().exceptionCaught( session, e ); |
438 | } |
439 | } |
440 | } |
441 | |
442 | private void releaseWriteBuffers( SocketSession session ) |
443 | { |
444 | Queue writeBufferQueue = session.getWriteBufferQueue(); |
445 | session.getWriteMarkerQueue().clear(); |
446 | ByteBuffer buf; |
447 | |
448 | while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null ) |
449 | { |
450 | try |
451 | { |
452 | buf.release(); |
453 | } |
454 | catch( IllegalStateException e ) |
455 | { |
456 | session.getManagerFilterChain().exceptionCaught( session, e ); |
457 | } |
458 | } |
459 | } |
460 | |
461 | private void flush( SocketSession session ) throws IOException |
462 | { |
463 | // Clear OP_WRITE |
464 | SelectionKey key = session.getSelectionKey(); |
465 | key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); |
466 | |
467 | SocketChannel ch = session.getChannel(); |
468 | |
469 | Queue writeBufferQueue = session.getWriteBufferQueue(); |
470 | Queue writeMarkerQueue = session.getWriteMarkerQueue(); |
471 | |
472 | ByteBuffer buf; |
473 | Object marker; |
474 | for( ;; ) |
475 | { |
476 | synchronized( writeBufferQueue ) |
477 | { |
478 | buf = ( ByteBuffer ) writeBufferQueue.first(); |
479 | marker = writeMarkerQueue.first(); |
480 | } |
481 | |
482 | if( buf == null ) |
483 | break; |
484 | |
485 | if( buf.remaining() == 0 ) |
486 | { |
487 | synchronized( writeBufferQueue ) |
488 | { |
489 | writeBufferQueue.pop(); |
490 | writeMarkerQueue.pop(); |
491 | } |
492 | try |
493 | { |
494 | buf.release(); |
495 | } |
496 | catch( IllegalStateException e ) |
497 | { |
498 | session.getManagerFilterChain().exceptionCaught( session, e ); |
499 | } |
500 | |
501 | session.increaseWrittenWriteRequests(); |
502 | session.getManagerFilterChain().dataWritten( session, marker ); |
503 | continue; |
504 | } |
505 | |
506 | int writtenBytes = ch.write( buf.buf() ); |
507 | if( writtenBytes > 0 ) |
508 | { |
509 | session.increaseWrittenBytes( writtenBytes ); |
510 | session.resetIdleCount( IdleStatus.BOTH_IDLE ); |
511 | session.resetIdleCount( IdleStatus.WRITER_IDLE ); |
512 | } |
513 | |
514 | if( buf.hasRemaining() ) |
515 | { |
516 | // Kernel buffer is full |
517 | key.interestOps( key.interestOps() | SelectionKey.OP_WRITE ); |
518 | break; |
519 | } |
520 | } |
521 | } |
522 | |
523 | private class Worker extends Thread |
524 | { |
525 | public Worker() |
526 | { |
527 | super( "SocketIoProcessor" ); |
528 | } |
529 | |
530 | public void run() |
531 | { |
532 | for( ;; ) |
533 | { |
534 | try |
535 | { |
536 | int nKeys = selector.select( 1000 ); |
537 | addSessions(); |
538 | |
539 | if( nKeys > 0 ) |
540 | { |
541 | processSessions( selector.selectedKeys() ); |
542 | } |
543 | |
544 | flushSessions(); |
545 | removeSessions(); |
546 | notifyIdleSessions(); |
547 | |
548 | if( selector.keys().isEmpty() ) |
549 | { |
550 | synchronized( SocketIoProcessor.this ) |
551 | { |
552 | if( selector.keys().isEmpty() && |
553 | newSessions.isEmpty() ) |
554 | { |
555 | worker = null; |
556 | break; |
557 | } |
558 | } |
559 | } |
560 | } |
561 | catch( Throwable t ) |
562 | { |
563 | log.warn( "Unexpected exception.", t ); |
564 | |
565 | try |
566 | { |
567 | Thread.sleep( 1000 ); |
568 | } |
569 | catch( InterruptedException e1 ) |
570 | { |
571 | } |
572 | } |
573 | } |
574 | } |
575 | } |
576 | } |