View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.Selector;
25  import java.nio.channels.SocketChannel;
26  import java.util.Iterator;
27  import java.util.Set;
28  
29  import org.apache.mina.common.ByteBuffer;
30  import org.apache.mina.common.ExceptionMonitor;
31  import org.apache.mina.common.IdleStatus;
32  import org.apache.mina.common.IoFilter.WriteRequest;
33  import org.apache.mina.common.WriteTimeoutException;
34  import org.apache.mina.util.NamePreservingRunnable;
35  import org.apache.mina.util.Queue;
36  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
37  
38  /**
39   * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
40   *
41   * @author The Apache Directory Project (mina-dev@directory.apache.org)
42   * @version $Rev: 556539 $, $Date: 2007-07-16 16:48:36 +0900 (월, 16  7월 2007) $,
43   */
44  class SocketIoProcessor {
45      private final Object lock = new Object();
46  
47      private final String threadName;
48  
49      private final Executor executor;
50  
51      /**
52       * @noinspection FieldAccessedSynchronizedAndUnsynchronized
53       */
54      private Selector selector;
55  
56      private final Queue newSessions = new Queue();
57  
58      private final Queue removingSessions = new Queue();
59  
60      private final Queue flushingSessions = new Queue();
61  
62      private final Queue trafficControllingSessions = new Queue();
63  
64      private Worker worker;
65  
66      private long lastIdleCheckTime = System.currentTimeMillis();
67  
68      SocketIoProcessor(String threadName, Executor executor) {
69          this.threadName = threadName;
70          this.executor = executor;
71      }
72  
73      void addNew(SocketSessionImpl session) throws IOException {
74          synchronized (newSessions) {
75              newSessions.push(session);
76          }
77  
78          startupWorker();
79      }
80  
81      void remove(SocketSessionImpl session) throws IOException {
82          scheduleRemove(session);
83          startupWorker();
84      }
85  
86      private void startupWorker() throws IOException {
87          synchronized (lock) {
88              if (worker == null) {
89                  selector = Selector.open();
90                  worker = new Worker();
91                  executor.execute(new NamePreservingRunnable(worker));
92              }
93              selector.wakeup();
94          }
95      }
96  
97      void flush(SocketSessionImpl session) {
98          scheduleFlush(session);
99          Selector selector = this.selector;
100         if (selector != null) {
101             selector.wakeup();
102         }
103     }
104 
105     void updateTrafficMask(SocketSessionImpl session) {
106         scheduleTrafficControl(session);
107         Selector selector = this.selector;
108         if (selector != null) {
109             selector.wakeup();
110         }
111     }
112 
113     private void scheduleRemove(SocketSessionImpl session) {
114         synchronized (removingSessions) {
115             removingSessions.push(session);
116         }
117     }
118 
119     private void scheduleFlush(SocketSessionImpl session) {
120         synchronized (flushingSessions) {
121             flushingSessions.push(session);
122         }
123     }
124 
125     private void scheduleTrafficControl(SocketSessionImpl session) {
126         synchronized (trafficControllingSessions) {
127             trafficControllingSessions.push(session);
128         }
129     }
130 
131     private void doAddNew() {
132         if (newSessions.isEmpty())
133             return;
134 
135         for (;;) {
136             SocketSessionImpl session;
137 
138             synchronized (newSessions) {
139                 session = (SocketSessionImpl) newSessions.pop();
140             }
141 
142             if (session == null)
143                 break;
144 
145             SocketChannel ch = session.getChannel();
146             try {
147                 ch.configureBlocking(false);
148                 session.setSelectionKey(ch.register(selector,
149                         SelectionKey.OP_READ, session));
150 
151                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
152                 // in AbstractIoFilterChain.fireSessionOpened().
153                 session.getServiceListeners().fireSessionCreated(session);
154             } catch (IOException e) {
155                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
156                 // and call ConnectFuture.setException().
157                 session.getFilterChain().fireExceptionCaught(session, e);
158             }
159         }
160     }
161 
162     private void doRemove() {
163         if (removingSessions.isEmpty())
164             return;
165 
166         for (;;) {
167             SocketSessionImpl session;
168 
169             synchronized (removingSessions) {
170                 session = (SocketSessionImpl) removingSessions.pop();
171             }
172 
173             if (session == null)
174                 break;
175 
176             SocketChannel ch = session.getChannel();
177             SelectionKey key = session.getSelectionKey();
178             // Retry later if session is not yet fully initialized.
179             // (In case that Session.close() is called before addSession() is processed)
180             if (key == null) {
181                 scheduleRemove(session);
182                 break;
183             }
184             // skip if channel is already closed
185             if (!key.isValid()) {
186                 continue;
187             }
188 
189             try {
190                 key.cancel();
191                 ch.close();
192             } catch (IOException e) {
193                 session.getFilterChain().fireExceptionCaught(session, e);
194             } finally {
195                 releaseWriteBuffers(session);
196                 session.getServiceListeners().fireSessionDestroyed(session);
197             }
198         }
199     }
200 
201     private void process(Set selectedKeys) {
202         Iterator it = selectedKeys.iterator();
203 
204         while (it.hasNext()) {
205             SelectionKey key = (SelectionKey) it.next();
206             SocketSessionImpl session = (SocketSessionImpl) key.attachment();
207 
208             if (key.isReadable() && session.getTrafficMask().isReadable()) {
209                 read(session);
210             }
211 
212             if (key.isWritable() && session.getTrafficMask().isWritable()) {
213                 scheduleFlush(session);
214             }
215         }
216 
217         selectedKeys.clear();
218     }
219 
220     private void read(SocketSessionImpl session) {
221         ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
222         SocketChannel ch = session.getChannel();
223 
224         try {
225             int readBytes = 0;
226             int ret;
227 
228             try {
229                 while ((ret = ch.read(buf.buf())) > 0) {
230                     readBytes += ret;
231                 }
232             } finally {
233                 buf.flip();
234             }
235 
236             session.increaseReadBytes(readBytes);
237 
238             if (readBytes > 0) {
239                 session.getFilterChain().fireMessageReceived(session, buf);
240                 buf = null;
241                 
242                 if (readBytes * 2 < session.getReadBufferSize()) {
243                     if (session.getReadBufferSize() > 64) {
244                         session.setReadBufferSize(session.getReadBufferSize() >>> 1);
245                     }
246                 } else if (readBytes == session.getReadBufferSize()) {
247                     session.setReadBufferSize(session.getReadBufferSize() << 1);
248                 }
249             }
250             if (ret < 0) {
251                 scheduleRemove(session);
252             }
253         } catch (Throwable e) {
254             if (e instanceof IOException)
255                 scheduleRemove(session);
256             session.getFilterChain().fireExceptionCaught(session, e);
257         } finally {
258             if (buf != null)
259                 buf.release();
260         }
261     }
262 
263     private void notifyIdleness() {
264         // process idle sessions
265         long currentTime = System.currentTimeMillis();
266         if ((currentTime - lastIdleCheckTime) >= 1000) {
267             lastIdleCheckTime = currentTime;
268             Set keys = selector.keys();
269             if (keys != null) {
270                 for (Iterator it = keys.iterator(); it.hasNext();) {
271                     SelectionKey key = (SelectionKey) it.next();
272                     SocketSessionImpl session = (SocketSessionImpl) key
273                             .attachment();
274                     notifyIdleness(session, currentTime);
275                 }
276             }
277         }
278     }
279 
280     private void notifyIdleness(SocketSessionImpl session, long currentTime) {
281         notifyIdleness0(session, currentTime, session
282                 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
283                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
284                         .getLastIdleTime(IdleStatus.BOTH_IDLE)));
285         notifyIdleness0(session, currentTime, session
286                 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
287                 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
288                         session.getLastIdleTime(IdleStatus.READER_IDLE)));
289         notifyIdleness0(session, currentTime, session
290                 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
291                 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
292                         session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
293 
294         notifyWriteTimeout(session, currentTime, session
295                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
296     }
297 
298     private void notifyIdleness0(SocketSessionImpl session, long currentTime,
299             long idleTime, IdleStatus status, long lastIoTime) {
300         if (idleTime > 0 && lastIoTime != 0
301                 && (currentTime - lastIoTime) >= idleTime) {
302             session.increaseIdleCount(status);
303             session.getFilterChain().fireSessionIdle(session, status);
304         }
305     }
306 
307     private void notifyWriteTimeout(SocketSessionImpl session,
308             long currentTime, long writeTimeout, long lastIoTime) {
309         SelectionKey key = session.getSelectionKey();
310         if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
311                 && key != null && key.isValid()
312                 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
313             session.getFilterChain().fireExceptionCaught(session,
314                     new WriteTimeoutException());
315         }
316     }
317 
318     private void doFlush() {
319         if (flushingSessions.size() == 0)
320             return;
321 
322         for (;;) {
323             SocketSessionImpl session;
324 
325             synchronized (flushingSessions) {
326                 session = (SocketSessionImpl) flushingSessions.pop();
327             }
328 
329             if (session == null)
330                 break;
331 
332             if (!session.isConnected()) {
333                 releaseWriteBuffers(session);
334                 continue;
335             }
336 
337             SelectionKey key = session.getSelectionKey();
338             // Retry later if session is not yet fully initialized.
339             // (In case that Session.write() is called before addSession() is processed)
340             if (key == null) {
341                 scheduleFlush(session);
342                 break;
343             }
344 
345             // Skip if the channel is already closed.
346             if (!key.isValid()) {
347                 continue;
348             }
349 
350             try {
351                 doFlush(session);
352             } catch (IOException e) {
353                 scheduleRemove(session);
354                 session.getFilterChain().fireExceptionCaught(session, e);
355             }
356         }
357     }
358 
359     private void releaseWriteBuffers(SocketSessionImpl session) {
360         Queue writeRequestQueue = session.getWriteRequestQueue();
361         WriteRequest req;
362 
363         while ((req = (WriteRequest) writeRequestQueue.pop()) != null) {
364             try {
365                 ((ByteBuffer) req.getMessage()).release();
366             } catch (IllegalStateException e) {
367                 session.getFilterChain().fireExceptionCaught(session, e);
368             } finally {
369                 req.getFuture().setWritten(false);
370             }
371         }
372     }
373 
374     private void doFlush(SocketSessionImpl session) throws IOException {
375         // Clear OP_WRITE
376         SelectionKey key = session.getSelectionKey();
377         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
378 
379         SocketChannel ch = session.getChannel();
380         Queue writeRequestQueue = session.getWriteRequestQueue();
381 
382         for (;;) {
383             WriteRequest req;
384 
385             synchronized (writeRequestQueue) {
386                 req = (WriteRequest) writeRequestQueue.first();
387             }
388 
389             if (req == null)
390                 break;
391 
392             ByteBuffer buf = (ByteBuffer) req.getMessage();
393             if (buf.remaining() == 0) {
394                 synchronized (writeRequestQueue) {
395                     writeRequestQueue.pop();
396                 }
397 
398                 session.increaseWrittenMessages();
399 
400                 buf.reset();
401                 session.getFilterChain().fireMessageSent(session, req);
402                 continue;
403             }
404 
405             if (key.isWritable()) {
406                 int writtenBytes = ch.write(buf.buf());
407                 if (writtenBytes > 0) {
408                     session.increaseWrittenBytes(writtenBytes);
409                 }
410             }
411 
412             if (buf.hasRemaining()) {
413                 // Kernel buffer is full
414                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
415                 break;
416             }
417         }
418     }
419 
420     private void doUpdateTrafficMask() {
421         if (trafficControllingSessions.isEmpty())
422             return;
423 
424         for (;;) {
425             SocketSessionImpl session;
426 
427             synchronized (trafficControllingSessions) {
428                 session = (SocketSessionImpl) trafficControllingSessions.pop();
429             }
430 
431             if (session == null)
432                 break;
433 
434             SelectionKey key = session.getSelectionKey();
435             // Retry later if session is not yet fully initialized.
436             // (In case that Session.suspend??() or session.resume??() is 
437             // called before addSession() is processed)
438             if (key == null) {
439                 scheduleTrafficControl(session);
440                 break;
441             }
442             // skip if channel is already closed
443             if (!key.isValid()) {
444                 continue;
445             }
446 
447             // The normal is OP_READ and, if there are write requests in the
448             // session's write queue, set OP_WRITE to trigger flushing.
449             int ops = SelectionKey.OP_READ;
450             Queue writeRequestQueue = session.getWriteRequestQueue();
451             synchronized (writeRequestQueue) {
452                 if (!writeRequestQueue.isEmpty()) {
453                     ops |= SelectionKey.OP_WRITE;
454                 }
455             }
456 
457             // Now mask the preferred ops with the mask of the current session
458             int mask = session.getTrafficMask().getInterestOps();
459             key.interestOps(ops & mask);
460         }
461     }
462 
463     private class Worker implements Runnable {
464         public void run() {
465             Thread.currentThread().setName(SocketIoProcessor.this.threadName);
466 
467             for (;;) {
468                 try {
469                     int nKeys = selector.select(1000);
470                     doAddNew();
471                     doUpdateTrafficMask();
472 
473                     if (nKeys > 0) {
474                         process(selector.selectedKeys());
475                     }
476 
477                     doFlush();
478                     doRemove();
479                     notifyIdleness();
480 
481                     if (selector.keys().isEmpty()) {
482                         synchronized (lock) {
483                             if (selector.keys().isEmpty()
484                                     && newSessions.isEmpty()) {
485                                 worker = null;
486 
487                                 try {
488                                     selector.close();
489                                 } catch (IOException e) {
490                                     ExceptionMonitor.getInstance()
491                                             .exceptionCaught(e);
492                                 } finally {
493                                     selector = null;
494                                 }
495 
496                                 break;
497                             }
498                         }
499                     }
500                 } catch (Throwable t) {
501                     ExceptionMonitor.getInstance().exceptionCaught(t);
502 
503                     try {
504                         Thread.sleep(1000);
505                     } catch (InterruptedException e1) {
506                         ExceptionMonitor.getInstance().exceptionCaught(e1);
507                     }
508                 }
509             }
510         }
511     }
512 
513 }