1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.ByteChannel;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectableChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.nio.channels.spi.SelectorProvider;
30 import java.util.Iterator;
31 import java.util.Set;
32 import java.util.concurrent.Executor;
33
34 import org.apache.mina.core.RuntimeIoException;
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.file.FileRegion;
37 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
38 import org.apache.mina.core.session.SessionState;
39
40
41
42
43
44
45 public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
46
47 private Selector selector;
48
49 private SelectorProvider selectorProvider = null;
50
51
52
53
54
55
56
57 public NioProcessor(Executor executor) {
58 super(executor);
59
60 try {
61
62 selector = Selector.open();
63 } catch (IOException e) {
64 throw new RuntimeIoException("Failed to open a selector.", e);
65 }
66 }
67
68
69
70
71
72
73
74 public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
75 super(executor);
76
77 try {
78
79 if (selectorProvider == null) {
80 selector = Selector.open();
81 } else {
82 selector = selectorProvider.openSelector();
83 }
84
85 } catch (IOException e) {
86 throw new RuntimeIoException("Failed to open a selector.", e);
87 }
88 }
89
90 @Override
91 protected void doDispose() throws Exception {
92 selector.close();
93 }
94
95 @Override
96 protected int select(long timeout) throws Exception {
97 return selector.select(timeout);
98 }
99
100 @Override
101 protected int select() throws Exception {
102 return selector.select();
103 }
104
105 @Override
106 protected boolean isSelectorEmpty() {
107 return selector.keys().isEmpty();
108 }
109
110 @Override
111 protected void wakeup() {
112 wakeupCalled.getAndSet(true);
113 selector.wakeup();
114 }
115
116 @Override
117 protected Iterator<NioSession> allSessions() {
118 return new IoSessionIterator(selector.keys());
119 }
120
121 @SuppressWarnings("synthetic-access")
122 @Override
123 protected Iterator<NioSession> selectedSessions() {
124 return new IoSessionIterator(selector.selectedKeys());
125 }
126
127 @Override
128 protected void init(NioSession session) throws Exception {
129 SelectableChannel ch = (SelectableChannel) session.getChannel();
130 ch.configureBlocking(false);
131 session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
132 }
133
134 @Override
135 protected void destroy(NioSession session) throws Exception {
136 ByteChannel ch = session.getChannel();
137 SelectionKey key = session.getSelectionKey();
138 if (key != null) {
139 key.cancel();
140 }
141 ch.close();
142 }
143
144
145
146
147
148
149 @Override
150 protected void registerNewSelector() throws IOException {
151 synchronized (selector) {
152 Set<SelectionKey> keys = selector.keys();
153
154
155 Selector newSelector = null;
156
157 if (selectorProvider == null) {
158 newSelector = Selector.open();
159 } else {
160 newSelector = selectorProvider.openSelector();
161 }
162
163
164 for (SelectionKey key : keys) {
165 SelectableChannel ch = key.channel();
166
167
168 NioSession session = (NioSession) key.attachment();
169 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
170 session.setSelectionKey(newKey);
171 }
172
173
174 selector.close();
175 selector = newSelector;
176 }
177 }
178
179
180
181
182 @Override
183 protected boolean isBrokenConnection() throws IOException {
184
185 boolean brokenSession = false;
186
187 synchronized (selector) {
188
189 Set<SelectionKey> keys = selector.keys();
190
191
192
193 for (SelectionKey key : keys) {
194 SelectableChannel channel = key.channel();
195
196 if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected()))
197 || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
198
199
200 key.cancel();
201
202
203 brokenSession = true;
204 }
205 }
206 }
207
208 return brokenSession;
209 }
210
211
212
213
214 @Override
215 protected SessionState getState(NioSession session) {
216 SelectionKey key = session.getSelectionKey();
217
218 if (key == null) {
219
220 return SessionState.OPENING;
221 }
222
223 if (key.isValid()) {
224
225 return SessionState.OPENED;
226 } else {
227
228 return SessionState.CLOSING;
229 }
230 }
231
232 @Override
233 protected boolean isReadable(NioSession session) {
234 SelectionKey key = session.getSelectionKey();
235 return key.isValid() && key.isReadable();
236 }
237
238 @Override
239 protected boolean isWritable(NioSession session) {
240 SelectionKey key = session.getSelectionKey();
241 return key.isValid() && key.isWritable();
242 }
243
244 @Override
245 protected boolean isInterestedInRead(NioSession session) {
246 SelectionKey key = session.getSelectionKey();
247 return key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
248 }
249
250 @Override
251 protected boolean isInterestedInWrite(NioSession session) {
252 SelectionKey key = session.getSelectionKey();
253 return key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
254 }
255
256
257
258
259 @Override
260 protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
261 SelectionKey key = session.getSelectionKey();
262 int oldInterestOps = key.interestOps();
263 int newInterestOps = oldInterestOps;
264
265 if (isInterested) {
266 newInterestOps |= SelectionKey.OP_READ;
267 } else {
268 newInterestOps &= ~SelectionKey.OP_READ;
269 }
270
271 if (oldInterestOps != newInterestOps) {
272 key.interestOps(newInterestOps);
273 }
274 }
275
276
277
278
279 @Override
280 protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
281 SelectionKey key = session.getSelectionKey();
282
283 if (key == null) {
284 return;
285 }
286
287 int newInterestOps = key.interestOps();
288
289 if (isInterested) {
290 newInterestOps |= SelectionKey.OP_WRITE;
291 } else {
292 newInterestOps &= ~SelectionKey.OP_WRITE;
293 }
294
295 key.interestOps(newInterestOps);
296 }
297
298 @Override
299 protected int read(NioSession session, IoBuffer buf) throws Exception {
300 ByteChannel channel = session.getChannel();
301
302 return channel.read(buf.buf());
303 }
304
305 @Override
306 protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
307 if (buf.remaining() <= length) {
308 return session.getChannel().write(buf.buf());
309 }
310
311 int oldLimit = buf.limit();
312 buf.limit(buf.position() + length);
313 try {
314 return session.getChannel().write(buf.buf());
315 } finally {
316 buf.limit(oldLimit);
317 }
318 }
319
320 @Override
321 protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
322 try {
323 return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
324 } catch (IOException e) {
325
326
327 String message = e.getMessage();
328 if ((message != null) && message.contains("temporarily unavailable")) {
329 return 0;
330 }
331
332 throw e;
333 }
334 }
335
336
337
338
339
340 protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
341 private final Iterator<SelectionKey> iterator;
342
343
344
345
346
347
348
349 private IoSessionIterator(Set<SelectionKey> keys) {
350 iterator = keys.iterator();
351 }
352
353
354
355
356 public boolean hasNext() {
357 return iterator.hasNext();
358 }
359
360
361
362
363 public NioSession next() {
364 SelectionKey key = iterator.next();
365 NioSession nioSession = (NioSession) key.attachment();
366 return nioSession;
367 }
368
369
370
371
372 public void remove() {
373 iterator.remove();
374 }
375 }
376 }