View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.common;
21  
22  import java.net.SocketAddress;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.LinkedBlockingQueue;
35  import java.util.concurrent.RejectedExecutionException;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import org.apache.mina.util.NamePreservingRunnable;
41  
42  /**
43   * @author The Apache MINA Project (dev@mina.apache.org)
44   * @version $Rev: 605069 $, $Date: 2007-12-17 19:47:03 -0700 (Mon, 17 Dec 2007) $
45   */
46  public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
47          extends AbstractIoAcceptor {
48  
49      private static final AtomicInteger id = new AtomicInteger();
50  
51      private final Executor executor;
52      private final boolean createdExecutor;
53      private final String threadName;
54      private final IoProcessor<T> processor;
55      private final boolean createdProcessor;
56  
57      private final Object lock = new Object();
58  
59      private final Queue<AcceptorOperationFuture> registerQueue =
60          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
61      private final Queue<AcceptorOperationFuture> cancelQueue =
62          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
63  
64      private final Map<SocketAddress, H> boundHandles =
65          Collections.synchronizedMap(new HashMap<SocketAddress, H>());
66      
67      private final ServiceOperationFuture disposalFuture =
68          new ServiceOperationFuture();
69      private volatile boolean selectable;
70      private Worker worker;
71  
72      /**
73       * Create an acceptor with a single processing thread using a NewThreadExecutor
74       */
75      protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
76          this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
77      }
78  
79      protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
80          this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
81      }
82      
83      protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
84          this(sessionConfig, null, processor, false);
85      }
86  
87      protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
88          this(sessionConfig, executor, processor, false);
89      }
90  
91      private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
92          super(sessionConfig);
93          
94          if (processor == null) {
95              throw new NullPointerException("processor");
96          }
97          
98          if (executor == null) {
99              this.executor = new ThreadPoolExecutor(
100                     1, 1, 1L, TimeUnit.SECONDS,
101                     new LinkedBlockingQueue<Runnable>());
102             this.createdExecutor = true;
103         } else {
104             this.executor = executor;
105             this.createdExecutor = false;
106         }
107 
108         this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
109         this.processor = processor;
110         this.createdProcessor = createdProcessor;
111         
112         try {
113             init();
114             selectable = true;
115         } catch (RuntimeException e){
116             throw e;
117         } catch (Exception e) {
118             throw new RuntimeIoException("Failed to initialize.", e);
119         } finally {
120             if (!selectable) {
121                 try {
122                     destroy();
123                 } catch (Exception e) {
124                     ExceptionMonitor.getInstance().exceptionCaught(e);
125                 }
126             }
127         }
128     }
129 
130     protected abstract void init() throws Exception;
131     protected abstract void destroy() throws Exception;
132     protected abstract boolean select() throws Exception;
133     protected abstract void wakeup();
134     protected abstract Iterator<H> selectedHandles();
135     protected abstract H open(SocketAddress localAddress) throws Exception;
136     protected abstract SocketAddress localAddress(H handle) throws Exception;
137     protected abstract T accept(IoProcessor<T> processor, H handle) throws Exception;
138     protected abstract void close(H handle) throws Exception;
139 
140     @Override
141     protected IoFuture dispose0() throws Exception {
142         unbind();
143         if (!disposalFuture.isDone()) {
144             try {
145                 startupWorker();
146                 wakeup();
147             } catch (RejectedExecutionException e) {
148                 if (createdExecutor) {
149                     // Ignore.
150                 } else {
151                     throw e;
152                 }
153             }
154         }
155         return disposalFuture;
156     }
157 
158     @Override
159     protected final Set<SocketAddress> bind0(List<? extends SocketAddress> localAddresses) throws Exception {
160         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
161 
162         // adds the Registration request to the queue for the Workers
163         // to handle
164         registerQueue.add(request);
165 
166         // creates an instance of a Worker and has the local
167         // executor kick it off.
168         startupWorker();
169         wakeup();
170         request.awaitUninterruptibly();
171 
172         if (request.getException() != null) {
173             throw request.getException();
174         }
175 
176         // Update the local addresses.
177         // setLocalAddresses() shouldn't be called from the worker thread
178         // because of deadlock.
179         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
180         for (H handle: boundHandles.values()) {
181             newLocalAddresses.add(localAddress(handle));
182         }
183 
184         return newLocalAddresses;
185     }
186 
187     /**
188      * This method is called by the doBind() and doUnbind()
189      * methods.  If the worker object is not null, presumably
190      * the acceptor is starting up, then the worker object will
191      * be created and kicked off by the executor.  If the worker
192      * object is not null, probably already created and this class
193      * is now working, then nothing will happen and the method
194      * will just return.
195      */
196     private void startupWorker() {
197         if (!selectable) {
198             registerQueue.clear();
199             cancelQueue.clear();
200         }
201 
202         synchronized (lock) {
203             if (worker == null) {
204                 worker = new Worker();
205                 executor.execute(new NamePreservingRunnable(worker, threadName));
206             }
207         }
208     }
209 
210     @Override
211     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
212         AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
213 
214         cancelQueue.add(future);
215         startupWorker();
216         wakeup();
217 
218         future.awaitUninterruptibly();
219         if (future.getException() != null) {
220             throw future.getException();
221         }
222     }
223 
224     /**
225      * This class is called by the startupWorker() method and is
226      * placed into a NamePreservingRunnable class.
227      */
228     private class Worker implements Runnable {
229         public void run() {
230             int nHandles = 0;
231             
232             while (selectable) {
233                 try {
234                     // gets the number of keys that are ready to go
235                     boolean selected = select();
236 
237                     // this actually sets the selector to OP_ACCEPT,
238                     // and binds to the port in which this class will
239                     // listen on
240                     nHandles += registerHandles();
241 
242                     if (selected) {
243                         processHandles(selectedHandles());
244                     }
245 
246                     // check to see if any cancellation request has been made.
247                     nHandles -= unregisterHandles();
248 
249                     if (nHandles == 0) {
250                         synchronized (lock) {
251                             if (registerQueue.isEmpty() &&
252                                 cancelQueue.isEmpty()) {
253                                 worker = null;
254                                 break;
255                             }
256                         }
257                     }
258                 } catch (Throwable e) {
259                     ExceptionMonitor.getInstance().exceptionCaught(e);
260 
261                     try {
262                         Thread.sleep(1000);
263                     } catch (InterruptedException e1) {
264                         ExceptionMonitor.getInstance().exceptionCaught(e1);
265                     }
266                 }
267             }
268             
269             if (selectable && isDisposing()) {
270                 selectable = false;
271                 try {
272                     if (createdProcessor) {
273                         processor.dispose();
274                     }
275                 } finally {
276                     try {
277                         destroy();
278                     } catch (Exception e) {
279                         ExceptionMonitor.getInstance().exceptionCaught(e);
280                     } finally {
281                         disposalFuture.setDone();
282                         if (createdExecutor) {
283                             ((ExecutorService) executor).shutdown();
284                         }
285                     }
286                 }
287             }
288         }
289 
290         /**
291          * This method will process new sessions for the Worker class.  All
292          * keys that have had their status updates as per the Selector.selectedKeys()
293          * method will be processed here.  Only keys that are ready to accept
294          * connections are handled here.
295          * <p/>
296          * Session objects are created by making new instances of SocketSessionImpl
297          * and passing the session object to the SocketIoProcessor class.
298          */
299         @SuppressWarnings("unchecked")
300         private void processHandles(Iterator<H> handles) throws Exception {
301             while (handles.hasNext()) {
302                 H handle = handles.next();
303                 handles.remove();
304 
305                 T session = accept(processor, handle);
306                 if (session == null) {
307                     break;
308                 }
309                 
310                 finishSessionInitialization(session, null, null);
311 
312                 // add the session to the SocketIoProcessor
313                 session.getProcessor().add(session);
314             }
315         }
316     }
317 
318     /**
319      * Sets up the socket communications.  Sets items such as:
320      * <p/>
321      * Blocking
322      * Reuse address
323      * Receive buffer size
324      * Bind to listen port
325      * Registers OP_ACCEPT for selector
326      */
327     private int registerHandles() {
328         for (;;) {
329             AcceptorOperationFuture future = registerQueue.poll();
330             if (future == null) {
331                 break;
332             }
333             
334             Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
335             List<SocketAddress> localAddresses = future.getLocalAddresses();
336             
337             try {
338                 for (SocketAddress a: localAddresses) {
339                     H handle = open(a);
340                     newHandles.put(localAddress(handle), handle);
341                 }
342                 
343                 boundHandles.putAll(newHandles);
344 
345                 // and notify.
346                 future.setDone();
347                 return newHandles.size();
348             } catch (Exception e) {
349                 future.setException(e);
350             } finally {
351                 // Roll back if failed to bind all addresses.
352                 if (future.getException() != null) {
353                     for (H handle: newHandles.values()) {
354                         try {
355                             close(handle);
356                         } catch (Exception e) {
357                             ExceptionMonitor.getInstance().exceptionCaught(e);
358                         }
359                     }
360                     wakeup();
361                 }
362             }
363         }
364         
365         return 0;
366     }
367 
368     /**
369      * This method just checks to see if anything has been placed into the
370      * cancellation queue.  The only thing that should be in the cancelQueue
371      * is CancellationRequest objects and the only place this happens is in
372      * the doUnbind() method.
373      */
374     private int unregisterHandles() {
375         int cancelledHandles = 0;
376         for (; ;) {
377             AcceptorOperationFuture future = cancelQueue.poll();
378             if (future == null) {
379                 break;
380             }
381             
382             // close the channels
383             for (SocketAddress a: future.getLocalAddresses()) {
384                 H handle = boundHandles.remove(a);
385                 if (handle == null) {
386                     continue;
387                 }
388 
389                 try {
390                     close(handle);
391                     wakeup(); // wake up again to trigger thread death
392                 } catch (Throwable e) {
393                     ExceptionMonitor.getInstance().exceptionCaught(e);
394                 } finally {
395                     cancelledHandles ++;
396                 }
397             }
398             
399             future.setDone();
400         }
401         
402         return cancelledHandles;
403     }
404 
405     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
406         throw new UnsupportedOperationException();
407     }
408 }