View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.Executor;
29  
30  import org.apache.mina.common.AbstractPollingIoProcessor;
31  import org.apache.mina.common.FileRegion;
32  import org.apache.mina.common.IoBuffer;
33  import org.apache.mina.common.RuntimeIoException;
34  import org.apache.mina.util.CircularQueue;
35  import org.apache.tomcat.jni.Poll;
36  import org.apache.tomcat.jni.Pool;
37  import org.apache.tomcat.jni.Socket;
38  import org.apache.tomcat.jni.Status;
39  
40  /**
41   * The class in charge of processing socket level IO events for the {@link AprSocketConnector}
42   * 
43   * @author The Apache MINA Project (dev@mina.apache.org)
44   * @version $Rev: 599788 $, $Date: 2007-11-30 04:42:57 -0700 (Fri, 30 Nov 2007) $
45   */
46  
47  public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
48      private static final int POLLSET_SIZE = 1024;
49      
50      private final Map<Long, AprSession> allSessions =
51          new HashMap<Long, AprSession>(POLLSET_SIZE);
52      
53      private final Object wakeupLock = new Object();
54      private long wakeupSocket;
55      private volatile boolean toBeWakenUp;
56  
57      private final long bufferPool; // memory pool
58      private final long pollset; // socket poller
59      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
60      private final List<AprSession> polledSessions =
61          new CircularQueue<AprSession>(POLLSET_SIZE);
62  
63      public AprIoProcessor(Executor executor) {
64          super(executor);
65          
66          try {
67              wakeupSocket = Socket.create(
68                      Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, AprLibrary
69                      .getInstance().getRootPool());
70          } catch (RuntimeException e) {
71              throw e;
72          } catch (Error e) {
73              throw e;
74          } catch (Exception e) {
75              throw new RuntimeIoException("Failed to create a wakeup socket.", e);
76          }
77  
78          // initialize a memory pool for APR functions
79          bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
80          
81          boolean success = false;
82          long newPollset;
83          try {
84              newPollset = Poll.create(
85                      POLLSET_SIZE,
86                      AprLibrary.getInstance().getRootPool(),
87                      Poll.APR_POLLSET_THREADSAFE,
88                      Long.MAX_VALUE);
89  
90              if (newPollset == 0) {
91                  newPollset = Poll.create(
92                          62,
93                          AprLibrary.getInstance().getRootPool(),
94                          Poll.APR_POLLSET_THREADSAFE,
95                          Long.MAX_VALUE);
96              }
97              
98              pollset = newPollset;
99              if (pollset < 0) {
100                 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
101                     throw new RuntimeIoException(
102                             "Thread-safe pollset is not supported in this platform.");
103                 }
104             }
105             success = true;
106         } catch (RuntimeException e) {
107             throw e;
108         } catch (Error e) {
109             throw e;
110         } catch (Exception e) {
111             throw new RuntimeIoException("Failed to create a pollset.", e);
112         } finally {
113             if (!success) {
114                 dispose();
115             }
116         }
117     }
118 
119     @Override
120     protected void dispose0() {
121         Poll.destroy(pollset);
122         Pool.destroy(bufferPool);
123         Socket.close(wakeupSocket);
124     }
125 
126     @Override
127     protected boolean select(int timeout) throws Exception {
128         int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
129         if (rv <= 0) {
130             if (rv != -120001) {
131                 throwException(rv);
132             }
133             
134             rv = Poll.maintain(pollset, polledSockets, true);
135             if (rv > 0) {
136                 for (int i = 0; i < rv; i ++) {
137                     long socket = polledSockets[i];
138                     AprSession session = allSessions.get(socket);
139                     if (session == null) {
140                         continue;
141                     }
142                     
143                     int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
144                                (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
145 
146                     Poll.add(pollset, socket, flag);
147                 }
148             } else if (rv < 0) {
149                 throwException(rv);
150             }
151 
152             return false;
153         } else {
154             rv <<= 1;
155             if (!polledSessions.isEmpty()) {
156                 polledSessions.clear();
157             }
158             for (int i = 0; i < rv; i ++) {
159                 long flag = polledSockets[i];
160                 long socket = polledSockets[++i];
161                 if (socket == wakeupSocket) {
162                     synchronized (wakeupLock) {
163                         Poll.remove(pollset, wakeupSocket);
164                         toBeWakenUp = false;
165                     }
166                     continue;
167                 }
168                 AprSession session = allSessions.get(socket);
169                 if (session == null) {
170                     continue;
171                 }
172                 
173                 session.setReadable((flag & Poll.APR_POLLIN) != 0);
174                 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
175                 
176                 polledSessions.add(session);
177             }
178             
179             return !polledSessions.isEmpty();
180         }
181     }
182 
183     @Override
184     protected void wakeup() {
185         if (toBeWakenUp) {
186             return;
187         }
188         
189         // Add a dummy socket to the pollset.
190         synchronized (wakeupLock) {
191             toBeWakenUp = true;
192             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
193         }
194     }
195 
196     @Override
197     protected Iterator<AprSession> allSessions() {
198         return allSessions.values().iterator();
199     }
200 
201     @Override
202     protected Iterator<AprSession> selectedSessions() {
203         return polledSessions.iterator();
204     }
205 
206     @Override
207     protected void init(AprSession session) throws Exception {
208         long s = session.getDescriptor();
209         Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
210         Socket.timeoutSet(s, 0);
211         
212         int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
213         if (rv != Status.APR_SUCCESS) {
214             throwException(rv);
215         }
216         
217         session.setInterestedInRead(true);
218         allSessions.put(s, session);
219     }
220 
221     @Override
222     protected void destroy(AprSession session) throws Exception {
223         allSessions.remove(session.getDescriptor());
224         int ret = Poll.remove(pollset, session.getDescriptor());
225         try {
226             if (ret != Status.APR_SUCCESS) {
227                 throwException(ret);
228             }
229         } finally {
230             ret = Socket.close(session.getDescriptor());
231             if (ret != Status.APR_SUCCESS) {
232                 throwException(ret);
233             }
234         }
235     }
236 
237     @Override
238     protected SessionState state(AprSession session) {
239         long socket = session.getDescriptor();
240         if (socket > 0) {
241             return SessionState.OPEN;
242         } else if (allSessions.get(socket) != null) {
243             return SessionState.PREPARING; // will occur ?
244         } else {
245             return SessionState.CLOSED;
246         }
247     }
248 
249     @Override
250     protected boolean isReadable(AprSession session) {
251         return session.isReadable();
252     }
253 
254     @Override
255     protected boolean isWritable(AprSession session) {
256         return session.isWritable();
257     }
258 
259     @Override
260     protected boolean isInterestedInRead(AprSession session) {
261         return session.isInterestedInRead();
262     }
263 
264     @Override
265     protected boolean isInterestedInWrite(AprSession session) {
266         return session.isInterestedInWrite();
267     }
268 
269     @Override
270     protected void setInterestedInRead(AprSession session, boolean value) throws Exception {
271         int rv = Poll.remove(pollset, session.getDescriptor());
272         if (rv != Status.APR_SUCCESS) {
273             throwException(rv);
274         }
275 
276         int flags = (value ? Poll.APR_POLLIN : 0)
277                 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
278 
279         rv = Poll.add(pollset, session.getDescriptor(), flags);
280         if (rv == Status.APR_SUCCESS) {
281             session.setInterestedInRead(value);
282         } else {
283             throwException(rv);
284         }
285     }
286 
287     @Override
288     protected void setInterestedInWrite(AprSession session, boolean value) throws Exception {
289         int rv = Poll.remove(pollset, session.getDescriptor());
290         if (rv != Status.APR_SUCCESS) {
291             throwException(rv);
292         }
293 
294         int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
295                 | (value ? Poll.APR_POLLOUT : 0);
296 
297         rv = Poll.add(pollset, session.getDescriptor(), flags);
298         if (rv == Status.APR_SUCCESS) {
299             session.setInterestedInWrite(value);
300         } else {
301             throwException(rv);
302         }
303     }
304 
305     @Override
306     protected int read(AprSession session, IoBuffer buffer) throws Exception {
307         int bytes;
308         // Using Socket.recv() directly causes memory leak. :-(
309         ByteBuffer b = Pool.alloc(bufferPool, buffer.remaining());
310         try {
311             bytes = Socket.recvb(
312                     session.getDescriptor(), b, 0, b.remaining());
313             if (bytes > 0) {
314                 b.position(0);
315                 b.limit(bytes);
316                 buffer.put(b);
317             } else if (bytes < 0) {
318                 if (Status.APR_STATUS_IS_EOF(-bytes)) {
319                     bytes = -1;
320                 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
321                     bytes = 0;
322                 } else {
323                     throwException(bytes);
324                 }
325             }
326         } finally {
327             Pool.clear(bufferPool);
328         }
329         return bytes;
330     }
331 
332     @Override
333     protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
334         int writtenBytes;
335         if (buf.isDirect()) {
336             writtenBytes = Socket.sendb(
337                     session.getDescriptor(), buf.buf(), buf.position(), length);
338         } else {
339             writtenBytes = Socket.send(
340                     session.getDescriptor(), buf.array(), buf.position(), length);
341             if (writtenBytes > 0) {
342                 buf.skip(writtenBytes);
343             }
344         }
345         
346         if (writtenBytes < 0) {
347             if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
348                 writtenBytes = 0;
349             } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
350                 writtenBytes = 0;
351             } else {
352                 throwException(writtenBytes);
353             }
354         }
355         return writtenBytes;
356     }
357 
358     @Override
359     protected int transferFile(AprSession session, FileRegion region, int length)
360             throws Exception {
361         throw new UnsupportedOperationException();
362     }
363 
364     private void throwException(int code) throws IOException {
365         throw new IOException(
366                 org.apache.tomcat.jni.Error.strerror(-code) +
367                 " (code: " + code + ")");
368     }
369 }