1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.ByteChannel;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Iterator;
28 import java.util.Set;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.RuntimeIoException;
32 import org.apache.mina.core.buffer.IoBuffer;
33 import org.apache.mina.core.file.FileRegion;
34 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35 import org.apache.mina.core.session.SessionState;
36
37
38
39
40
41
42 public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
43
44 private final Selector selector;
45
46
47
48
49
50
51
52 public NioProcessor(Executor executor) {
53 super(executor);
54 try {
55
56 selector = Selector.open();
57 } catch (IOException e) {
58 throw new RuntimeIoException("Failed to open a selector.", e);
59 }
60 }
61
62 @Override
63 protected void dispose0() throws Exception {
64 selector.close();
65 }
66
67 @Override
68 protected int select(long timeout) throws Exception {
69 return selector.select(timeout);
70 }
71
72 @Override
73 protected int select() throws Exception {
74 return selector.select();
75 }
76
77 @Override
78 protected boolean isSelectorEmpty() {
79 return selector.keys().isEmpty();
80 }
81
82 @Override
83 protected void wakeup() {
84 selector.wakeup();
85 }
86
87 @Override
88 protected Iterator<NioSession> allSessions() {
89 return new IoSessionIterator(selector.keys());
90 }
91
92 @SuppressWarnings("synthetic-access")
93 @Override
94 protected Iterator<NioSession> selectedSessions() {
95 return new IoSessionIterator(selector.selectedKeys());
96 }
97
98 @Override
99 protected void init(NioSession session) throws Exception {
100 SelectableChannel ch = (SelectableChannel) session.getChannel();
101 ch.configureBlocking(false);
102 session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
103 }
104
105 @Override
106 protected void destroy(NioSession session) throws Exception {
107 ByteChannel ch = session.getChannel();
108 SelectionKey key = session.getSelectionKey();
109 if (key != null) {
110 key.cancel();
111 }
112 ch.close();
113 }
114
115
116
117
118 @Override
119 protected SessionState getState(NioSession session) {
120 SelectionKey key = session.getSelectionKey();
121
122 if (key == null) {
123
124 return SessionState.OPENING;
125 }
126
127 if (key.isValid()) {
128
129 return SessionState.OPENED;
130 } else {
131
132 return SessionState.CLOSING;
133 }
134 }
135
136 @Override
137 protected boolean isReadable(NioSession session) {
138 SelectionKey key = session.getSelectionKey();
139 return key.isValid() && key.isReadable();
140 }
141
142 @Override
143 protected boolean isWritable(NioSession session) {
144 SelectionKey key = session.getSelectionKey();
145 return key.isValid() && key.isWritable();
146 }
147
148 @Override
149 protected boolean isInterestedInRead(NioSession session) {
150 SelectionKey key = session.getSelectionKey();
151 return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
152 }
153
154 @Override
155 protected boolean isInterestedInWrite(NioSession session) {
156 SelectionKey key = session.getSelectionKey();
157 return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
158 }
159
160
161
162
163 @Override
164 protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
165 SelectionKey key = session.getSelectionKey();
166 int oldInterestOps = key.interestOps();
167 int newInterestOps = oldInterestOps;
168
169 if (isInterested) {
170 newInterestOps |= SelectionKey.OP_READ;
171 } else {
172 newInterestOps &= ~SelectionKey.OP_READ;
173 }
174
175 if (oldInterestOps != newInterestOps) {
176 key.interestOps(newInterestOps);
177 }
178 }
179
180
181
182
183 @Override
184 protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
185 SelectionKey key = session.getSelectionKey();
186 int oldInterestOps = key.interestOps();
187 int newInterestOps = oldInterestOps;
188
189 if (isInterested) {
190 newInterestOps |= SelectionKey.OP_WRITE;
191 } else {
192 newInterestOps &= ~SelectionKey.OP_WRITE;
193 }
194
195 if (oldInterestOps != newInterestOps) {
196 key.interestOps(newInterestOps);
197 }
198 }
199
200 @Override
201 protected int read(NioSession session, IoBuffer buf) throws Exception {
202 return session.getChannel().read(buf.buf());
203 }
204
205 @Override
206 protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
207 if (buf.remaining() <= length) {
208 return session.getChannel().write(buf.buf());
209 }
210
211 int oldLimit = buf.limit();
212 buf.limit(buf.position() + length);
213 try {
214 return session.getChannel().write(buf.buf());
215 } finally {
216 buf.limit(oldLimit);
217 }
218 }
219
220 @Override
221 protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
222 try {
223 return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
224 } catch (IOException e) {
225
226
227 String message = e.getMessage();
228 if (message != null && message.contains("temporarily unavailable")) {
229 return 0;
230 }
231
232 throw e;
233 }
234 }
235
236
237
238
239
240 protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
241 private final Iterator<SelectionKey> iterator;
242
243
244
245
246
247
248 private IoSessionIterator(Set<SelectionKey> keys) {
249 iterator = keys.iterator();
250 }
251
252
253
254
255 public boolean hasNext() {
256 return iterator.hasNext();
257 }
258
259
260
261
262 public NioSession next() {
263 SelectionKey key = iterator.next();
264 NioSession nioSession = (NioSession) key.attachment();
265 return nioSession;
266 }
267
268
269
270
271 public void remove() {
272 iterator.remove();
273 }
274 }
275 }