1 package org.apache.mina.transport.socket.apr;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.net.SocketAddress;
6 import java.util.Iterator;
7 import java.util.List;
8 import java.util.concurrent.Executor;
9
10 import org.apache.mina.common.AbstractPollingIoAcceptor;
11 import org.apache.mina.common.IoProcessor;
12 import org.apache.mina.common.RuntimeIoException;
13 import org.apache.mina.common.TransportMetadata;
14 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
15 import org.apache.mina.transport.socket.SocketAcceptor;
16 import org.apache.mina.transport.socket.SocketSessionConfig;
17 import org.apache.mina.util.CircularQueue;
18 import org.apache.tomcat.jni.Address;
19 import org.apache.tomcat.jni.Poll;
20 import org.apache.tomcat.jni.Pool;
21 import org.apache.tomcat.jni.Socket;
22 import org.apache.tomcat.jni.Status;
23
24 public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> implements SocketAcceptor {
25
26 private static final int POLLSET_SIZE = 1024;
27
28 private final Object wakeupLock = new Object();
29 private long wakeupSocket;
30 private volatile boolean toBeWakenUp;
31
32 private int backlog = 50;
33 private boolean reuseAddress = true;
34
35 private volatile long pool;
36 private volatile long pollset;
37 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
38 private final List<Long> polledHandles =
39 new CircularQueue<Long>(POLLSET_SIZE);
40
41 public AprSocketAcceptor() {
42 super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
43 }
44
45 public AprSocketAcceptor(int processorCount) {
46 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
47 }
48
49 public AprSocketAcceptor(IoProcessor<AprSession> processor) {
50 super(new DefaultSocketSessionConfig(), processor);
51 }
52
53 public AprSocketAcceptor(Executor executor,
54 IoProcessor<AprSession> processor) {
55 super(new DefaultSocketSessionConfig(), executor, processor);
56 }
57
58 @Override
59 protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
60 long s = Socket.accept(handle);
61 boolean success = false;
62 try {
63 AprSession result = new AprSocketSession(this, processor, s);
64 success = true;
65 return result;
66 } finally {
67 if (!success) {
68 Socket.close(s);
69 }
70 }
71 }
72
73 @Override
74 protected Long open(SocketAddress localAddress) throws Exception {
75 InetSocketAddress la = (InetSocketAddress) localAddress;
76 long handle = Socket.create(
77 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
78
79 boolean success = false;
80 try {
81 Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
82 Socket.timeoutSet(handle, 0);
83
84
85 Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress()? 1 : 0);
86 Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
87
88
89 long sa;
90 if (la != null) {
91 if (la.getAddress() == null) {
92 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
93 } else {
94 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
95 }
96 } else {
97 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
98 }
99
100 int result = Socket.bind(handle, sa);
101 if (result != Status.APR_SUCCESS) {
102 throwException(result);
103 }
104 result = Socket.listen(handle, getBacklog());
105 if (result != Status.APR_SUCCESS) {
106 throwException(result);
107 }
108
109 result = Poll.add(pollset, handle, Poll.APR_POLLIN);
110 if (result != Status.APR_SUCCESS) {
111 throwException(result);
112 }
113 success = true;
114 } finally {
115 if (!success) {
116 close(handle);
117 }
118 }
119 return handle;
120 }
121
122 @Override
123 protected void init() throws Exception {
124 wakeupSocket = Socket.create(
125 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, AprLibrary
126 .getInstance().getRootPool());
127
128
129 pool = Pool.create(AprLibrary.getInstance().getRootPool());
130
131 pollset = Poll.create(
132 POLLSET_SIZE,
133 pool,
134 Poll.APR_POLLSET_THREADSAFE,
135 Long.MAX_VALUE);
136
137 if (pollset <= 0) {
138 pollset = Poll.create(
139 62,
140 pool,
141 Poll.APR_POLLSET_THREADSAFE,
142 Long.MAX_VALUE);
143 }
144
145 if (pollset <= 0) {
146 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
147 throw new RuntimeIoException(
148 "Thread-safe pollset is not supported in this platform.");
149 }
150 }
151 }
152
153 @Override
154 protected void destroy() throws Exception {
155 if (wakeupSocket > 0) {
156 Socket.close(wakeupSocket);
157 }
158 if (pollset > 0) {
159 Poll.destroy(pollset);
160 }
161 if (pool > 0) {
162 Pool.destroy(pool);
163 }
164 }
165
166 @Override
167 protected SocketAddress localAddress(Long handle) throws Exception {
168 long la = Address.get(Socket.APR_LOCAL, handle);
169 return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
170 }
171
172 @Override
173 protected boolean select() throws Exception {
174 int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
175 if (rv <= 0) {
176 if (rv != -120001) {
177 throwException(rv);
178 }
179
180 rv = Poll.maintain(pollset, polledSockets, true);
181 if (rv > 0) {
182 for (int i = 0; i < rv; i ++) {
183 Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
184 }
185 } else if (rv < 0) {
186 throwException(rv);
187 }
188
189 return false;
190 } else {
191 rv <<= 1;
192 if (!polledHandles.isEmpty()) {
193 polledHandles.clear();
194 }
195
196 for (int i = 0; i < rv; i ++) {
197 long flag = polledSockets[i];
198 long socket = polledSockets[++i];
199 if (socket == wakeupSocket) {
200 synchronized (wakeupLock) {
201 Poll.remove(pollset, wakeupSocket);
202 toBeWakenUp = false;
203 }
204 continue;
205 }
206
207 if ((flag & Poll.APR_POLLIN) != 0) {
208 polledHandles.add(socket);
209 }
210 }
211 return !polledHandles.isEmpty();
212 }
213 }
214
215 @Override
216 protected Iterator<Long> selectedHandles() {
217 return polledHandles.iterator();
218 }
219
220 @Override
221 protected void close(Long handle) throws Exception {
222 Poll.remove(pollset, handle);
223 int result = Socket.close(handle);
224 if (result != Status.APR_SUCCESS) {
225 throwException(result);
226 }
227 }
228
229 @Override
230 protected void wakeup() {
231 if (toBeWakenUp) {
232 return;
233 }
234
235
236 synchronized (wakeupLock) {
237 toBeWakenUp = true;
238 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
239 }
240 }
241
242 public int getBacklog() {
243 return backlog;
244 }
245
246 public boolean isReuseAddress() {
247 return reuseAddress;
248 }
249
250 public void setBacklog(int backlog) {
251 synchronized (bindLock) {
252 if (isActive()) {
253 throw new IllegalStateException(
254 "backlog can't be set while the acceptor is bound.");
255 }
256
257 this.backlog = backlog;
258 }
259 }
260
261 @Override
262 public InetSocketAddress getLocalAddress() {
263 return (InetSocketAddress) super.getLocalAddress();
264 }
265
266 @Override
267 public InetSocketAddress getDefaultLocalAddress() {
268 return (InetSocketAddress) super.getDefaultLocalAddress();
269 }
270
271 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
272 super.setDefaultLocalAddress(localAddress);
273 }
274
275 public void setReuseAddress(boolean reuseAddress) {
276 synchronized (bindLock) {
277 if (isActive()) {
278 throw new IllegalStateException(
279 "backlog can't be set while the acceptor is bound.");
280 }
281
282 this.reuseAddress = reuseAddress;
283 }
284 }
285
286 public TransportMetadata getTransportMetadata() {
287 return AprSocketSession.METADATA;
288 }
289
290 @Override
291 public SocketSessionConfig getSessionConfig() {
292 return (SocketSessionConfig) super.getSessionConfig();
293 }
294
295 private void throwException(int code) throws IOException {
296 throw new IOException(
297 org.apache.tomcat.jni.Error.strerror(-code) +
298 " (code: " + code + ")");
299 }
300 }