View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.ByteChannel;
24  import java.nio.channels.SelectableChannel;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.util.Iterator;
28  import java.util.Set;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.RuntimeIoException;
32  import org.apache.mina.core.buffer.IoBuffer;
33  import org.apache.mina.core.file.FileRegion;
34  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35  import org.apache.mina.core.session.SessionState;
36  
37  /**
38   * TODO Add documentation
39   * 
40   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
41   */
42  public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
43      /** The selector associated with this processor */
44      private Selector selector;
45  
46      /**
47       * 
48       * Creates a new instance of NioProcessor.
49       * 
50       * @param executor
51       */
52      public NioProcessor(Executor executor) {
53          super(executor);
54          
55          try {
56              // Open a new selector
57              selector = Selector.open();
58          } catch (IOException e) {
59              throw new RuntimeIoException("Failed to open a selector.", e);
60          }
61      }
62  
63      @Override
64      protected void doDispose() throws Exception {
65          selector.close();
66      }
67  
68      @Override
69      protected int select(long timeout) throws Exception {
70          return selector.select(timeout);
71      }
72  
73      @Override
74      protected int select() throws Exception {
75          return selector.select();
76      }
77  
78      @Override
79      protected boolean isSelectorEmpty() {
80          return selector.keys().isEmpty();
81      }
82  
83      @Override
84      protected void wakeup() {
85          wakeupCalled.getAndSet(true);
86          selector.wakeup();
87      }
88  
89      @Override
90      protected Iterator<NioSession> allSessions() {
91          return new IoSessionIterator(selector.keys());
92      }
93  
94      @SuppressWarnings("synthetic-access")
95      @Override
96      protected Iterator<NioSession> selectedSessions() {
97          return new IoSessionIterator(selector.selectedKeys());
98      }
99  
100     @Override
101     protected void init(NioSession session) throws Exception {
102         SelectableChannel ch = (SelectableChannel) session.getChannel();
103         ch.configureBlocking(false);
104         session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ,
105                 session));
106     }
107 
108     @Override
109     protected void destroy(NioSession session) throws Exception {
110         ByteChannel ch = session.getChannel();
111         SelectionKey key = session.getSelectionKey();
112         if (key != null) {
113             key.cancel();
114         }
115         ch.close();
116     }
117 
118     /**
119      * {@inheritDoc}
120      */
121     @Override
122     protected SessionState getState(NioSession session) {
123         SelectionKey key = session.getSelectionKey();
124 
125         if (key == null) {
126             // The channel is not yet registred to a selector
127             return SessionState.OPENING;
128         }
129 
130         if (key.isValid()) {
131             // The session is opened
132             return SessionState.OPENED;
133         } else {
134             // The session still as to be closed
135             return SessionState.CLOSING;
136         }
137     }
138 
139     @Override
140     protected boolean isReadable(NioSession session) {
141         SelectionKey key = session.getSelectionKey();
142         return key.isValid() && key.isReadable();
143     }
144 
145     @Override
146     protected boolean isWritable(NioSession session) {
147         SelectionKey key = session.getSelectionKey();
148         return key.isValid() && key.isWritable();
149     }
150 
151     @Override
152     protected boolean isInterestedInRead(NioSession session) {
153         SelectionKey key = session.getSelectionKey();
154         return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
155     }
156 
157     @Override
158     protected boolean isInterestedInWrite(NioSession session) {
159         SelectionKey key = session.getSelectionKey();
160         return key.isValid()
161                 && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
162     }
163 
164     /**
165      * {@inheritDoc}
166      */
167     @Override
168     protected void setInterestedInRead(NioSession session, boolean isInterested)
169             throws Exception {
170         SelectionKey key = session.getSelectionKey();
171         int oldInterestOps = key.interestOps();
172         int newInterestOps = oldInterestOps;
173 
174         if (isInterested) {
175             newInterestOps |= SelectionKey.OP_READ;
176         } else {
177             newInterestOps &= ~SelectionKey.OP_READ;
178         }
179 
180         if (oldInterestOps != newInterestOps) {
181             key.interestOps(newInterestOps);
182         }
183     }
184 
185     /**
186      * {@inheritDoc}
187      */
188     @Override
189     protected void setInterestedInWrite(NioSession session, boolean isInterested)
190             throws Exception {
191         SelectionKey key = session.getSelectionKey();
192 
193         if (key == null) {
194             return;
195         }
196         
197         int newInterestOps = key.interestOps();
198 
199         if (isInterested) {
200             newInterestOps |= SelectionKey.OP_WRITE;
201             //newInterestOps &= ~SelectionKey.OP_READ;
202         } else {
203             newInterestOps &= ~SelectionKey.OP_WRITE;
204             //newInterestOps |= SelectionKey.OP_READ;
205         }
206 
207         key.interestOps(newInterestOps);
208     }
209 
210     @Override
211     protected int read(NioSession session, IoBuffer buf) throws Exception {
212         ByteChannel channel = session.getChannel();
213         
214         return session.getChannel().read(buf.buf());
215     }
216 
217     @Override
218     protected int write(NioSession session, IoBuffer buf, int length)
219             throws Exception {
220         if (buf.remaining() <= length) {
221             return session.getChannel().write(buf.buf());
222         }
223 
224         int oldLimit = buf.limit();
225         buf.limit(buf.position() + length);
226         try {
227             return session.getChannel().write(buf.buf());
228         } finally {
229             buf.limit(oldLimit);
230         }
231     }
232 
233     @Override
234     protected int transferFile(NioSession session, FileRegion region, int length)
235             throws Exception {
236         try {
237             return (int) region.getFileChannel().transferTo(
238                     region.getPosition(), length, session.getChannel());
239         } catch (IOException e) {
240             // Check to see if the IOException is being thrown due to
241             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
242             String message = e.getMessage();
243             if (message != null && message.contains("temporarily unavailable")) {
244                 return 0;
245             }
246 
247             throw e;
248         }
249     }
250 
251     /**
252      * An encapsulating iterator around the {@link Selector#selectedKeys()} or
253      * the {@link Selector#keys()} iterator;
254      */
255     protected static class IoSessionIterator<NioSession> implements
256             Iterator<NioSession> {
257         private final Iterator<SelectionKey> iterator;
258 
259         /**
260          * Create this iterator as a wrapper on top of the selectionKey Set.
261          * 
262          * @param keys
263          *            The set of selected sessions
264          */
265         private IoSessionIterator(Set<SelectionKey> keys) {
266             iterator = keys.iterator();
267         }
268 
269         /**
270          * {@inheritDoc}
271          */
272         public boolean hasNext() {
273             return iterator.hasNext();
274         }
275 
276         /**
277          * {@inheritDoc}
278          */
279         public NioSession next() {
280             SelectionKey key = iterator.next();
281             NioSession nioSession = (NioSession) key.attachment();
282             return nioSession;
283         }
284 
285         /**
286          * {@inheritDoc}
287          */
288         public void remove() {
289             iterator.remove();
290         }
291     }
292 }