1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.ByteChannel;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Iterator;
28 import java.util.Set;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.RuntimeIoException;
32 import org.apache.mina.core.buffer.IoBuffer;
33 import org.apache.mina.core.file.FileRegion;
34 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35 import org.apache.mina.core.session.SessionState;
36
37
38
39
40
41
42 public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
43
44 private Selector selector;
45
46
47
48
49
50
51
52 public NioProcessor(Executor executor) {
53 super(executor);
54
55 try {
56
57 selector = Selector.open();
58 } catch (IOException e) {
59 throw new RuntimeIoException("Failed to open a selector.", e);
60 }
61 }
62
63 @Override
64 protected void doDispose() throws Exception {
65 selector.close();
66 }
67
68 @Override
69 protected int select(long timeout) throws Exception {
70 return selector.select(timeout);
71 }
72
73 @Override
74 protected int select() throws Exception {
75 return selector.select();
76 }
77
78 @Override
79 protected boolean isSelectorEmpty() {
80 return selector.keys().isEmpty();
81 }
82
83 @Override
84 protected void wakeup() {
85 wakeupCalled.getAndSet(true);
86 selector.wakeup();
87 }
88
89 @Override
90 protected Iterator<NioSession> allSessions() {
91 return new IoSessionIterator(selector.keys());
92 }
93
94 @SuppressWarnings("synthetic-access")
95 @Override
96 protected Iterator<NioSession> selectedSessions() {
97 return new IoSessionIterator(selector.selectedKeys());
98 }
99
100 @Override
101 protected void init(NioSession session) throws Exception {
102 SelectableChannel ch = (SelectableChannel) session.getChannel();
103 ch.configureBlocking(false);
104 session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ,
105 session));
106 }
107
108 @Override
109 protected void destroy(NioSession session) throws Exception {
110 ByteChannel ch = session.getChannel();
111 SelectionKey key = session.getSelectionKey();
112 if (key != null) {
113 key.cancel();
114 }
115 ch.close();
116 }
117
118
119
120
121 @Override
122 protected SessionState getState(NioSession session) {
123 SelectionKey key = session.getSelectionKey();
124
125 if (key == null) {
126
127 return SessionState.OPENING;
128 }
129
130 if (key.isValid()) {
131
132 return SessionState.OPENED;
133 } else {
134
135 return SessionState.CLOSING;
136 }
137 }
138
139 @Override
140 protected boolean isReadable(NioSession session) {
141 SelectionKey key = session.getSelectionKey();
142 return key.isValid() && key.isReadable();
143 }
144
145 @Override
146 protected boolean isWritable(NioSession session) {
147 SelectionKey key = session.getSelectionKey();
148 return key.isValid() && key.isWritable();
149 }
150
151 @Override
152 protected boolean isInterestedInRead(NioSession session) {
153 SelectionKey key = session.getSelectionKey();
154 return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
155 }
156
157 @Override
158 protected boolean isInterestedInWrite(NioSession session) {
159 SelectionKey key = session.getSelectionKey();
160 return key.isValid()
161 && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
162 }
163
164
165
166
167 @Override
168 protected void setInterestedInRead(NioSession session, boolean isInterested)
169 throws Exception {
170 SelectionKey key = session.getSelectionKey();
171 int oldInterestOps = key.interestOps();
172 int newInterestOps = oldInterestOps;
173
174 if (isInterested) {
175 newInterestOps |= SelectionKey.OP_READ;
176 } else {
177 newInterestOps &= ~SelectionKey.OP_READ;
178 }
179
180 if (oldInterestOps != newInterestOps) {
181 key.interestOps(newInterestOps);
182 }
183 }
184
185
186
187
188 @Override
189 protected void setInterestedInWrite(NioSession session, boolean isInterested)
190 throws Exception {
191 SelectionKey key = session.getSelectionKey();
192
193 if (key == null) {
194 return;
195 }
196
197 int newInterestOps = key.interestOps();
198
199 if (isInterested) {
200 newInterestOps |= SelectionKey.OP_WRITE;
201
202 } else {
203 newInterestOps &= ~SelectionKey.OP_WRITE;
204
205 }
206
207 key.interestOps(newInterestOps);
208 }
209
210 @Override
211 protected int read(NioSession session, IoBuffer buf) throws Exception {
212 ByteChannel channel = session.getChannel();
213
214 return session.getChannel().read(buf.buf());
215 }
216
217 @Override
218 protected int write(NioSession session, IoBuffer buf, int length)
219 throws Exception {
220 if (buf.remaining() <= length) {
221 return session.getChannel().write(buf.buf());
222 }
223
224 int oldLimit = buf.limit();
225 buf.limit(buf.position() + length);
226 try {
227 return session.getChannel().write(buf.buf());
228 } finally {
229 buf.limit(oldLimit);
230 }
231 }
232
233 @Override
234 protected int transferFile(NioSession session, FileRegion region, int length)
235 throws Exception {
236 try {
237 return (int) region.getFileChannel().transferTo(
238 region.getPosition(), length, session.getChannel());
239 } catch (IOException e) {
240
241
242 String message = e.getMessage();
243 if (message != null && message.contains("temporarily unavailable")) {
244 return 0;
245 }
246
247 throw e;
248 }
249 }
250
251
252
253
254
255 protected static class IoSessionIterator<NioSession> implements
256 Iterator<NioSession> {
257 private final Iterator<SelectionKey> iterator;
258
259
260
261
262
263
264
265 private IoSessionIterator(Set<SelectionKey> keys) {
266 iterator = keys.iterator();
267 }
268
269
270
271
272 public boolean hasNext() {
273 return iterator.hasNext();
274 }
275
276
277
278
279 public NioSession next() {
280 SelectionKey key = iterator.next();
281 NioSession nioSession = (NioSession) key.attachment();
282 return nioSession;
283 }
284
285
286
287
288 public void remove() {
289 iterator.remove();
290 }
291 }
292 }