1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.Executor;
29
30 import org.apache.mina.core.RuntimeIoException;
31 import org.apache.mina.core.buffer.IoBuffer;
32 import org.apache.mina.core.file.FileRegion;
33 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
34 import org.apache.mina.util.CircularQueue;
35 import org.apache.tomcat.jni.Poll;
36 import org.apache.tomcat.jni.Pool;
37 import org.apache.tomcat.jni.Socket;
38 import org.apache.tomcat.jni.Status;
39
40
41
42
43
44
45 public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
46 private static final int POLLSET_SIZE = 1024;
47
48 private final Map<Long, AprSession> allSessions =
49 new HashMap<Long, AprSession>(POLLSET_SIZE);
50
51 private final Object wakeupLock = new Object();
52 private final long wakeupSocket;
53 private volatile boolean toBeWakenUp;
54
55 private final long pool;
56 private final long bufferPool;
57 private final long pollset;
58 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
59 private final List<AprSession> polledSessions =
60 new CircularQueue<AprSession>(POLLSET_SIZE);
61
62
63
64
65
66
67
68 public AprIoProcessor(Executor executor) {
69 super(executor);
70
71
72 pool = Pool.create(AprLibrary.getInstance().getRootPool());
73 bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
74
75 try {
76 wakeupSocket = Socket.create(
77 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
78 } catch (RuntimeException e) {
79 throw e;
80 } catch (Error e) {
81 throw e;
82 } catch (Exception e) {
83 throw new RuntimeIoException("Failed to create a wakeup socket.", e);
84 }
85
86 boolean success = false;
87 long newPollset;
88 try {
89 newPollset = Poll.create(
90 POLLSET_SIZE,
91 pool,
92 Poll.APR_POLLSET_THREADSAFE,
93 Long.MAX_VALUE);
94
95 if (newPollset == 0) {
96 newPollset = Poll.create(
97 62,
98 pool,
99 Poll.APR_POLLSET_THREADSAFE,
100 Long.MAX_VALUE);
101 }
102
103 pollset = newPollset;
104 if (pollset < 0) {
105 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
106 throw new RuntimeIoException(
107 "Thread-safe pollset is not supported in this platform.");
108 }
109 }
110 success = true;
111 } catch (RuntimeException e) {
112 throw e;
113 } catch (Error e) {
114 throw e;
115 } catch (Exception e) {
116 throw new RuntimeIoException("Failed to create a pollset.", e);
117 } finally {
118 if (!success) {
119 dispose();
120 }
121 }
122 }
123
124
125
126
127 @Override
128 protected void dispose0() {
129 Poll.destroy(pollset);
130 Socket.close(wakeupSocket);
131 Pool.destroy(bufferPool);
132 Pool.destroy(pool);
133 }
134
135
136
137
138 @Override
139 protected int select() throws Exception {
140 return select(Integer.MAX_VALUE);
141 }
142
143
144
145
146 @Override
147 protected int select(long timeout) throws Exception {
148 int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
149 if (rv <= 0) {
150 if (rv != -120001) {
151 throwException(rv);
152 }
153
154 rv = Poll.maintain(pollset, polledSockets, true);
155 if (rv > 0) {
156 for (int i = 0; i < rv; i ++) {
157 long socket = polledSockets[i];
158 AprSession session = allSessions.get(socket);
159 if (session == null) {
160 continue;
161 }
162
163 int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
164 (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
165
166 Poll.add(pollset, socket, flag);
167 }
168 } else if (rv < 0) {
169 throwException(rv);
170 }
171
172 return 0;
173 } else {
174 rv <<= 1;
175 if (!polledSessions.isEmpty()) {
176 polledSessions.clear();
177 }
178 for (int i = 0; i < rv; i ++) {
179 long flag = polledSockets[i];
180 long socket = polledSockets[++i];
181 if (socket == wakeupSocket) {
182 synchronized (wakeupLock) {
183 Poll.remove(pollset, wakeupSocket);
184 toBeWakenUp = false;
185 }
186 continue;
187 }
188 AprSession session = allSessions.get(socket);
189 if (session == null) {
190 continue;
191 }
192
193 session.setReadable((flag & Poll.APR_POLLIN) != 0);
194 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
195
196 polledSessions.add(session);
197 }
198
199 return polledSessions.size();
200 }
201 }
202
203
204
205
206 @Override
207 protected boolean isSelectorEmpty() {
208 return allSessions.isEmpty();
209 }
210
211
212
213
214 @Override
215 protected void wakeup() {
216 if (toBeWakenUp) {
217 return;
218 }
219
220
221 synchronized (wakeupLock) {
222 toBeWakenUp = true;
223 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
224 }
225 }
226
227
228
229
230 @Override
231 protected Iterator<AprSession> allSessions() {
232 return allSessions.values().iterator();
233 }
234
235
236
237
238 @Override
239 protected Iterator<AprSession> selectedSessions() {
240 return polledSessions.iterator();
241 }
242
243 @Override
244 protected void init(AprSession session) throws Exception {
245 long s = session.getDescriptor();
246 Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
247 Socket.timeoutSet(s, 0);
248
249 int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
250 if (rv != Status.APR_SUCCESS) {
251 throwException(rv);
252 }
253
254 session.setInterestedInRead(true);
255 allSessions.put(s, session);
256 }
257
258
259
260
261 @Override
262 protected void destroy(AprSession session) throws Exception {
263 if (allSessions.remove(session.getDescriptor()) == null) {
264
265 return;
266 }
267
268 int ret = Poll.remove(pollset, session.getDescriptor());
269 try {
270 if (ret != Status.APR_SUCCESS) {
271 throwException(ret);
272 }
273 } finally {
274 ret = Socket.close(session.getDescriptor());
275
276
277
278 Socket.destroy(session.getDescriptor());
279 session.setDescriptor(0);
280
281 if (ret != Status.APR_SUCCESS) {
282 throwException(ret);
283 }
284 }
285 }
286
287
288
289
290 @Override
291 protected SessionState state(AprSession session) {
292 long socket = session.getDescriptor();
293 if (socket != 0) {
294 return SessionState.OPEN;
295 } else if (allSessions.get(socket) != null) {
296 return SessionState.PREPARING;
297 } else {
298 return SessionState.CLOSED;
299 }
300 }
301
302
303
304
305 @Override
306 protected boolean isReadable(AprSession session) {
307 return session.isReadable();
308 }
309
310
311
312
313 @Override
314 protected boolean isWritable(AprSession session) {
315 return session.isWritable();
316 }
317
318
319
320
321 @Override
322 protected boolean isInterestedInRead(AprSession session) {
323 return session.isInterestedInRead();
324 }
325
326
327
328
329 @Override
330 protected boolean isInterestedInWrite(AprSession session) {
331 return session.isInterestedInWrite();
332 }
333
334
335
336
337 @Override
338 protected void setInterestedInRead(AprSession session, boolean value) throws Exception {
339 if (session.isInterestedInRead() == value) {
340 return;
341 }
342
343 int rv = Poll.remove(pollset, session.getDescriptor());
344 if (rv != Status.APR_SUCCESS) {
345 throwException(rv);
346 }
347
348 int flags = (value ? Poll.APR_POLLIN : 0)
349 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
350
351 rv = Poll.add(pollset, session.getDescriptor(), flags);
352 if (rv == Status.APR_SUCCESS) {
353 session.setInterestedInRead(value);
354 } else {
355 throwException(rv);
356 }
357 }
358
359
360
361
362 @Override
363 protected void setInterestedInWrite(AprSession session, boolean value) throws Exception {
364 if (session.isInterestedInWrite() == value) {
365 return;
366 }
367
368 int rv = Poll.remove(pollset, session.getDescriptor());
369 if (rv != Status.APR_SUCCESS) {
370 throwException(rv);
371 }
372
373 int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
374 | (value ? Poll.APR_POLLOUT : 0);
375
376 rv = Poll.add(pollset, session.getDescriptor(), flags);
377 if (rv == Status.APR_SUCCESS) {
378 session.setInterestedInWrite(value);
379 } else {
380 throwException(rv);
381 }
382 }
383
384
385
386
387 @Override
388 protected int read(AprSession session, IoBuffer buffer) throws Exception {
389 int bytes;
390 int capacity = buffer.remaining();
391
392 ByteBuffer b = Pool.alloc(bufferPool, capacity);
393 try {
394 bytes = Socket.recvb(
395 session.getDescriptor(), b, 0, capacity);
396 if (bytes > 0) {
397 b.position(0);
398 b.limit(bytes);
399 buffer.put(b);
400 } else if (bytes < 0) {
401 if (Status.APR_STATUS_IS_EOF(-bytes)) {
402 bytes = -1;
403 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
404 bytes = 0;
405 } else {
406 throwException(bytes);
407 }
408 }
409 } finally {
410 Pool.clear(bufferPool);
411 }
412 return bytes;
413 }
414
415
416
417
418 @Override
419 protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
420 int writtenBytes;
421 if (buf.isDirect()) {
422 writtenBytes = Socket.sendb(
423 session.getDescriptor(), buf.buf(), buf.position(), length);
424 } else {
425 writtenBytes = Socket.send(
426 session.getDescriptor(), buf.array(), buf.position(), length);
427 if (writtenBytes > 0) {
428 buf.skip(writtenBytes);
429 }
430 }
431
432 if (writtenBytes < 0) {
433 if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
434 writtenBytes = 0;
435 } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
436 writtenBytes = 0;
437 } else {
438 throwException(writtenBytes);
439 }
440 }
441 return writtenBytes;
442 }
443
444
445
446
447 @Override
448 protected int transferFile(AprSession session, FileRegion region, int length)
449 throws Exception {
450 throw new UnsupportedOperationException();
451 }
452
453 private void throwException(int code) throws IOException {
454 throw new IOException(
455 org.apache.tomcat.jni.Error.strerror(-code) +
456 " (code: " + code + ")");
457 }
458 }