View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.common;
21  
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  /**
31   * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more
32   * {@link IoProcessor}s. Most current transport implementations use this pool internally
33   * to perform better in a multi-core environment, and therefore, you won't need to 
34   * use this pool directly unless you are running multiple {@link IoService}s in the
35   * same JVM.
36   * <p>
37   * If you are running multiple {@link IoService}s, you may want to share the pool
38   * among all services.  To do so, you can create a new {@link SimpleIoProcessorPool}
39   * instance by yourself and provide the pool as a constructor parameter when you
40   * create the services.
41   * <p>
42   * This pool uses Java reflection API to create multiple {@link IoProcessor} instances.
43   * It tries to instantiate the processor in the following order:
44   * <ol>
45   * <li>A public constructor with one {@link ExecutorService} parameter.</li>
46   * <li>A public constructor with one {@link Executor} parameter.</li>
47   * <li>A public default constructor</li>
48   * </ol>
49   * The following is an example for the NIO socket transport:
50   * <pre><code>
51   * // Create a shared pool.
52   * SimpleIoProcessorPool&lt;NioSession&gt; pool = 
53   *         new SimpleIoProcessorPool&lt;NioSession&gt;(NioProcessor.class, 16);
54   * 
55   * // Create two services that share the same pool.
56   * SocketAcceptor acceptor = new NioSocketAcceptor(pool);
57   * SocketConnector connector = new NioSocketConnector(pool);
58   * 
59   * ...
60   * 
61   * // Release related resources.
62   * connector.dispose();
63   * acceptor.dispose();
64   * pool.dispose();
65   * </code></pre>
66   * 
67   * @author The Apache MINA Project (dev@mina.apache.org)
68   * @version $Rev: 609876 $, $Date: 2008-01-07 22:40:38 -0700 (Mon, 07 Jan 2008) $
69   * 
70   * @param <T> the type of the {@link IoSession} to be managed by the specified
71   *            {@link IoProcessor}.
72   */
73  public class SimpleIoProcessorPool<T extends AbstractIoSession> implements IoProcessor<T> {
74      
75      private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
76      private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor");
77      
78      private final Logger logger = LoggerFactory.getLogger(getClass());
79  
80      private final IoProcessor<T>[] pool;
81      private final AtomicInteger processorDistributor = new AtomicInteger();
82      private final Executor executor;
83      private final boolean createdExecutor;
84      
85      private final Object disposalLock = new Object();
86      private volatile boolean disposing;
87      private volatile boolean disposed;
88      
89      public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
90          this(processorType, null, DEFAULT_SIZE);
91      }
92      
93      public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, int size) {
94          this(processorType, null, size);
95      }
96  
97      public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor) {
98          this(processorType, executor, DEFAULT_SIZE);
99      }
100     
101     @SuppressWarnings("unchecked")
102     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor, int size) {
103         if (processorType == null) {
104             throw new NullPointerException("processorType");
105         }
106         if (size <= 0) {
107             throw new IllegalArgumentException(
108                     "size: " + size + " (expected: positive integer)");
109         }
110         
111         if (executor == null) {
112             this.executor = executor = Executors.newCachedThreadPool();
113             this.createdExecutor = true;
114         } else {
115             this.executor = executor;
116             this.createdExecutor = false;
117         }
118         
119         pool = new IoProcessor[size];
120         
121         boolean success = false;
122         try {
123             for (int i = 0; i < pool.length; i ++) {
124                 IoProcessor<T> processor = null;
125                 
126                 // Try to create a new processor with a proper constructor.
127                 try {
128                     try {
129                         processor = processorType.getConstructor(ExecutorService.class).newInstance(executor);
130                     } catch (NoSuchMethodException e) {
131                         // To the next step...
132                     }
133                     
134                     if (processor == null) {
135                         try {
136                             processor = processorType.getConstructor(Executor.class).newInstance(executor);
137                         } catch (NoSuchMethodException e) {
138                             // To the next step...
139                         }
140                     }
141                     
142                     if (processor == null) {
143                         try {
144                             processor = processorType.getConstructor().newInstance();
145                         } catch (NoSuchMethodException e) {
146                             // To the next step...
147                         }
148                     }
149                 } catch (RuntimeException e) {
150                     throw e;
151                 } catch (Exception e) {
152                     throw new RuntimeIoException(
153                             "Failed to create a new instance of " + processorType.getName(), e);
154                 }
155                 
156                 // Raise an exception if no proper constructor is found.
157                 if (processor == null) {
158                     throw new IllegalArgumentException(
159                             String.valueOf(processorType) + " must have a public constructor " +
160                             "with one " + ExecutorService.class.getSimpleName() + " parameter, " +
161                             "a public constructor with one " + Executor.class.getSimpleName() + 
162                             " parameter or a public default constructor.");
163                 }
164                 
165                 pool[i] = processor;
166             }
167             
168             success = true;
169         } finally {
170             if (!success) {
171                 dispose();
172             }
173         }
174     }
175     
176     public final void add(T session) {
177         getProcessor(session).add(session);
178     }
179 
180     public final void flush(T session) {
181         getProcessor(session).flush(session);
182     }
183 
184     public final void remove(T session) {
185         getProcessor(session).remove(session);
186     }
187 
188     public final void updateTrafficMask(T session) {
189         getProcessor(session).updateTrafficMask(session);
190     }
191     
192     public boolean isDisposed() {
193         return disposed;
194     }
195 
196     public boolean isDisposing() {
197         return disposing;
198     }
199 
200     public final void dispose() {
201         if (disposed) {
202             return;
203         }
204 
205         synchronized (disposalLock) {
206             if (!disposing) {
207                 disposing = true;
208                 for (int i = pool.length - 1; i >= 0; i --) {
209                     if (pool[i] == null || pool[i].isDisposing()) {
210                         continue;
211                     }
212 
213                     try {
214                         pool[i].dispose();
215                     } catch (Exception e) {
216                         logger.warn(
217                                 "Failed to dispose a " +
218                                 pool[i].getClass().getSimpleName() +
219                                 " at index " + i + ".", e);
220                     } finally {
221                         pool[i] = null;
222                     }
223                 }
224                 
225                 if (createdExecutor) {
226                     ((ExecutorService) executor).shutdown();
227                 }
228             }
229         }
230 
231         disposed = true;
232     }
233     
234     @SuppressWarnings("unchecked")
235     private IoProcessor<T> getProcessor(T session) {
236         IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);
237         if (p == null) {
238             p = nextProcessor();
239             IoProcessor<T> oldp =
240                 (IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
241             if (oldp != null) {
242                 p = oldp;
243             }
244         }
245         
246         return p;
247     }
248 
249     private IoProcessor<T> nextProcessor() {
250         checkDisposal();
251         return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
252     }
253 
254     private void checkDisposal() {
255         if (disposed) {
256             throw new IllegalStateException("A disposed processor cannot be accessed.");
257         }
258     }
259 }