View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.Executor;
29  
30  import org.apache.mina.core.RuntimeIoException;
31  import org.apache.mina.core.buffer.IoBuffer;
32  import org.apache.mina.core.file.FileRegion;
33  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
34  import org.apache.mina.util.CircularQueue;
35  import org.apache.tomcat.jni.Poll;
36  import org.apache.tomcat.jni.Pool;
37  import org.apache.tomcat.jni.Socket;
38  import org.apache.tomcat.jni.Status;
39  
40  /**
41   * The class in charge of processing socket level IO events for the {@link AprSocketConnector}
42   *
43   * @author The Apache MINA Project (dev@mina.apache.org)
44   */
45  public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
46      private static final int POLLSET_SIZE = 1024;
47  
48      private final Map<Long, AprSession> allSessions =
49          new HashMap<Long, AprSession>(POLLSET_SIZE);
50  
51      private final Object wakeupLock = new Object();
52      private final long wakeupSocket;
53      private volatile boolean toBeWakenUp;
54  
55      private final long pool;
56      private final long bufferPool; // memory pool
57      private final long pollset; // socket poller
58      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
59      private final List<AprSession> polledSessions =
60          new CircularQueue<AprSession>(POLLSET_SIZE);
61  
62      /**
63       * Create a new instance of {@link AprIoProcessor} with a given Exector for 
64       * handling I/Os events.
65       * 
66       * @param executor the {@link Executor} for handling I/O events
67       */
68      public AprIoProcessor(Executor executor) {
69          super(executor);
70  
71          // initialize a memory pool for APR functions
72          pool = Pool.create(AprLibrary.getInstance().getRootPool());
73          bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
74  
75          try {
76              wakeupSocket = Socket.create(
77                      Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
78          } catch (RuntimeException e) {
79              throw e;
80          } catch (Error e) {
81              throw e;
82          } catch (Exception e) {
83              throw new RuntimeIoException("Failed to create a wakeup socket.", e);
84          }
85  
86          boolean success = false;
87          long newPollset;
88          try {
89              newPollset = Poll.create(
90                      POLLSET_SIZE,
91                      pool,
92                      Poll.APR_POLLSET_THREADSAFE,
93                      Long.MAX_VALUE);
94  
95              if (newPollset == 0) {
96                  newPollset = Poll.create(
97                          62,
98                          pool,
99                          Poll.APR_POLLSET_THREADSAFE,
100                         Long.MAX_VALUE);
101             }
102 
103             pollset = newPollset;
104             if (pollset < 0) {
105                 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
106                     throw new RuntimeIoException(
107                             "Thread-safe pollset is not supported in this platform.");
108                 }
109             }
110             success = true;
111         } catch (RuntimeException e) {
112             throw e;
113         } catch (Error e) {
114             throw e;
115         } catch (Exception e) {
116             throw new RuntimeIoException("Failed to create a pollset.", e);
117         } finally {
118             if (!success) {
119                 dispose();
120             }
121         }
122     }
123 
124     /**
125      * {@inheritDoc}
126      */
127     @Override
128     protected void dispose0() {
129         Poll.destroy(pollset);
130         Socket.close(wakeupSocket);
131         Pool.destroy(bufferPool);
132         Pool.destroy(pool);
133     }
134 
135     /**
136      * {@inheritDoc}
137      */
138     @Override
139     protected int select() throws Exception {
140         return select(Integer.MAX_VALUE);
141     }
142 
143     /**
144      * {@inheritDoc}
145      */
146      @Override
147     protected int select(long timeout) throws Exception {
148         int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
149         if (rv <= 0) {
150             if (rv != -120001) {
151                 throwException(rv);
152             }
153 
154             rv = Poll.maintain(pollset, polledSockets, true);
155             if (rv > 0) {
156                 for (int i = 0; i < rv; i ++) {
157                     long socket = polledSockets[i];
158                     AprSession session = allSessions.get(socket);
159                     if (session == null) {
160                         continue;
161                     }
162 
163                     int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
164                                (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
165 
166                     Poll.add(pollset, socket, flag);
167                 }
168             } else if (rv < 0) {
169                 throwException(rv);
170             }
171 
172             return 0;
173         } else {
174             rv <<= 1;
175             if (!polledSessions.isEmpty()) {
176                 polledSessions.clear();
177             }
178             for (int i = 0; i < rv; i ++) {
179                 long flag = polledSockets[i];
180                 long socket = polledSockets[++i];
181                 if (socket == wakeupSocket) {
182                     synchronized (wakeupLock) {
183                         Poll.remove(pollset, wakeupSocket);
184                         toBeWakenUp = false;
185                     }
186                     continue;
187                 }
188                 AprSession session = allSessions.get(socket);
189                 if (session == null) {
190                     continue;
191                 }
192 
193                 session.setReadable((flag & Poll.APR_POLLIN) != 0);
194                 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
195 
196                 polledSessions.add(session);
197             }
198 
199             return polledSessions.size();
200         }
201     }
202 
203      /**
204      * {@inheritDoc}
205      */
206     @Override
207     protected boolean isSelectorEmpty() {
208         return allSessions.isEmpty();
209     }
210 
211     /**
212      * {@inheritDoc}
213      */
214     @Override
215     protected void wakeup() {
216         if (toBeWakenUp) {
217             return;
218         }
219 
220         // Add a dummy socket to the pollset.
221         synchronized (wakeupLock) {
222             toBeWakenUp = true;
223             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
224         }
225     }
226 
227     /**
228      * {@inheritDoc}
229      */
230     @Override
231     protected Iterator<AprSession> allSessions() {
232         return allSessions.values().iterator();
233     }
234 
235     /**
236      * {@inheritDoc}
237      */
238     @Override
239     protected Iterator<AprSession> selectedSessions() {
240         return polledSessions.iterator();
241     }
242 
243     @Override
244     protected void init(AprSession session) throws Exception {
245         long s = session.getDescriptor();
246         Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
247         Socket.timeoutSet(s, 0);
248 
249         int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
250         if (rv != Status.APR_SUCCESS) {
251             throwException(rv);
252         }
253 
254         session.setInterestedInRead(true);
255         allSessions.put(s, session);
256     }
257 
258     /**
259      * {@inheritDoc}
260      */
261     @Override
262     protected void destroy(AprSession session) throws Exception {
263         if (allSessions.remove(session.getDescriptor()) == null) {
264             // Already destroyed.
265             return;
266         }
267 
268         int ret = Poll.remove(pollset, session.getDescriptor());
269         try {
270             if (ret != Status.APR_SUCCESS) {
271                 throwException(ret);
272             }
273         } finally {
274             ret = Socket.close(session.getDescriptor());
275             
276             // destroying the session because it won't be reused 
277             // after this point
278             Socket.destroy(session.getDescriptor());
279             session.setDescriptor(0);
280             
281             if (ret != Status.APR_SUCCESS) {
282                 throwException(ret);
283             }
284         }
285     }
286 
287     /**
288      * {@inheritDoc}
289      */
290     @Override
291     protected SessionState state(AprSession session) {
292         long socket = session.getDescriptor();
293         if (socket != 0) {
294             return SessionState.OPEN;
295         } else if (allSessions.get(socket) != null) {
296             return SessionState.PREPARING; // will occur ?
297         } else {
298             return SessionState.CLOSED;
299         }
300     }
301 
302     /**
303      * {@inheritDoc}
304      */
305     @Override
306     protected boolean isReadable(AprSession session) {
307         return session.isReadable();
308     }
309 
310     /**
311      * {@inheritDoc}
312      */
313     @Override
314     protected boolean isWritable(AprSession session) {
315         return session.isWritable();
316     }
317 
318     /**
319      * {@inheritDoc}
320      */
321     @Override
322     protected boolean isInterestedInRead(AprSession session) {
323         return session.isInterestedInRead();
324     }
325 
326     /**
327      * {@inheritDoc}
328      */
329     @Override
330     protected boolean isInterestedInWrite(AprSession session) {
331         return session.isInterestedInWrite();
332     }
333 
334     /**
335      * {@inheritDoc}
336      */
337     @Override
338     protected void setInterestedInRead(AprSession session, boolean value) throws Exception {
339         if (session.isInterestedInRead() == value) {
340             return;
341         }
342 
343         int rv = Poll.remove(pollset, session.getDescriptor());
344         if (rv != Status.APR_SUCCESS) {
345             throwException(rv);
346         }
347 
348         int flags = (value ? Poll.APR_POLLIN : 0)
349                 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
350 
351         rv = Poll.add(pollset, session.getDescriptor(), flags);
352         if (rv == Status.APR_SUCCESS) {
353             session.setInterestedInRead(value);
354         } else {
355             throwException(rv);
356         }
357     }
358 
359     /**
360      * {@inheritDoc}
361      */
362     @Override
363     protected void setInterestedInWrite(AprSession session, boolean value) throws Exception {
364         if (session.isInterestedInWrite() == value) {
365             return;
366         }
367 
368         int rv = Poll.remove(pollset, session.getDescriptor());
369         if (rv != Status.APR_SUCCESS) {
370             throwException(rv);
371         }
372 
373         int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
374                 | (value ? Poll.APR_POLLOUT : 0);
375 
376         rv = Poll.add(pollset, session.getDescriptor(), flags);
377         if (rv == Status.APR_SUCCESS) {
378             session.setInterestedInWrite(value);
379         } else {
380             throwException(rv);
381         }
382     }
383 
384     /**
385      * {@inheritDoc}
386      */
387     @Override
388     protected int read(AprSession session, IoBuffer buffer) throws Exception {
389         int bytes;
390         int capacity = buffer.remaining();
391         // Using Socket.recv() directly causes memory leak. :-(
392         ByteBuffer b = Pool.alloc(bufferPool, capacity);
393         try {
394             bytes = Socket.recvb(
395                     session.getDescriptor(), b, 0, capacity);
396             if (bytes > 0) {
397                 b.position(0);
398                 b.limit(bytes);
399                 buffer.put(b);
400             } else if (bytes < 0) {
401                 if (Status.APR_STATUS_IS_EOF(-bytes)) {
402                     bytes = -1;
403                 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
404                     bytes = 0;
405                 } else {
406                     throwException(bytes);
407                 }
408             }
409         } finally {
410             Pool.clear(bufferPool);
411         }
412         return bytes;
413     }
414 
415     /**
416      * {@inheritDoc}
417      */
418     @Override
419     protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
420         int writtenBytes;
421         if (buf.isDirect()) {
422             writtenBytes = Socket.sendb(
423                     session.getDescriptor(), buf.buf(), buf.position(), length);
424         } else {
425             writtenBytes = Socket.send(
426                     session.getDescriptor(), buf.array(), buf.position(), length);
427             if (writtenBytes > 0) {
428                 buf.skip(writtenBytes);
429             }
430         }
431 
432         if (writtenBytes < 0) {
433             if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
434                 writtenBytes = 0;
435             } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
436                 writtenBytes = 0;
437             } else {
438                 throwException(writtenBytes);
439             }
440         }
441         return writtenBytes;
442     }
443 
444     /**
445      * {@inheritDoc}
446      */
447     @Override
448     protected int transferFile(AprSession session, FileRegion region, int length)
449             throws Exception {
450         throw new UnsupportedOperationException();
451     }
452 
453     private void throwException(int code) throws IOException {
454         throw new IOException(
455                 org.apache.tomcat.jni.Error.strerror(-code) +
456                 " (code: " + code + ")");
457     }
458 }