View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.nio.channels.ClosedSelectorException;
24  import java.nio.channels.spi.SelectorProvider;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.Semaphore;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.mina.core.RuntimeIoException;
41  import org.apache.mina.core.filterchain.IoFilter;
42  import org.apache.mina.core.service.AbstractIoAcceptor;
43  import org.apache.mina.core.service.IoAcceptor;
44  import org.apache.mina.core.service.IoHandler;
45  import org.apache.mina.core.service.IoProcessor;
46  import org.apache.mina.core.service.SimpleIoProcessorPool;
47  import org.apache.mina.core.session.AbstractIoSession;
48  import org.apache.mina.core.session.IoSession;
49  import org.apache.mina.core.session.IoSessionConfig;
50  import org.apache.mina.transport.socket.SocketSessionConfig;
51  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
52  import org.apache.mina.util.ExceptionMonitor;
53  
54  /**
55   * A base class for implementing transport using a polling strategy. The
56   * underlying sockets will be checked in an active loop and woke up when an
57   * socket needed to be processed. This class handle the logic behind binding,
58   * accepting and disposing the server sockets. An {@link Executor} will be used
59   * for running client accepting and an {@link AbstractPollingIoProcessor} will
60   * be used for processing client I/O operations like reading, writing and
61   * closing.
62   * 
63   * All the low level methods for binding, accepting, closing need to be provided
64   * by the subclassing implementation.
65   * 
66   * @see NioSocketAcceptor for a example of implementation
67   * 
68   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
69   */
70  public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
71      /** A lock used to protect the selector to be waked up before it's created */
72      private final Semaphore lock = new Semaphore(1);
73  
74      private final IoProcessor<S> processor;
75  
76      private final boolean createdProcessor;
77  
78      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
79  
80      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
81  
82      private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
83  
84      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
85  
86      /** A flag set when the acceptor has been created and initialized */
87      private volatile boolean selectable;
88  
89      /** The thread responsible of accepting incoming requests */
90      private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
91  
92      protected boolean reuseAddress = false;
93  
94      /**
95       * Define the number of socket that can wait to be accepted. Default
96       * to 50 (as in the SocketServer default).
97       */
98      protected int backlog = 50;
99  
100     /**
101      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
102      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
103      * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
104      * pool size will be used.
105      * 
106      * @see SimpleIoProcessorPool
107      * 
108      * @param sessionConfig
109      *            the default configuration for the managed {@link IoSession}
110      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
111      *            type.
112      */
113     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
114         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
115     }
116 
117     /**
118      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
119      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
120      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
121      * systems.
122      * 
123      * @see SimpleIoProcessorPool
124      * 
125      * @param sessionConfig
126      *            the default configuration for the managed {@link IoSession}
127      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
128      *            type.
129      * @param processorCount the amount of processor to instantiate for the pool
130      */
131     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
132             int processorCount) {
133         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
134     }
135 
136     /**
137      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
138      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
139      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
140      * systems.
141      *
142      * @see SimpleIoProcessorPool
143      *
144      * @param sessionConfig
145      *            the default configuration for the managed {@link IoSession}
146      * @param processorClass a {@link Class}�of {@link IoProcessor} for the associated {@link IoSession}
147      *            type.
148      * @param processorCount the amount of processor to instantiate for the pool
149      * @param selectorProvider The SelectorProvider to use
150      */
151     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
152             int processorCount, SelectorProvider selectorProvider ) {
153         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
154     }
155 
156     /**
157      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
158      * session configuration, a default {@link Executor} will be created using
159      * {@link Executors#newCachedThreadPool()}.
160      * 
161      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
162      * 
163      * @param sessionConfig
164      *            the default configuration for the managed {@link IoSession}
165      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
166      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
167      */
168     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
169         this(sessionConfig, null, processor, false, null);
170     }
171 
172     /**
173      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
174      * default session configuration and an {@link Executor} for handling I/O
175      * events. If a null {@link Executor} is provided, a default one will be
176      * created using {@link Executors#newCachedThreadPool()}.
177      * 
178      * @see AbstractIoService(IoSessionConfig, Executor)
179      * 
180      * @param sessionConfig
181      *            the default configuration for the managed {@link IoSession}
182      * @param executor
183      *            the {@link Executor} used for handling asynchronous execution
184      *            of I/O events. Can be <code>null</code>.
185      * @param processor
186      *            the {@link IoProcessor} for processing the {@link IoSession}
187      *            of this transport, triggering events to the bound
188      *            {@link IoHandler} and processing the chains of
189      *            {@link IoFilter}
190      */
191     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
192         this(sessionConfig, executor, processor, false, null);
193     }
194 
195     /**
196      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
197      * default session configuration and an {@link Executor} for handling I/O
198      * events. If a null {@link Executor} is provided, a default one will be
199      * created using {@link Executors#newCachedThreadPool()}.
200      * 
201      * @see AbstractIoService(IoSessionConfig, Executor)
202      * 
203      * @param sessionConfig
204      *            the default configuration for the managed {@link IoSession}
205      * @param executor
206      *            the {@link Executor} used for handling asynchronous execution
207      *            of I/O events. Can be <code>null</code>.
208      * @param processor
209      *            the {@link IoProcessor} for processing the {@link IoSession}
210      *            of this transport, triggering events to the bound
211      *            {@link IoHandler} and processing the chains of
212      *            {@link IoFilter}
213      * @param createdProcessor
214      *            tagging the processor as automatically created, so it will be
215      *            automatically disposed
216      */
217     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
218             boolean createdProcessor, SelectorProvider selectorProvider) {
219         super(sessionConfig, executor);
220 
221         if (processor == null) {
222             throw new IllegalArgumentException("processor");
223         }
224 
225         this.processor = processor;
226         this.createdProcessor = createdProcessor;
227 
228         try {
229             // Initialize the selector
230             init(selectorProvider);
231 
232             // The selector is now ready, we can switch the
233             // flag to true so that incoming connection can be accepted
234             selectable = true;
235         } catch (RuntimeException e) {
236             throw e;
237         } catch (Exception e) {
238             throw new RuntimeIoException("Failed to initialize.", e);
239         } finally {
240             if (!selectable) {
241                 try {
242                     destroy();
243                 } catch (Exception e) {
244                     ExceptionMonitor.getInstance().exceptionCaught(e);
245                 }
246             }
247         }
248     }
249 
250     /**
251      * Initialize the polling system, will be called at construction time.
252      * @throws Exception any exception thrown by the underlying system calls
253      */
254     protected abstract void init() throws Exception;
255 
256     /**
257      * Initialize the polling system, will be called at construction time.
258      * @throws Exception any exception thrown by the underlying system calls
259      */
260     protected abstract void init(SelectorProvider selectorProvider) throws Exception;
261 
262     /**
263      * Destroy the polling system, will be called when this {@link IoAcceptor}
264      * implementation will be disposed.
265      * @throws Exception any exception thrown by the underlying systems calls
266      */
267     protected abstract void destroy() throws Exception;
268 
269     /**
270      * Check for acceptable connections, interrupt when at least a server is ready for accepting.
271      * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
272      * @return The number of sockets having got incoming client
273      * @throws Exception any exception thrown by the underlying systems calls
274      */
275     protected abstract int select() throws Exception;
276 
277     /**
278      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
279      */
280     protected abstract void wakeup();
281 
282     /**
283      * {@link Iterator} for the set of server sockets found with acceptable incoming connections
284      *  during the last {@link #select()} call.
285      * @return the list of server handles ready
286      */
287     protected abstract Iterator<H> selectedHandles();
288 
289     /**
290      * Open a server socket for a given local address.
291      * @param localAddress the associated local address
292      * @return the opened server socket
293      * @throws Exception any exception thrown by the underlying systems calls
294      */
295     protected abstract H open(SocketAddress localAddress) throws Exception;
296 
297     /**
298      * Get the local address associated with a given server socket
299      * @param handle the server socket
300      * @return the local {@link SocketAddress} associated with this handle
301      * @throws Exception any exception thrown by the underlying systems calls
302      */
303     protected abstract SocketAddress localAddress(H handle) throws Exception;
304 
305     /**
306      * Accept a client connection for a server socket and return a new {@link IoSession}
307      * associated with the given {@link IoProcessor}
308      * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
309      * @param handle the server handle
310      * @return the created {@link IoSession}
311      * @throws Exception any exception thrown by the underlying systems calls
312      */
313     protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
314 
315     /**
316      * Close a server socket.
317      * @param handle the server socket
318      * @throws Exception any exception thrown by the underlying systems calls
319      */
320     protected abstract void close(H handle) throws Exception;
321 
322     /**
323      * {@inheritDoc}
324      */
325     @Override
326     protected void dispose0() throws Exception {
327         unbind();
328 
329         startupAcceptor();
330         wakeup();
331     }
332 
333     /**
334      * {@inheritDoc}
335      */
336     @Override
337     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
338         // Create a bind request as a Future operation. When the selector
339         // have handled the registration, it will signal this future.
340         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
341 
342         // adds the Registration request to the queue for the Workers
343         // to handle
344         registerQueue.add(request);
345 
346         // creates the Acceptor instance and has the local
347         // executor kick it off.
348         startupAcceptor();
349 
350         // As we just started the acceptor, we have to unblock the select()
351         // in order to process the bind request we just have added to the
352         // registerQueue.
353         try {
354             lock.acquire();
355 
356             // Wait a bit to give a chance to the Acceptor thread to do the select()
357             Thread.sleep(10);
358             wakeup();
359         } finally {
360             lock.release();
361         }
362 
363         // Now, we wait until this request is completed.
364         request.awaitUninterruptibly();
365 
366         if (request.getException() != null) {
367             throw request.getException();
368         }
369 
370         // Update the local addresses.
371         // setLocalAddresses() shouldn't be called from the worker thread
372         // because of deadlock.
373         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
374 
375         for (H handle : boundHandles.values()) {
376             newLocalAddresses.add(localAddress(handle));
377         }
378 
379         return newLocalAddresses;
380     }
381 
382     /**
383      * This method is called by the doBind() and doUnbind()
384      * methods.  If the acceptor is null, the acceptor object will
385      * be created and kicked off by the executor.  If the acceptor
386      * object is null, probably already created and this class
387      * is now working, then nothing will happen and the method
388      * will just return.
389      */
390     private void startupAcceptor() throws InterruptedException {
391         // If the acceptor is not ready, clear the queues
392         // TODO : they should already be clean : do we have to do that ?
393         if (!selectable) {
394             registerQueue.clear();
395             cancelQueue.clear();
396         }
397 
398         // start the acceptor if not already started
399         Acceptor acceptor = acceptorRef.get();
400 
401         if (acceptor == null) {
402             lock.acquire();
403             acceptor = new Acceptor();
404 
405             if (acceptorRef.compareAndSet(null, acceptor)) {
406                 executeWorker(acceptor);
407             } else {
408                 lock.release();
409             }
410         }
411     }
412 
413     /**
414      * {@inheritDoc}
415      */
416     @Override
417     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
418         AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
419 
420         cancelQueue.add(future);
421         startupAcceptor();
422         wakeup();
423 
424         future.awaitUninterruptibly();
425         if (future.getException() != null) {
426             throw future.getException();
427         }
428     }
429 
430     /**
431      * This class is called by the startupAcceptor() method and is
432      * placed into a NamePreservingRunnable class.
433      * It's a thread accepting incoming connections from clients.
434      * The loop is stopped when all the bound handlers are unbound.
435      */
436     private class Acceptor implements Runnable {
437         public void run() {
438             assert (acceptorRef.get() == this);
439 
440             int nHandles = 0;
441 
442             // Release the lock
443             lock.release();
444 
445             while (selectable) {
446                 try {
447                     // Detect if we have some keys ready to be processed
448                     // The select() will be woke up if some new connection
449                     // have occurred, or if the selector has been explicitly
450                     // woke up
451                     int selected = select();
452 
453                     // this actually sets the selector to OP_ACCEPT,
454                     // and binds to the port on which this class will
455                     // listen on
456                     nHandles += registerHandles();
457 
458                     // Now, if the number of registred handles is 0, we can
459                     // quit the loop: we don't have any socket listening
460                     // for incoming connection.
461                     if (nHandles == 0) {
462                         acceptorRef.set(null);
463 
464                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
465                             assert (acceptorRef.get() != this);
466                             break;
467                         }
468 
469                         if (!acceptorRef.compareAndSet(null, this)) {
470                             assert (acceptorRef.get() != this);
471                             break;
472                         }
473 
474                         assert (acceptorRef.get() == this);
475                     }
476 
477                     if (selected > 0) {
478                         // We have some connection request, let's process
479                         // them here.
480                         processHandles(selectedHandles());
481                     }
482 
483                     // check to see if any cancellation request has been made.
484                     nHandles -= unregisterHandles();
485                 } catch (ClosedSelectorException cse) {
486                     // If the selector has been closed, we can exit the loop
487                     ExceptionMonitor.getInstance().exceptionCaught(cse);
488                     break;
489                 } catch (Exception e) {
490                     ExceptionMonitor.getInstance().exceptionCaught(e);
491 
492                     try {
493                         Thread.sleep(1000);
494                     } catch (InterruptedException e1) {
495                         ExceptionMonitor.getInstance().exceptionCaught(e1);
496                     }
497                 }
498             }
499 
500             // Cleanup all the processors, and shutdown the acceptor.
501             if (selectable && isDisposing()) {
502                 selectable = false;
503                 try {
504                     if (createdProcessor) {
505                         processor.dispose();
506                     }
507                 } finally {
508                     try {
509                         synchronized (disposalLock) {
510                             if (isDisposing()) {
511                                 destroy();
512                             }
513                         }
514                     } catch (Exception e) {
515                         ExceptionMonitor.getInstance().exceptionCaught(e);
516                     } finally {
517                         disposalFuture.setDone();
518                     }
519                 }
520             }
521         }
522 
523         /**
524          * This method will process new sessions for the Worker class.  All
525          * keys that have had their status updates as per the Selector.selectedKeys()
526          * method will be processed here.  Only keys that are ready to accept
527          * connections are handled here.
528          * <p/>
529          * Session objects are created by making new instances of SocketSessionImpl
530          * and passing the session object to the SocketIoProcessor class.
531          */
532         @SuppressWarnings("unchecked")
533         private void processHandles(Iterator<H> handles) throws Exception {
534             while (handles.hasNext()) {
535                 H handle = handles.next();
536                 handles.remove();
537 
538                 // Associates a new created connection to a processor,
539                 // and get back a session
540                 S session = accept(processor, handle);
541 
542                 if (session == null) {
543                     continue;
544                 }
545 
546                 initSession(session, null, null);
547 
548                 // add the session to the SocketIoProcessor
549                 session.getProcessor().add(session);
550             }
551         }
552     }
553 
554     /**
555      * Sets up the socket communications.  Sets items such as:
556      * <p/>
557      * Blocking
558      * Reuse address
559      * Receive buffer size
560      * Bind to listen port
561      * Registers OP_ACCEPT for selector
562      */
563     private int registerHandles() {
564         for (;;) {
565             // The register queue contains the list of services to manage
566             // in this acceptor.
567             AcceptorOperationFuture future = registerQueue.poll();
568 
569             if (future == null) {
570                 return 0;
571             }
572 
573             // We create a temporary map to store the bound handles,
574             // as we may have to remove them all if there is an exception
575             // during the sockets opening.
576             Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
577             List<SocketAddress> localAddresses = future.getLocalAddresses();
578 
579             try {
580                 // Process all the addresses
581                 for (SocketAddress a : localAddresses) {
582                     H handle = open(a);
583                     newHandles.put(localAddress(handle), handle);
584                 }
585 
586                 // Everything went ok, we can now update the map storing
587                 // all the bound sockets.
588                 boundHandles.putAll(newHandles);
589 
590                 // and notify.
591                 future.setDone();
592                 return newHandles.size();
593             } catch (Exception e) {
594                 // We store the exception in the future
595                 future.setException(e);
596             } finally {
597                 // Roll back if failed to bind all addresses.
598                 if (future.getException() != null) {
599                     for (H handle : newHandles.values()) {
600                         try {
601                             close(handle);
602                         } catch (Exception e) {
603                             ExceptionMonitor.getInstance().exceptionCaught(e);
604                         }
605                     }
606 
607                     // TODO : add some comment : what is the wakeup() waking up ?
608                     wakeup();
609                 }
610             }
611         }
612     }
613 
614     /**
615      * This method just checks to see if anything has been placed into the
616      * cancellation queue.  The only thing that should be in the cancelQueue
617      * is CancellationRequest objects and the only place this happens is in
618      * the doUnbind() method.
619      */
620     private int unregisterHandles() {
621         int cancelledHandles = 0;
622         for (;;) {
623             AcceptorOperationFuture future = cancelQueue.poll();
624             if (future == null) {
625                 break;
626             }
627 
628             // close the channels
629             for (SocketAddress a : future.getLocalAddresses()) {
630                 H handle = boundHandles.remove(a);
631 
632                 if (handle == null) {
633                     continue;
634                 }
635 
636                 try {
637                     close(handle);
638                     wakeup(); // wake up again to trigger thread death
639                 } catch (Exception e) {
640                     ExceptionMonitor.getInstance().exceptionCaught(e);
641                 } finally {
642                     cancelledHandles++;
643                 }
644             }
645 
646             future.setDone();
647         }
648 
649         return cancelledHandles;
650     }
651 
652     /**
653      * {@inheritDoc}
654      */
655     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
656         throw new UnsupportedOperationException();
657     }
658 
659     /**
660      * @return the backLog
661      */
662     public int getBacklog() {
663         return backlog;
664     }
665 
666     /**
667      * Sets the Backlog parameter
668      * 
669      * @param backlog
670      *            the backlog variable
671      */
672     public void setBacklog(int backlog) {
673         synchronized (bindLock) {
674             if (isActive()) {
675                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
676             }
677 
678             this.backlog = backlog;
679         }
680     }
681 
682     /**
683      * @return the flag that sets the reuseAddress information
684      */
685     public boolean isReuseAddress() {
686         return reuseAddress;
687     }
688 
689     /**
690      * Set the Reuse Address flag
691      * 
692      * @param reuseAddress
693      *            The flag to set
694      */
695     public void setReuseAddress(boolean reuseAddress) {
696         synchronized (bindLock) {
697             if (isActive()) {
698                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
699             }
700 
701             this.reuseAddress = reuseAddress;
702         }
703     }
704 
705     /**
706      * {@inheritDoc}
707      */
708     public SocketSessionConfig getSessionConfig() {
709         return (SocketSessionConfig)sessionConfig;
710     }
711 }