View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.ConnectException;
23  import java.net.SocketAddress;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Executors;
30  
31  import org.apache.mina.core.RuntimeIoException;
32  import org.apache.mina.core.filterchain.IoFilter;
33  import org.apache.mina.core.future.ConnectFuture;
34  import org.apache.mina.core.future.DefaultConnectFuture;
35  import org.apache.mina.core.service.AbstractIoConnector;
36  import org.apache.mina.core.service.IoConnector;
37  import org.apache.mina.core.service.IoHandler;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.SimpleIoProcessorPool;
40  import org.apache.mina.core.session.AbstractIoSession;
41  import org.apache.mina.core.session.IoSession;
42  import org.apache.mina.core.session.IoSessionConfig;
43  import org.apache.mina.core.session.IoSessionInitializer;
44  import org.apache.mina.transport.socket.nio.NioSocketConnector;
45  import org.apache.mina.util.ExceptionMonitor;
46  
/**
 * A base class for implementing client transports using a polling strategy. The
 * underlying sockets are checked in an active loop, which is woken up whenever a
 * socket needs to be processed. This class handles the logic behind binding,
 * connecting and disposing the client sockets. An {@link Executor} is used
 * for running client connections, and an {@link AbstractPollingIoProcessor} is
 * used for processing connected client I/O operations like reading, writing
 * and closing.
 * 
 * All the low-level methods for binding, connecting and closing need to be
 * provided by the subclassing implementation.
 * 
 * @see NioSocketConnector for an example of implementation
 * 
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
63  public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
64          extends AbstractIoConnector {
65  
    /** Guards the creation and the release of the {@link #connector} worker. */
    private final Object lock = new Object();
    /** Connection requests waiting to be registered to the polling system by the worker. */
    private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
    /** Connection requests (failed, timed out or cancelled) whose handle the worker has to close. */
    private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
    /** The processor handed to <code>newSession</code> for every connected handle. */
    private final IoProcessor<T> processor;
    /** <code>true</code> if the processor was created by this connector, in which case the worker disposes it. */
    private final boolean createdProcessor;

    /** Marked as done by the worker once it has run the disposal sequence. */
    private final ServiceOperationFuture disposalFuture =
        new ServiceOperationFuture();
    /**
     * <code>true</code> once {@link #init()} has succeeded; reset to <code>false</code>
     * by the worker when the connector is disposed. The worker loops while it is set.
     */
    private volatile boolean selectable;
    
    /** The connector thread; <code>null</code> when no worker is currently running */
    private Connector connector;
78  
    /**
     * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
     * session configuration and a class of {@link IoProcessor}, which will be instantiated in a
     * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
     * pool size will be used. The pool is flagged as created by this connector, so it
     * is disposed together with the connector.
     * 
     * @see SimpleIoProcessorPool
     * 
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
     *            type.
     */
    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
    }
95  
    /**
     * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
     * session configuration and a class of {@link IoProcessor}, which will be instantiated in a
     * {@link SimpleIoProcessorPool} using multiple threads for better scaling in multiprocessor
     * systems. The pool is flagged as created by this connector, so it is disposed
     * together with the connector.
     * 
     * @see SimpleIoProcessorPool
     * 
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
     *            type.
     * @param processorCount the number of processors to instantiate for the pool
     */
    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
    }
113 
    /**
     * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
     * session configuration and the {@link IoProcessor} to use. No {@link Executor} is
     * supplied: <code>null</code> is forwarded to the superclass constructor, which is
     * expected to create a default one using {@link Executors#newCachedThreadPool()}
     * (NOTE(review): behaviour of the superclass, not visible from this class - confirm).
     * The given processor is not flagged as created by this connector, so the
     * connector does not dispose it.
     * 
     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
     * 
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
     *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
     */
    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
        this(sessionConfig, null, processor, false);
    }
129 
    /**
     * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
     * session configuration and an {@link Executor} for handling I/O events. If a
     * null {@link Executor} is provided, a default one is expected to be created by the
     * superclass using {@link Executors#newCachedThreadPool()}
     * (NOTE(review): behaviour of the superclass, not visible from this class - confirm).
     * The given processor is not flagged as created by this connector, so the
     * connector does not dispose it.
     * 
     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
     * 
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param executor
     *            the {@link Executor} used for handling asynchronous execution of I/O
     *            events. Can be <code>null</code>.
     * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
     *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
     */
    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
        this(sessionConfig, executor, processor, false);
    }
149 
    /**
     * Constructor for {@link AbstractPollingIoConnector}, to which all the other
     * constructors delegate. You need to provide a default session configuration and
     * an {@link Executor} for handling I/O events (<code>null</code> is forwarded as is
     * to the superclass constructor). The polling system is initialized through
     * {@link #init()}; if the initialization fails, {@link #destroy()} is called
     * before the failure is propagated.
     * 
     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
     * 
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param executor
     *            the {@link Executor} used for handling asynchronous execution of I/O
     *            events. Can be <code>null</code>.
     * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
     *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
     * @param createdProcessor tagging the processor as automatically created, so it will be automatically disposed 
     * @throws IllegalArgumentException if the processor is <code>null</code>
     * @throws RuntimeIoException if {@link #init()} fails with a checked exception
     */
    private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
        super(sessionConfig, executor);

        if (processor == null) {
            throw new IllegalArgumentException("processor");
        }

        this.processor = processor;
        this.createdProcessor = createdProcessor;

        try {
            init();
            // Only reached when init() succeeded: the worker loop is now allowed to run
            selectable = true;
        } catch (RuntimeException e){
            // Unchecked exceptions are propagated unchanged
            throw e;
        } catch (Exception e) {
            // Checked exceptions are wrapped, keeping the original cause
            throw new RuntimeIoException("Failed to initialize.", e);
        } finally {
            // selectable is still false here only if init() has thrown:
            // release whatever the failed initialization may have allocated.
            if (!selectable) {
                try {
                    destroy();
                } catch (Exception e) {
                    // Reported instead of thrown, so the initialization failure is not masked
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        }
    }
194 
    /**
     * Initialize the polling system. It is called once, from the constructor
     * of this class.
     * 
     * @throws Exception any exception thrown by the underlying system calls  
     */
    protected abstract void init() throws Exception;
200 
    /**
     * Destroy the polling system. It is called by the connector worker when this
     * {@link IoConnector} implementation is disposed, and by the constructor
     * when {@link #init()} has failed.
     * 
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void destroy() throws Exception;
207     
    /**
     * Create a new client socket handle from a local {@link SocketAddress}.
     * 
     * @param localAddress the socket address for binding the new client socket 
     * @return a new client socket handle 
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract H newHandle(SocketAddress localAddress) throws Exception;
215     
    /**
     * Connect a newly created client socket handle to a remote {@link SocketAddress}.
     * This operation is non-blocking, so at the end of the call the socket can still be
     * in the process of connecting. In that case the handle is queued and the connection
     * is completed later through {@link #finishConnect(Object)}.
     * 
     * @param handle the client socket handle
     * @param remoteAddress the remote address where to connect
     * @return <tt>true</tt> if a connection was established, <tt>false</tt> if this client socket 
     *         is in non-blocking mode and the connection operation is in progress
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
227     
    /**
     * Finish the connection process of a client socket after it was marked as ready to process
     * by the {@link #select(int)} call. The socket will be connected or reported as connection
     * failed.
     * 
     * @param handle the client socket handle to finish connecting
     * @return <code>true</code> if the socket is connected; when <code>false</code> is returned
     *         no session is created for the handle by the caller
     * @throws Exception any exception thrown by the underlying system calls; the associated
     *         connection request is then failed and cancelled
     */
    protected abstract boolean finishConnect(H handle) throws Exception;
237     
    /**
     * Create a new {@link IoSession} from a connected client socket handle.
     * The created {@link IoSession} is to be assigned to the given {@link IoProcessor},
     * which manages its future I/O events.
     * 
     * @param processor the processor in charge of this session
     * @param handle the newly connected client socket handle
     * @return a new {@link IoSession}
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
248 
    /**
     * Close a client socket. It is called for handles whose connection attempt
     * failed, timed out or was cancelled.
     * 
     * @param handle the client socket
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void close(H handle) throws Exception;
255     
    /**
     * Interrupt the {@link #select(int)} method. It is called after a connection request
     * has been queued or cancelled, and on disposal, so that the connector worker
     * notices the change.
     */
    protected abstract void wakeup();
260     
    /**
     * Check for connected sockets, returning when at least one connection is processed (connected or
     * failed to connect), or when the timeout expires. All the client socket descriptors
     * processed need to be returned by {@link #selectedHandles()}.
     * 
     * @param timeout the maximum time to wait, in milliseconds; the connector worker passes
     *        the smaller of the connect timeout and one second
     * @return the number of sockets ready to be processed; the handles returned by
     *         {@link #selectedHandles()} are processed only when it is greater than zero
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int select(int timeout) throws Exception;
269     
    /**
     * {@link Iterator} for the set of client sockets found connected or 
     * failed to connect during the last {@link #select(int)} call. The returned
     * iterator must support {@link Iterator#remove()}, as every handle is removed
     * from it while being processed.
     * 
     * @return the list of client socket handles to process
     */
    protected abstract Iterator<H> selectedHandles();
276     
    /**
     * {@link Iterator} for all the client sockets polled for connection. It is
     * used by the connector worker to detect the connection requests which have
     * timed out.
     * 
     * @return the list of client sockets currently polled for connection
     */
    protected abstract Iterator<H> allHandles();
282     
    /**
     * Register a new client socket for connection, adding it to the connection polling.
     * The given request must later be returned by {@link #getConnectionRequest(Object)}
     * for this handle.
     * 
     * @param handle client socket handle 
     * @param request the associated {@link ConnectionRequest}
     * @throws Exception any exception thrown by the underlying system calls; the request
     *         is then failed with this exception and the handle is closed
     */
    protected abstract void register(H handle, ConnectionRequest request) throws Exception;
290     
    /**
     * Get the {@link ConnectionRequest} for a given client socket handle.
     * 
     * @param handle the client socket handle 
     * @return the connection request if the socket is connecting, otherwise <code>null</code>
     *         (such a handle is ignored by the connector worker)
     */
    protected abstract ConnectionRequest getConnectionRequest(H handle);
297 
    /**
     * {@inheritDoc}
     * 
     * The disposal itself is performed by the connector worker: this method only
     * makes sure a worker is running and wakes it up, so that it can notice that
     * the connector is being disposed.
     */
    @Override
    protected final void dispose0() throws Exception {
        startupWorker();
        wakeup();
    }
306 
307     /**
308      * {@inheritDoc}
309      */
310     @Override
311     @SuppressWarnings("unchecked")
312     protected final ConnectFuture connect0(
313             SocketAddress remoteAddress, SocketAddress localAddress,
314             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
315         H handle = null;
316         boolean success = false;
317         try {
318             handle = newHandle(localAddress);
319             if (connect(handle, remoteAddress)) {
320                 ConnectFuture future = new DefaultConnectFuture();
321                 T session = newSession(processor, handle);
322                 initSession(session, future, sessionInitializer);
323                 // Forward the remaining process to the IoProcessor.
324                 session.getProcessor().add(session);
325                 success = true;
326                 return future;
327             }
328 
329             success = true;
330         } catch (Exception e) {
331             return DefaultConnectFuture.newFailedFuture(e);
332         } finally {
333             if (!success && handle != null) {
334                 try {
335                     close(handle);
336                 } catch (Exception e) {
337                     ExceptionMonitor.getInstance().exceptionCaught(e);
338                 }
339             }
340         }
341 
342         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
343         connectQueue.add(request);
344         startupWorker();
345         wakeup();
346 
347         return request;
348     }
349 
350     private void startupWorker() {
351         if (!selectable) {
352             connectQueue.clear();
353             cancelQueue.clear();
354         }
355 
356         synchronized (lock) {
357             if (connector == null) {
358                 connector = new Connector();
359                 executeWorker(connector);
360             }
361         }
362     }
363 
364     private int registerNew() {
365         int nHandles = 0;
366         for (; ;) {
367             ConnectionRequest req = connectQueue.poll();
368             if (req == null) {
369                 break;
370             }
371 
372             H handle = req.handle;
373             try {
374                 register(handle, req);
375                 nHandles ++;
376             } catch (Exception e) {
377                 req.setException(e);
378                 try {
379                     close(handle);
380                 } catch (Exception e2) {
381                     ExceptionMonitor.getInstance().exceptionCaught(e2);
382                 }
383             }
384         }
385         return nHandles;
386     }
387 
388     private int cancelKeys() {
389         int nHandles = 0;
390         for (; ;) {
391             ConnectionRequest req = cancelQueue.poll();
392             if (req == null) {
393                 break;
394             }
395 
396             H handle = req.handle;
397             try {
398                 close(handle);
399             } catch (Exception e) {
400                 ExceptionMonitor.getInstance().exceptionCaught(e);
401             } finally {
402                 nHandles ++;
403             }
404         }
405         return nHandles;
406     }
407 
408     /**
409      * Process the incoming connections, creating a new session for each
410      * valid connection. 
411      */
412     private int processConnections(Iterator<H> handlers) {
413         int nHandles = 0;
414         
415         // Loop on each connection request
416         while (handlers.hasNext()) {
417             H handle = handlers.next();
418             handlers.remove();
419 
420             ConnectionRequest connectionRequest = getConnectionRequest(handle);
421             
422             if ( connectionRequest == null) {
423                 continue;
424             }
425             
426             boolean success = false;
427             try {
428                 if (finishConnect(handle)) {
429                     T session = newSession(processor, handle);
430                     initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
431                     // Forward the remaining process to the IoProcessor.
432                     session.getProcessor().add(session);
433                     nHandles ++;
434                 }
435                 success = true;
436             } catch (Throwable e) {
437                 connectionRequest.setException(e);
438             } finally {
439                 if (!success) {
440                     // The connection failed, we have to cancel it.
441                     cancelQueue.offer(connectionRequest);
442                 }
443             }
444         }
445         return nHandles;
446     }
447 
448     private void processTimedOutSessions(Iterator<H> handles) {
449         long currentTime = System.currentTimeMillis();
450 
451         while (handles.hasNext()) {
452             H handle = handles.next();
453             ConnectionRequest connectionRequest = getConnectionRequest(handle);
454 
455             if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
456                 connectionRequest.setException(
457                         new ConnectException("Connection timed out."));
458                 cancelQueue.offer(connectionRequest);
459             }
460         }
461     }
462 
    /**
     * The worker in charge of the pending connections. It loops while the polling
     * system is usable: it polls, registers the queued connection requests, finishes
     * the ready connections, fails the timed out ones and closes the cancelled
     * handles. It stops by itself when no handle is pending any more, and runs the
     * disposal sequence when the connector is being disposed.
     */
    private class Connector implements Runnable {

        public void run() {
            // Number of handles registered and neither connected nor cancelled yet
            int nHandles = 0;
            while (selectable) {
                try {
                    // the timeout for select shall be smaller of the connect
                    // timeout or 1 second...
                    int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);
                    int selected = select(timeout);

                    // Newly queued requests are now polled too
                    nHandles += registerNew();

                    if (selected > 0) {
                        // Each session created is one handle less to wait for
                        nHandles -= processConnections(selectedHandles());
                    }

                    processTimedOutSessions(allHandles());

                    // Failed, timed out and cancelled handles are closed here
                    nHandles -= cancelKeys();

                    if (nHandles == 0) {
                        // Nothing left to wait for. The queue is checked under the same
                        // lock startupWorker() uses, and the worker reference is cleared
                        // so that a new worker gets started for the next request.
                        synchronized (lock) {
                            if (connectQueue.isEmpty()) {
                                connector = null;
                                break;
                            }
                        }
                    }
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    break;
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Pause before trying again after a failure
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Disposal sequence, run when the loop was left while the connector
            // is being disposed
            if (selectable && isDisposing()) {
                // Stops any further polling
                selectable = false;
                try {
                    // Only a processor created by this connector is disposed
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    // The polling system is destroyed even if disposing the processor failed
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        // Always signalled, whether or not destroy() succeeded
                        disposalFuture.setDone();
                    }
                }
            }
        }
    }
528 
529     public final class ConnectionRequest extends DefaultConnectFuture {
530         private final H handle;
531         private final long deadline;
532         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
533 
534         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
535             this.handle = handle;
536             long timeout = getConnectTimeoutMillis();
537             if (timeout <= 0L) {
538                 this.deadline = Long.MAX_VALUE;
539             } else {
540                 this.deadline = System.currentTimeMillis() + timeout;
541             }
542             this.sessionInitializer = callback;
543         }
544 
545         public H getHandle() {
546             return handle;
547         }
548 
549         public long getDeadline() {
550             return deadline;
551         }
552 
553         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
554             return sessionInitializer;
555         }
556 
557         @Override
558         public void cancel() {
559             if ( !isDone() ) {
560                 super.cancel();
561                 cancelQueue.add(this);
562                 startupWorker();
563                 wakeup();
564             }
565         }
566     }
567 }