View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.Selector;
25  import java.nio.channels.SocketChannel;
26  import java.util.Queue;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.common.ByteBuffer;
32  import org.apache.mina.common.ExceptionMonitor;
33  import org.apache.mina.common.IdleStatus;
34  import org.apache.mina.common.IoFilter.WriteRequest;
35  import org.apache.mina.common.WriteTimeoutException;
36  import org.apache.mina.util.NamePreservingRunnable;
37  
38  /**
39   * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
40   *
41   * @author The Apache Directory Project (mina-dev@directory.apache.org)
42   * @version $Rev: 588150 $, $Date: 2007-10-25 15:20:04 +0900 (목, 25 10월 2007) $,
43   */
44  class SocketIoProcessor {
45      private final Object lock = new Object();
46  
47      private final String threadName;
48  
49      private final Executor executor;
50  
51      private volatile Selector selector;
52  
53      private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
54  
55      private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
56  
57      private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
58  
59      private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
60  
61      private Worker worker;
62  
63      private long lastIdleCheckTime = System.currentTimeMillis();
64  
65      SocketIoProcessor(String threadName, Executor executor) {
66          this.threadName = threadName;
67          this.executor = executor;
68      }
69  
70      void addNew(SocketSessionImpl session) throws IOException {
71          newSessions.add(session);
72          startupWorker();
73      }
74  
75      void remove(SocketSessionImpl session) throws IOException {
76          scheduleRemove(session);
77          startupWorker();
78      }
79  
80      private void startupWorker() throws IOException {
81          synchronized (lock) {
82              if (worker == null) {
83                  selector = Selector.open();
84                  worker = new Worker();
85                  executor.execute(new NamePreservingRunnable(worker, threadName));
86              }
87              selector.wakeup();
88          }
89      }
90  
91      void flush(SocketSessionImpl session) {
92          if ( scheduleFlush(session) ) {
93              Selector selector = this.selector;
94              if (selector != null) {
95                  selector.wakeup();
96              }
97          }
98      }
99  
100     void updateTrafficMask(SocketSessionImpl session) {
101         scheduleTrafficControl(session);
102         Selector selector = this.selector;
103         if (selector != null) {
104             selector.wakeup();
105         }
106     }
107 
108     private void scheduleRemove(SocketSessionImpl session) {
109         removingSessions.add(session);
110     }
111 
112     private boolean scheduleFlush(SocketSessionImpl session) {
113         if (session.setScheduledForFlush(true)) {
114             flushingSessions.add(session);
115 
116             return true;
117         }
118 
119         return false;
120     }
121 
122     private void scheduleTrafficControl(SocketSessionImpl session) {
123         trafficControllingSessions.add(session);
124     }
125 
126     private void doAddNew() {
127         Selector selector = this.selector;
128         for (;;) {
129             SocketSessionImpl session = newSessions.poll();
130 
131             if (session == null)
132                 break;
133 
134             SocketChannel ch = session.getChannel();
135             try {
136                 ch.configureBlocking(false);
137                 session.setSelectionKey(ch.register(selector,
138                         SelectionKey.OP_READ, session));
139 
140                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
141                 // in AbstractIoFilterChain.fireSessionOpened().
142                 session.getServiceListeners().fireSessionCreated(session);
143             } catch (IOException e) {
144                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
145                 // and call ConnectFuture.setException().
146                 session.getFilterChain().fireExceptionCaught(session, e);
147             }
148         }
149     }
150 
151     private void doRemove() {
152         for (;;) {
153             SocketSessionImpl session = removingSessions.poll();
154 
155             if (session == null)
156                 break;
157 
158             SocketChannel ch = session.getChannel();
159             SelectionKey key = session.getSelectionKey();
160             // Retry later if session is not yet fully initialized.
161             // (In case that Session.close() is called before addSession() is processed)
162             if (key == null) {
163                 scheduleRemove(session);
164                 break;
165             }
166             // skip if channel is already closed
167             if (!key.isValid()) {
168                 continue;
169             }
170 
171             try {
172                 key.cancel();
173                 ch.close();
174             } catch (IOException e) {
175                 session.getFilterChain().fireExceptionCaught(session, e);
176             } finally {
177                 releaseWriteBuffers(session);
178                 session.getServiceListeners().fireSessionDestroyed(session);
179             }
180         }
181     }
182 
183     private void process(Set<SelectionKey> selectedKeys) {
184         for (SelectionKey key : selectedKeys) {
185             SocketSessionImpl session = (SocketSessionImpl) key.attachment();
186 
187             if (key.isReadable() && session.getTrafficMask().isReadable()) {
188                 read(session);
189             }
190 
191             if (key.isWritable() && session.getTrafficMask().isWritable()) {
192                 scheduleFlush(session);
193             }
194         }
195 
196         selectedKeys.clear();
197     }
198 
199     private void read(SocketSessionImpl session) {
200         ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
201         SocketChannel ch = session.getChannel();
202 
203         try {
204             int readBytes = 0;
205             int ret;
206 
207             try {
208                 while ((ret = ch.read(buf.buf())) > 0) {
209                     readBytes += ret;
210                 }
211             } finally {
212                 buf.flip();
213             }
214 
215             session.increaseReadBytes(readBytes);
216 
217             if (readBytes > 0) {
218                 session.getFilterChain().fireMessageReceived(session, buf);
219                 buf = null;
220 
221                 if (readBytes * 2 < session.getReadBufferSize()) {
222                     session.decreaseReadBufferSize();
223                 } else if (readBytes == session.getReadBufferSize()) {
224                     session.increaseReadBufferSize();
225                 }
226             }
227             if (ret < 0) {
228                 scheduleRemove(session);
229             }
230         } catch (Throwable e) {
231             if (e instanceof IOException)
232                 scheduleRemove(session);
233             session.getFilterChain().fireExceptionCaught(session, e);
234         } finally {
235             if (buf != null)
236                 buf.release();
237         }
238     }
239 
240     private void notifyIdleness() {
241         // process idle sessions
242         long currentTime = System.currentTimeMillis();
243         if ((currentTime - lastIdleCheckTime) >= 1000) {
244             lastIdleCheckTime = currentTime;
245             Set<SelectionKey> keys = selector.keys();
246             if (keys != null) {
247                 for (SelectionKey key : keys) {
248                     SocketSessionImpl session = (SocketSessionImpl) key
249                             .attachment();
250                     notifyIdleness(session, currentTime);
251                 }
252             }
253         }
254     }
255 
256     private void notifyIdleness(SocketSessionImpl session, long currentTime) {
257         notifyIdleness0(session, currentTime, session
258                 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
259                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
260                         .getLastIdleTime(IdleStatus.BOTH_IDLE)));
261         notifyIdleness0(session, currentTime, session
262                 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
263                 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
264                         session.getLastIdleTime(IdleStatus.READER_IDLE)));
265         notifyIdleness0(session, currentTime, session
266                 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
267                 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
268                         session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
269 
270         notifyWriteTimeout(session, currentTime, session
271                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
272     }
273 
274     private void notifyIdleness0(SocketSessionImpl session, long currentTime,
275             long idleTime, IdleStatus status, long lastIoTime) {
276         if (idleTime > 0 && lastIoTime != 0
277                 && (currentTime - lastIoTime) >= idleTime) {
278             session.increaseIdleCount(status);
279             session.getFilterChain().fireSessionIdle(session, status);
280         }
281     }
282 
283     private void notifyWriteTimeout(SocketSessionImpl session,
284             long currentTime, long writeTimeout, long lastIoTime) {
285         SelectionKey key = session.getSelectionKey();
286         if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
287                 && key != null && key.isValid()
288                 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
289             session.getFilterChain().fireExceptionCaught(session,
290                     new WriteTimeoutException());
291         }
292     }
293 
294     private void doFlush() {
295         for (;;) {
296             SocketSessionImpl session = flushingSessions.poll();
297 
298             if (session == null)
299                 break;
300 
301             session.setScheduledForFlush(false);
302 
303             if (!session.isConnected()) {
304                 releaseWriteBuffers(session);
305                 continue;
306             }
307 
308             SelectionKey key = session.getSelectionKey();
309             // Retry later if session is not yet fully initialized.
310             // (In case that Session.write() is called before addSession() is processed)
311             if (key == null) {
312                 scheduleFlush(session);
313                 break;
314             }
315 
316             // Skip if the channel is already closed.
317             if (!key.isValid()) {
318                 continue;
319             }
320 
321             try {
322                 boolean flushedAll = doFlush(session);
323                 if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
324                     scheduleFlush( session );
325                 }
326             } catch (IOException e) {
327                 scheduleRemove(session);
328                 session.getFilterChain().fireExceptionCaught(session, e);
329             }
330         }
331     }
332 
333     private void releaseWriteBuffers(SocketSessionImpl session) {
334         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
335         WriteRequest req;
336 
337         if ((req = writeRequestQueue.poll()) != null) {
338             ByteBuffer buf = (ByteBuffer) req.getMessage();
339             try {
340                 buf.release();
341             } catch (IllegalStateException e) {
342                 session.getFilterChain().fireExceptionCaught(session, e);
343             } finally {
344                 // The first unwritten empty buffer must be
345                 // forwarded to the filter chain.
346                 if (buf.hasRemaining()) {
347                     req.getFuture().setWritten(false);
348                 } else {
349                     session.getFilterChain().fireMessageSent(session, req);
350                 }
351             }
352 
353             // Discard others.
354             while ((req = writeRequestQueue.poll()) != null) {
355                 try {
356                     ((ByteBuffer) req.getMessage()).release();
357                 } catch (IllegalStateException e) {
358                     session.getFilterChain().fireExceptionCaught(session, e);
359                 } finally {
360                     req.getFuture().setWritten(false);
361                 }
362             }
363         }
364     }
365 
366     private boolean doFlush(SocketSessionImpl session) throws IOException {
367         // Clear OP_WRITE
368         SelectionKey key = session.getSelectionKey();
369         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
370 
371         SocketChannel ch = session.getChannel();
372         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
373 
374         int writtenBytes = 0;
375         int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
376         try {
377             for (;;) {
378                 WriteRequest req = writeRequestQueue.peek();
379 
380                 if (req == null)
381                     break;
382 
383                 ByteBuffer buf = (ByteBuffer) req.getMessage();
384                 if (buf.remaining() == 0) {
385                     writeRequestQueue.poll();
386 
387                     buf.reset();
388                     
389                     if (!buf.hasRemaining()) {
390                         session.increaseWrittenMessages();
391                     }
392                     
393                     session.getFilterChain().fireMessageSent(session, req);
394                     continue;
395                 }
396 
397                 if (key.isWritable()) {
398                     writtenBytes += ch.write(buf.buf());
399                 }
400 
401                 if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
402                     // Kernel buffer is full or wrote too much.
403                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
404                     return false;
405                 }
406             }
407         } finally {
408             session.increaseWrittenBytes(writtenBytes);
409         }
410 
411         return true;
412     }
413 
414     private void doUpdateTrafficMask() {
415         if (trafficControllingSessions.isEmpty())
416             return;
417 
418         for (;;) {
419             SocketSessionImpl session = trafficControllingSessions.poll();
420 
421             if (session == null)
422                 break;
423 
424             SelectionKey key = session.getSelectionKey();
425             // Retry later if session is not yet fully initialized.
426             // (In case that Session.suspend??() or session.resume??() is
427             // called before addSession() is processed)
428             if (key == null) {
429                 scheduleTrafficControl(session);
430                 break;
431             }
432             // skip if channel is already closed
433             if (!key.isValid()) {
434                 continue;
435             }
436 
437             // The normal is OP_READ and, if there are write requests in the
438             // session's write queue, set OP_WRITE to trigger flushing.
439             int ops = SelectionKey.OP_READ;
440             Queue<WriteRequest> writeRequestQueue = session
441                     .getWriteRequestQueue();
442             synchronized (writeRequestQueue) {
443                 if (!writeRequestQueue.isEmpty()) {
444                     ops |= SelectionKey.OP_WRITE;
445                 }
446             }
447 
448             // Now mask the preferred ops with the mask of the current session
449             int mask = session.getTrafficMask().getInterestOps();
450             key.interestOps(ops & mask);
451         }
452     }
453 
454     private class Worker implements Runnable {
455         public void run() {
456             Selector selector = SocketIoProcessor.this.selector;
457             for (;;) {
458                 try {
459                     int nKeys = selector.select(1000);
460                     doAddNew();
461                     doUpdateTrafficMask();
462 
463                     if (nKeys > 0) {
464                         process(selector.selectedKeys());
465                     }
466 
467                     doFlush();
468                     doRemove();
469                     notifyIdleness();
470 
471                     if (selector.keys().isEmpty()) {
472                         synchronized (lock) {
473                             if (selector.keys().isEmpty()
474                                     && newSessions.isEmpty()) {
475                                 worker = null;
476 
477                                 try {
478                                     selector.close();
479                                 } catch (IOException e) {
480                                     ExceptionMonitor.getInstance()
481                                             .exceptionCaught(e);
482                                 } finally {
483                                     selector = null;
484                                 }
485 
486                                 break;
487                             }
488                         }
489                     }
490                 } catch (Throwable t) {
491                     ExceptionMonitor.getInstance().exceptionCaught(t);
492 
493                     try {
494                         Thread.sleep(1000);
495                     } catch (InterruptedException e1) {
496                         ExceptionMonitor.getInstance().exceptionCaught(e1);
497                     }
498                 }
499             }
500         }
501     }
502 }