1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.lang.reflect.Constructor;
23 import java.util.Arrays;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ThreadPoolExecutor;
28
29 import org.apache.mina.core.RuntimeIoException;
30 import org.apache.mina.core.session.AbstractIoSession;
31 import org.apache.mina.core.session.AttributeKey;
32 import org.apache.mina.core.session.IoSession;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoProcessor<S> {
79
80 private final static Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
81
82
83 private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
84
85
86 private static final AttributeKey PROCESSOR = new AttributeKey( SimpleIoProcessorPool.class, "processor");
87
88
89 private final IoProcessor<S>[] pool;
90
91
92 private final Executor executor;
93
94
95 private final boolean createdExecutor;
96
97
98 private final Object disposalLock = new Object();
99
100
101 private volatile boolean disposing;
102
103
104 private volatile boolean disposed;
105
106
107
108
109
110
111
112 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
113 this(processorType, null, DEFAULT_SIZE);
114 }
115
116
117
118
119
120
121
122
123 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) {
124 this(processorType, null, size);
125 }
126
127
128
129
130
131
132
133 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) {
134 this(processorType, executor, DEFAULT_SIZE);
135 }
136
137
138
139
140
141
142
143
144 @SuppressWarnings("unchecked")
145 public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType,
146 Executor executor, int size) {
147 if (processorType == null) {
148 throw new IllegalArgumentException("processorType");
149 }
150
151 if (size <= 0) {
152 throw new IllegalArgumentException("size: " + size
153 + " (expected: positive integer)");
154 }
155
156
157 createdExecutor = (executor == null);
158
159 if (createdExecutor) {
160 this.executor = Executors.newCachedThreadPool();
161
162 ((ThreadPoolExecutor)this.executor).setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
163 } else {
164 this.executor = executor;
165 }
166
167 pool = new IoProcessor[size];
168
169 boolean success = false;
170 Constructor<? extends IoProcessor<S>> processorConstructor = null;
171 boolean usesExecutorArg = true;
172
173 try {
174
175 try {
176 try {
177 processorConstructor = processorType.getConstructor(ExecutorService.class);
178 pool[0] = processorConstructor.newInstance(this.executor);
179 } catch (NoSuchMethodException e1) {
180
181 try {
182 processorConstructor = processorType.getConstructor(Executor.class);
183 pool[0] = processorConstructor.newInstance(this.executor);
184 } catch (NoSuchMethodException e2) {
185
186 try {
187 processorConstructor = processorType.getConstructor();
188 usesExecutorArg = false;
189 pool[0] = processorConstructor.newInstance();
190 } catch (NoSuchMethodException e3) {
191
192 }
193 }
194 }
195 } catch (RuntimeException re) {
196 LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
197 throw re;
198 } catch (Exception e) {
199 String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
200 LOGGER.error(msg, e);
201 throw new RuntimeIoException(msg , e);
202 }
203
204 if (processorConstructor == null) {
205
206 String msg = String.valueOf(processorType)
207 + " must have a public constructor with one "
208 + ExecutorService.class.getSimpleName()
209 + " parameter, a public constructor with one "
210 + Executor.class.getSimpleName()
211 + " parameter or a public default constructor.";
212 LOGGER.error(msg);
213 throw new IllegalArgumentException(msg);
214 }
215
216
217 for (int i = 1; i < pool.length; i++) {
218 try {
219 if (usesExecutorArg) {
220 pool[i] = processorConstructor.newInstance(this.executor);
221 } else {
222 pool[i] = processorConstructor.newInstance();
223 }
224 } catch (Exception e) {
225
226 }
227 }
228
229 success = true;
230 } finally {
231 if (!success) {
232 dispose();
233 }
234 }
235 }
236
237
238
239
240 public final void add(S session) {
241 getProcessor(session).add(session);
242 }
243
244
245
246
247 public final void flush(S session) {
248 getProcessor(session).flush(session);
249 }
250
251
252
253
254 public final void remove(S session) {
255 getProcessor(session).remove(session);
256 }
257
258
259
260
261 public final void updateTrafficControl(S session) {
262 getProcessor(session).updateTrafficControl(session);
263 }
264
265
266
267
268 public boolean isDisposed() {
269 return disposed;
270 }
271
272
273
274
275 public boolean isDisposing() {
276 return disposing;
277 }
278
279
280
281
282 public final void dispose() {
283 if (disposed) {
284 return;
285 }
286
287 synchronized (disposalLock) {
288 if (!disposing) {
289 disposing = true;
290
291 for (IoProcessor<S> ioProcessor : pool) {
292 if (ioProcessor == null) {
293
294 continue;
295 }
296
297 if (ioProcessor.isDisposing()) {
298 continue;
299 }
300
301 try {
302 ioProcessor.dispose();
303 } catch (Exception e) {
304 LOGGER.warn("Failed to dispose the {} IoProcessor.", ioProcessor.getClass().getSimpleName(), e);
305 }
306 }
307
308 if (createdExecutor) {
309 ((ExecutorService) executor).shutdown();
310 }
311 }
312
313 Arrays.fill(pool, null);
314 disposed = true;
315 }
316 }
317
318
319
320
321
322 @SuppressWarnings("unchecked")
323 private IoProcessor<S> getProcessor(S session) {
324 IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
325
326 if (processor == null) {
327 if (disposed || disposing) {
328 throw new IllegalStateException("A disposed processor cannot be accessed.");
329 }
330
331 processor = pool[Math.abs((int) session.getId()) % pool.length];
332
333 if (processor == null) {
334 throw new IllegalStateException("A disposed processor cannot be accessed.");
335 }
336
337 session.setAttributeIfAbsent(PROCESSOR, processor);
338 }
339
340 return processor;
341 }
342 }