View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.Selector;
25  import java.nio.channels.SocketChannel;
26  import java.util.Iterator;
27  import java.util.Set;
28  
29  import org.apache.mina.common.ByteBuffer;
30  import org.apache.mina.common.ExceptionMonitor;
31  import org.apache.mina.common.IdleStatus;
32  import org.apache.mina.common.IoSession;
33  import org.apache.mina.common.WriteTimeoutException;
34  import org.apache.mina.common.IoFilter.WriteRequest;
35  import org.apache.mina.util.NamePreservingRunnable;
36  import org.apache.mina.util.Queue;
37  
38  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
39  
40  /**
 * Performs all I/O operations for sockets which are connected or bound. This class is used by MINA internally.
42   *
43   * @author The Apache Directory Project (mina-dev@directory.apache.org)
44   * @version $Rev: 592224 $, $Date: 2007-11-06 03:11:40 +0100 (Tue, 06 Nov 2007) $,
45   */
46  class SocketIoProcessor {
47  
    /**
     * The maximum number of attempts {@link #doFlush(SocketSessionImpl)}
     * makes to write a single buffer while the channel write keeps
     * returning zero.
     * It is similar to what a spin lock is for in concurrency programming.
     * It improves memory utilization and write throughput significantly.
     */
    private static final int WRITE_SPIN_COUNT = 256;
    
    /** Monitor held while reading or replacing {@link #selector} and {@link #worker}. */
    private final Object lock = new Object();

    /** Name handed to {@link NamePreservingRunnable} when the worker is started. */
    private final String threadName;

    /** Executor that runs the {@link Worker}. */
    private final Executor executor;

    /**
     * Selector opened by {@link #startupWorker()} when no worker is running.
     *
     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
     */
    private Selector selector;

    /** Sessions waiting to be registered; pushed and popped while synchronized on the queue. */
    private final Queue newSessions = new Queue();

    /** Sessions waiting to be closed; pushed and popped while synchronized on the queue. */
    private final Queue removingSessions = new Queue();

    /** Sessions waiting to be flushed; pushed and popped while synchronized on the queue. */
    private final Queue flushingSessions = new Queue();

    /** Sessions whose interest ops must be recomputed; pushed and popped while synchronized on the queue. */
    private final Queue trafficControllingSessions = new Queue();

    /** The worker currently submitted to the executor, or <tt>null</tt> when none is running. */
    private Worker worker;

    /** Timestamp (milliseconds) of the last pass made by {@link #notifyIdleness()}. */
    private long lastIdleCheckTime = System.currentTimeMillis();

    /**
     * Creates a processor; no selector is opened and no worker is started
     * until a session is added or removed.
     *
     * @param threadName name given to the worker's runnable
     * @param executor   executor used to run the worker
     */
    SocketIoProcessor(String threadName, Executor executor) {
        this.threadName = threadName;
        this.executor = executor;
    }
83  
84      void addNew(SocketSessionImpl session) throws IOException {
85          synchronized (newSessions) {
86              newSessions.push(session);
87          }
88  
89          startupWorker();
90      }
91  
92      void remove(SocketSessionImpl session) throws IOException {
93          scheduleRemove(session);
94          startupWorker();
95      }
96  
97      private Selector getSelector() {
98          synchronized (lock) {
99              return this.selector;
100         }
101     }
102     
103     private void startupWorker() throws IOException {
104         synchronized (lock) {
105             if (worker == null) {
106                 selector = Selector.open();
107                 worker = new Worker();
108                 executor.execute(new NamePreservingRunnable(worker, threadName));
109             }
110             selector.wakeup();
111         }
112     }
113 
114     void flush(SocketSessionImpl session) {
115         if (scheduleFlush(session)) {
116             Selector selector = getSelector();
117             if (selector != null) {
118                 selector.wakeup();
119             }
120         }
121     }
122 
123     void updateTrafficMask(SocketSessionImpl session) {
124         scheduleTrafficControl(session);
125         Selector selector = getSelector();
126         if (selector != null) {
127             selector.wakeup();
128         }
129     }
130 
131     private void scheduleRemove(SocketSessionImpl session) {
132         synchronized (removingSessions) {
133             removingSessions.push(session);
134         }
135     }
136 
137     private boolean scheduleFlush(SocketSessionImpl session) {
138         if (session.setScheduledForFlush(true)) {
139             synchronized (flushingSessions) {
140                 flushingSessions.push(session);
141             }
142             
143             return true;
144         }
145         
146         return false;
147     }
148 
149     private void scheduleTrafficControl(SocketSessionImpl session) {
150         synchronized (trafficControllingSessions) {
151             trafficControllingSessions.push(session);
152         }
153     }
154 
155     private void doAddNew() {
156         if (newSessions.isEmpty())
157             return;
158 
159         Selector selector = getSelector();
160         for (;;) {
161             SocketSessionImpl session;
162 
163             synchronized (newSessions) {
164                 session = (SocketSessionImpl) newSessions.pop();
165             }
166 
167             if (session == null)
168                 break;
169 
170             SocketChannel ch = session.getChannel();
171             try {
172                 ch.configureBlocking(false);
173                 session.setSelectionKey(ch.register(selector,
174                         SelectionKey.OP_READ, session));
175 
176                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
177                 // in AbstractIoFilterChain.fireSessionOpened().
178                 session.getServiceListeners().fireSessionCreated(session);
179             } catch (IOException e) {
180                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
181                 // and call ConnectFuture.setException().
182                 session.getFilterChain().fireExceptionCaught(session, e);
183             }
184         }
185     }
186 
187     private void doRemove() {
188         if (removingSessions.isEmpty())
189             return;
190 
191         for (;;) {
192             SocketSessionImpl session;
193 
194             synchronized (removingSessions) {
195                 session = (SocketSessionImpl) removingSessions.pop();
196             }
197 
198             if (session == null)
199                 break;
200 
201             SocketChannel ch = session.getChannel();
202             SelectionKey key = session.getSelectionKey();
203             // Retry later if session is not yet fully initialized.
204             // (In case that Session.close() is called before addSession() is processed)
205             if (key == null) {
206                 scheduleRemove(session);
207                 break;
208             }
209             // skip if channel is already closed
210             if (!key.isValid()) {
211                 continue;
212             }
213 
214             try {
215                 key.cancel();
216                 ch.close();
217             } catch (IOException e) {
218                 session.getFilterChain().fireExceptionCaught(session, e);
219             } finally {
220                 releaseWriteBuffers(session);
221                 session.getServiceListeners().fireSessionDestroyed(session);
222             }
223         }
224     }
225 
226     private void process(Set selectedKeys) {
227         Iterator it = selectedKeys.iterator();
228 
229         while (it.hasNext()) {
230             SelectionKey key = (SelectionKey) it.next();
231             SocketSessionImpl session = (SocketSessionImpl) key.attachment();
232 
233             if (key.isReadable() && session.getTrafficMask().isReadable()) {
234                 read(session);
235             }
236 
237             if (key.isWritable() && session.getTrafficMask().isWritable()) {
238                 scheduleFlush(session);
239             }
240         }
241 
242         selectedKeys.clear();
243     }
244 
245     private void read(SocketSessionImpl session) {
246         ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
247         SocketChannel ch = session.getChannel();
248 
249         try {
250             int readBytes = 0;
251             int ret;
252 
253             try {
254                 while ((ret = ch.read(buf.buf())) > 0) {
255                     readBytes += ret;
256                 }
257             } finally {
258                 buf.flip();
259             }
260 
261             session.increaseReadBytes(readBytes);
262 
263             if (readBytes > 0) {
264                 session.getFilterChain().fireMessageReceived(session, buf);
265                 buf = null;
266                 
267                 if (readBytes * 2 < session.getReadBufferSize()) {
268                     session.decreaseReadBufferSize();
269                 } else if (readBytes == session.getReadBufferSize()) {
270                     session.increaseReadBufferSize();
271                 }
272             }
273             if (ret < 0) {
274                 scheduleRemove(session);
275             }
276         } catch (Throwable e) {
277             if (e instanceof IOException)
278                 scheduleRemove(session);
279             session.getFilterChain().fireExceptionCaught(session, e);
280         } finally {
281             if (buf != null)
282                 buf.release();
283         }
284     }
285 
286     private void notifyIdleness() {
287         // process idle sessions
288         long currentTime = System.currentTimeMillis();
289         if ((currentTime - lastIdleCheckTime) >= 1000) {
290             lastIdleCheckTime = currentTime;
291             Selector selector = getSelector();
292             Set keys = selector.keys();
293             if (keys != null) {
294                 for (Iterator it = keys.iterator(); it.hasNext();) {
295                     SelectionKey key = (SelectionKey) it.next();
296                     SocketSessionImpl session = (SocketSessionImpl) key
297                             .attachment();
298                     notifyIdleness(session, currentTime);
299                 }
300             }
301         }
302     }
303 
304     private void notifyIdleness(SocketSessionImpl session, long currentTime) {
305         notifyIdleness0(session, currentTime, session
306                 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
307                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
308                         .getLastIdleTime(IdleStatus.BOTH_IDLE)));
309         notifyIdleness0(session, currentTime, session
310                 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
311                 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
312                         session.getLastIdleTime(IdleStatus.READER_IDLE)));
313         notifyIdleness0(session, currentTime, session
314                 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
315                 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
316                         session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
317 
318         notifyWriteTimeout(session, currentTime, session
319                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
320     }
321 
322     private void notifyIdleness0(SocketSessionImpl session, long currentTime,
323             long idleTime, IdleStatus status, long lastIoTime) {
324         if (idleTime > 0 && lastIoTime != 0
325                 && (currentTime - lastIoTime) >= idleTime) {
326             session.increaseIdleCount(status);
327             session.getFilterChain().fireSessionIdle(session, status);
328         }
329     }
330 
331     private void notifyWriteTimeout(SocketSessionImpl session,
332             long currentTime, long writeTimeout, long lastIoTime) {
333         SelectionKey key = session.getSelectionKey();
334         if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
335                 && key != null && key.isValid()
336                 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
337             session.getFilterChain().fireExceptionCaught(session,
338                     new WriteTimeoutException());
339         }
340     }
341 
342     private void doFlush() {
343         if (flushingSessions.size() == 0)
344             return;
345 
346         for (;;) {
347             SocketSessionImpl session;
348 
349             synchronized (flushingSessions) {
350                 session = (SocketSessionImpl) flushingSessions.pop();
351             }
352 
353             if (session == null)
354                 break;
355             
356             session.setScheduledForFlush(false);
357 
358             if (!session.isConnected()) {
359                 releaseWriteBuffers(session);
360                 continue;
361             }
362 
363             SelectionKey key = session.getSelectionKey();
364             // Retry later if session is not yet fully initialized.
365             // (In case that Session.write() is called before addSession() is processed)
366             if (key == null) {
367                 scheduleFlush(session);
368                 break;
369             }
370 
371             // Skip if the channel is already closed.
372             if (!key.isValid()) {
373                 continue;
374             }
375 
376             try {
377                 boolean flushedAll = doFlush(session);
378                 if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
379                     scheduleFlush(session);
380                 }
381             } catch (IOException e) {
382                 scheduleRemove(session);
383                 session.getFilterChain().fireExceptionCaught(session, e);
384             }
385         }
386     }
387 
388     private void releaseWriteBuffers(SocketSessionImpl session) {
389         Queue writeRequestQueue = session.getWriteRequestQueue();
390         WriteRequest req;
391 
392         if ((req = (WriteRequest) writeRequestQueue.pop()) != null) {
393             ByteBuffer buf = (ByteBuffer) req.getMessage();
394             try {
395                 buf.release();
396             } catch (IllegalStateException e) {
397                 session.getFilterChain().fireExceptionCaught(session, e);
398             } finally {
399                 // The first unwritten empty buffer must be
400                 // forwarded to the filter chain.
401                 if (buf.hasRemaining()) {
402                     req.getFuture().setWritten(false);
403                 } else {
404                     session.getFilterChain().fireMessageSent(session, req);                    
405                 }
406             }
407 
408             // Discard others.
409             while ((req = (WriteRequest) writeRequestQueue.pop()) != null) {
410                 try {
411                     ((ByteBuffer) req.getMessage()).release();
412                 } catch (IllegalStateException e) {
413                     session.getFilterChain().fireExceptionCaught(session, e);
414                 } finally {
415                     req.getFuture().setWritten(false);
416                 }
417             }
418         }
419     }
420 
421     private boolean doFlush(SocketSessionImpl session) throws IOException {
422         SocketChannel ch = session.getChannel();
423         if (!ch.isConnected()) {
424             scheduleRemove(session);
425             return false;
426         }
427         
428         // Clear OP_WRITE
429         SelectionKey key = session.getSelectionKey();
430         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
431 
432         Queue writeRequestQueue = session.getWriteRequestQueue();
433 
434         int writtenBytes = 0;
435         int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
436         try {
437             for (;;) {
438                 WriteRequest req;
439     
440                 synchronized (writeRequestQueue) {
441                     req = (WriteRequest) writeRequestQueue.first();
442                 }
443     
444                 if (req == null)
445                     break;
446     
447                 ByteBuffer buf = (ByteBuffer) req.getMessage();
448                 if (buf.remaining() == 0) {
449                     synchronized (writeRequestQueue) {
450                         writeRequestQueue.pop();
451                     }
452     
453                     buf.reset();
454                     
455                     if (!buf.hasRemaining()) {
456                         session.increaseWrittenMessages();
457                     }
458                     
459                     session.getFilterChain().fireMessageSent(session, req);
460                     continue;
461                 }
462     
463                 int localWrittenBytes = 0;
464                 for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
465                     localWrittenBytes = ch.write(buf.buf());
466                     if (localWrittenBytes != 0 || !buf.hasRemaining()) {
467                         break;
468                     }
469                 }
470 
471                 writtenBytes += localWrittenBytes;
472 
473                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
474                     // Kernel buffer is full or wrote too much.
475                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
476                     return false;
477                 }
478             }
479         } finally {
480             session.increaseWrittenBytes(writtenBytes);
481         }
482         
483         return true;
484     }
485 
486     private void doUpdateTrafficMask() {
487         if (trafficControllingSessions.isEmpty())
488             return;
489 
490         for (;;) {
491             SocketSessionImpl session;
492 
493             synchronized (trafficControllingSessions) {
494                 session = (SocketSessionImpl) trafficControllingSessions.pop();
495             }
496 
497             if (session == null)
498                 break;
499 
500             SelectionKey key = session.getSelectionKey();
501             // Retry later if session is not yet fully initialized.
502             // (In case that Session.suspend??() or session.resume??() is 
503             // called before addSession() is processed)
504             if (key == null) {
505                 scheduleTrafficControl(session);
506                 break;
507             }
508             // skip if channel is already closed
509             if (!key.isValid()) {
510                 continue;
511             }
512 
513             // The normal is OP_READ and, if there are write requests in the
514             // session's write queue, set OP_WRITE to trigger flushing.
515             int ops = SelectionKey.OP_READ;
516             Queue writeRequestQueue = session.getWriteRequestQueue();
517             synchronized (writeRequestQueue) {
518                 if (!writeRequestQueue.isEmpty()) {
519                     ops |= SelectionKey.OP_WRITE;
520                 }
521             }
522 
523             // Now mask the preferred ops with the mask of the current session
524             int mask = session.getTrafficMask().getInterestOps();
525             key.interestOps(ops & mask);
526         }
527     }
528 
529     private class Worker implements Runnable {
530         public void run() {
531             Selector selector = getSelector();
532             for (;;) {
533                 try {
534                     int nKeys = selector.select(1000);
535                     doAddNew();
536                     doUpdateTrafficMask();
537 
538                     if (nKeys > 0) {
539                         process(selector.selectedKeys());
540                     }
541 
542                     doFlush();
543                     doRemove();
544                     notifyIdleness();
545 
546                     if (selector.keys().isEmpty()) {
547                         synchronized (lock) {
548                             if (selector.keys().isEmpty()
549                                     && newSessions.isEmpty()) {
550                                 worker = null;
551 
552                                 try {
553                                     selector.close();
554                                 } catch (IOException e) {
555                                     ExceptionMonitor.getInstance()
556                                             .exceptionCaught(e);
557                                 } finally {
558                                     selector = null;
559                                 }
560 
561                                 break;
562                             }
563                         }
564                     }
565                 } catch (Throwable t) {
566                     ExceptionMonitor.getInstance().exceptionCaught(t);
567 
568                     try {
569                         Thread.sleep(1000);
570                     } catch (InterruptedException e1) {
571                         ExceptionMonitor.getInstance().exceptionCaught(e1);
572                     }
573                 }
574             }
575         }
576     }
577 }