1 | /* |
2 | * @(#) $Id: BaseThreadPool.java 210062 2005-07-11 03:52:38Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.util; |
20 | |
21 | import java.util.HashSet; |
22 | import java.util.IdentityHashMap; |
23 | import java.util.Iterator; |
24 | import java.util.Map; |
25 | import java.util.Set; |
26 | |
27 | import org.apache.mina.common.Session; |
28 | |
29 | /** |
30 | * A base implementation of Thread-pooling filters. |
31 | * This filter forwards events to its thread pool. This is an implementation of |
32 | * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers |
33 | * thread pool</a> by Douglas C. Schmidt et al. |
34 | * |
35 | * @author Trustin Lee (trustin@apache.org) |
36 | * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $ |
37 | */ |
38 | public abstract class BaseThreadPool implements ThreadPool |
39 | { |
40 | /** |
41 | * Default maximum size of thread pool (2G). |
42 | */ |
43 | public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE; |
44 | |
45 | /** |
46 | * Default keep-alive time of thread pool (1 min). |
47 | */ |
48 | public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000; |
49 | |
50 | private static volatile int threadId = 0; |
51 | |
52 | private final Map buffers = new IdentityHashMap(); |
53 | |
54 | private final Stack followers = new Stack(); |
55 | |
56 | private final BlockingSet readySessionBuffers = new BlockingSet(); |
57 | |
58 | private final Set busySessionBuffers = new HashSet(); |
59 | |
60 | private Worker leader; |
61 | |
62 | private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE; |
63 | |
64 | private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME; |
65 | |
66 | private boolean started; |
67 | |
68 | private boolean shuttingDown; |
69 | |
70 | private int poolSize; |
71 | |
72 | private final Object poolSizeLock = new Object(); |
73 | |
74 | /** |
75 | * Creates a new instance with default thread pool settings. |
76 | * You'll have to invoke {@link #start()} method to start threads actually. |
77 | */ |
78 | protected BaseThreadPool() |
79 | { |
80 | } |
81 | |
82 | public int getPoolSize() |
83 | { |
84 | synchronized( poolSizeLock ) |
85 | { |
86 | return poolSize; |
87 | } |
88 | } |
89 | |
90 | public int getMaximumPoolSize() |
91 | { |
92 | return maximumPoolSize; |
93 | } |
94 | |
95 | public int getKeepAliveTime() |
96 | { |
97 | return keepAliveTime; |
98 | } |
99 | |
100 | public void setMaximumPoolSize( int maximumPoolSize ) |
101 | { |
102 | if( maximumPoolSize <= 0 ) |
103 | throw new IllegalArgumentException(); |
104 | this.maximumPoolSize = maximumPoolSize; |
105 | } |
106 | |
107 | public void setKeepAliveTime( int keepAliveTime ) |
108 | { |
109 | this.keepAliveTime = keepAliveTime; |
110 | } |
111 | |
112 | public synchronized void start() |
113 | { |
114 | if( started ) |
115 | return; |
116 | |
117 | shuttingDown = false; |
118 | |
119 | leader = new Worker(); |
120 | leader.start(); |
121 | leader.lead(); |
122 | |
123 | started = true; |
124 | } |
125 | |
126 | public synchronized void stop() |
127 | { |
128 | if( !started ) |
129 | return; |
130 | |
131 | shuttingDown = true; |
132 | Worker lastLeader = null; |
133 | for( ;; ) |
134 | { |
135 | Worker leader = this.leader; |
136 | if( lastLeader == leader ) |
137 | break; |
138 | |
139 | while( leader.isAlive() ) |
140 | { |
141 | leader.interrupt(); |
142 | try |
143 | { |
144 | leader.join(); |
145 | } |
146 | catch( InterruptedException e ) |
147 | { |
148 | } |
149 | } |
150 | |
151 | lastLeader = leader; |
152 | } |
153 | |
154 | started = false; |
155 | } |
156 | |
157 | private void increasePoolSize() |
158 | { |
159 | synchronized( poolSizeLock ) |
160 | { |
161 | poolSize++; |
162 | } |
163 | } |
164 | |
165 | private void decreasePoolSize() |
166 | { |
167 | synchronized( poolSizeLock ) |
168 | { |
169 | poolSize--; |
170 | } |
171 | } |
172 | |
173 | protected void fireEvent( Object nextFilter, Session session, |
174 | EventType type, Object data ) |
175 | { |
176 | final BlockingSet readySessionBuffers = this.readySessionBuffers; |
177 | final Set busySessionBuffers = this.busySessionBuffers; |
178 | final Event event = new Event( type, nextFilter, data ); |
179 | |
180 | synchronized( readySessionBuffers ) |
181 | { |
182 | final SessionBuffer buf = getSessionBuffer( session ); |
183 | final Queue eventQueue = buf.eventQueue; |
184 | |
185 | synchronized( buf ) |
186 | { |
187 | eventQueue.push( event ); |
188 | } |
189 | |
190 | if( !busySessionBuffers.contains( buf ) ) |
191 | { |
192 | busySessionBuffers.add( buf ); |
193 | readySessionBuffers.add( buf ); |
194 | } |
195 | } |
196 | } |
197 | |
198 | /** |
199 | * Implement this method to forward events to <tt>nextFilter</tt>. |
200 | */ |
201 | protected abstract void processEvent( Object nextFilter, Session session, |
202 | EventType type, Object data ); |
203 | |
204 | private SessionBuffer getSessionBuffer( Session session ) |
205 | { |
206 | final Map buffers = this.buffers; |
207 | SessionBuffer buf = ( SessionBuffer ) buffers.get( session ); |
208 | if( buf == null ) |
209 | { |
210 | synchronized( buffers ) |
211 | { |
212 | buf = ( SessionBuffer ) buffers.get( session ); |
213 | if( buf == null ) |
214 | { |
215 | buf = new SessionBuffer( session ); |
216 | buffers.put( session, buf ); |
217 | } |
218 | } |
219 | } |
220 | return buf; |
221 | } |
222 | |
223 | private void removeSessionBuffer( SessionBuffer buf ) |
224 | { |
225 | final Map buffers = this.buffers; |
226 | final Session session = buf.session; |
227 | synchronized( buffers ) |
228 | { |
229 | buffers.remove( session ); |
230 | } |
231 | } |
232 | |
233 | private static class SessionBuffer |
234 | { |
235 | private final Session session; |
236 | |
237 | private final Queue eventQueue = new Queue(); |
238 | |
239 | private SessionBuffer( Session session ) |
240 | { |
241 | this.session = session; |
242 | } |
243 | } |
244 | |
245 | private class Worker extends Thread |
246 | { |
247 | private final Object promotionLock = new Object(); |
248 | |
249 | private Worker() |
250 | { |
251 | super( "IoThreadPool-" + ( threadId++ ) ); |
252 | increasePoolSize(); |
253 | } |
254 | |
255 | public void lead() |
256 | { |
257 | final Object promotionLock = this.promotionLock; |
258 | synchronized( promotionLock ) |
259 | { |
260 | leader = this; |
261 | promotionLock.notify(); |
262 | } |
263 | } |
264 | |
265 | public void run() |
266 | { |
267 | for( ;; ) |
268 | { |
269 | if( !waitForPromotion() ) |
270 | break; |
271 | |
272 | SessionBuffer buf = fetchBuffer(); |
273 | giveUpLead(); |
274 | |
275 | if( buf == null ) |
276 | { |
277 | break; |
278 | } |
279 | |
280 | processEvents( buf ); |
281 | follow(); |
282 | releaseBuffer( buf ); |
283 | } |
284 | |
285 | decreasePoolSize(); |
286 | } |
287 | |
288 | private SessionBuffer fetchBuffer() |
289 | { |
290 | SessionBuffer buf = null; |
291 | BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers; |
292 | synchronized( readySessionBuffers ) |
293 | { |
294 | do |
295 | { |
296 | buf = null; |
297 | try |
298 | { |
299 | readySessionBuffers.waitForNewItem(); |
300 | } |
301 | catch( InterruptedException e ) |
302 | { |
303 | break; |
304 | } |
305 | |
306 | Iterator it = readySessionBuffers.iterator(); |
307 | if( !it.hasNext() ) |
308 | { |
309 | // exceeded keepAliveTime |
310 | break; |
311 | } |
312 | |
313 | do |
314 | { |
315 | buf = null; |
316 | buf = ( SessionBuffer ) it.next(); |
317 | it.remove(); |
318 | } |
319 | while( buf != null && buf.eventQueue.isEmpty() |
320 | && it.hasNext() ); |
321 | } |
322 | while( buf != null && buf.eventQueue.isEmpty() ); |
323 | } |
324 | |
325 | return buf; |
326 | } |
327 | |
328 | private void processEvents( SessionBuffer buf ) |
329 | { |
330 | final Session session = buf.session; |
331 | final Queue eventQueue = buf.eventQueue; |
332 | for( ;; ) |
333 | { |
334 | Event event; |
335 | synchronized( buf ) |
336 | { |
337 | event = ( Event ) eventQueue.pop(); |
338 | if( event == null ) |
339 | break; |
340 | } |
341 | processEvent( event.getNextFilter(), session, |
342 | event.getType(), event.getData() ); |
343 | } |
344 | } |
345 | |
346 | private void follow() |
347 | { |
348 | final Object promotionLock = this.promotionLock; |
349 | final Stack followers = BaseThreadPool.this.followers; |
350 | synchronized( promotionLock ) |
351 | { |
352 | if( this != leader ) |
353 | { |
354 | synchronized( followers ) |
355 | { |
356 | followers.push( this ); |
357 | } |
358 | } |
359 | } |
360 | } |
361 | |
362 | private void releaseBuffer( SessionBuffer buf ) |
363 | { |
364 | final BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers; |
365 | final Set busySessionBuffers = BaseThreadPool.this.busySessionBuffers; |
366 | final Queue eventQueue = buf.eventQueue; |
367 | |
368 | synchronized( readySessionBuffers ) |
369 | { |
370 | if( eventQueue.isEmpty() ) |
371 | { |
372 | busySessionBuffers.remove( buf ); |
373 | removeSessionBuffer( buf ); |
374 | } |
375 | else |
376 | { |
377 | readySessionBuffers.add( buf ); |
378 | } |
379 | } |
380 | } |
381 | |
382 | private boolean waitForPromotion() |
383 | { |
384 | final Object promotionLock = this.promotionLock; |
385 | |
386 | synchronized( promotionLock ) |
387 | { |
388 | if( this != leader ) |
389 | { |
390 | try |
391 | { |
392 | int keepAliveTime = getKeepAliveTime(); |
393 | if( keepAliveTime > 0 ) |
394 | { |
395 | promotionLock.wait( keepAliveTime ); |
396 | } |
397 | else |
398 | { |
399 | promotionLock.wait(); |
400 | } |
401 | } |
402 | catch( InterruptedException e ) |
403 | { |
404 | } |
405 | } |
406 | |
407 | boolean timeToLead = this == leader; |
408 | |
409 | if( !timeToLead ) |
410 | { |
411 | // time to die |
412 | synchronized( followers ) |
413 | { |
414 | followers.remove( this ); |
415 | } |
416 | } |
417 | |
418 | return timeToLead; |
419 | } |
420 | } |
421 | |
422 | private void giveUpLead() |
423 | { |
424 | final Stack followers = BaseThreadPool.this.followers; |
425 | Worker worker; |
426 | synchronized( followers ) |
427 | { |
428 | worker = ( Worker ) followers.pop(); |
429 | } |
430 | |
431 | if( worker != null ) |
432 | { |
433 | worker.lead(); |
434 | } |
435 | else |
436 | { |
437 | if( !shuttingDown ) |
438 | { |
439 | synchronized( BaseThreadPool.this ) |
440 | { |
441 | if( !shuttingDown |
442 | && getPoolSize() < getMaximumPoolSize() ) |
443 | { |
444 | worker = new Worker(); |
445 | worker.start(); |
446 | worker.lead(); |
447 | } |
448 | } |
449 | } |
450 | } |
451 | } |
452 | } |
453 | } |