1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.util;
20
21 import java.util.HashSet;
22 import java.util.IdentityHashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.Set;
26
27 import org.apache.mina.common.Session;
28
29 /***
30 * A base implementation of Thread-pooling filters.
31 * This filter forwards events to its thread pool. This is an implementation of
32 * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
33 * thread pool</a> by Douglas C. Schmidt et al.
34 *
35 * @author Trustin Lee (trustin@apache.org)
36 * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $
37 */
38 public abstract class BaseThreadPool implements ThreadPool
39 {
40 /***
41 * Default maximum size of thread pool (2G).
42 */
43 public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
44
45 /***
46 * Default keep-alive time of thread pool (1 min).
47 */
48 public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
49
50 /***
51 * A queue which contains {@link Integer}s which represents reusable
52 * thread IDs. {@link Worker} first checks this queue and then
53 * uses {@link #threadId} when no reusable thread ID is available.
54 */
55 private static final Queue threadIdReuseQueue = new Queue();
56 private static int threadId = 0;
57
58 private static int acquireThreadId()
59 {
60 synchronized( threadIdReuseQueue )
61 {
62 Integer id = ( Integer ) threadIdReuseQueue.pop();
63 if( id == null )
64 {
65 return ++ threadId;
66 }
67 else
68 {
69 return id.intValue();
70 }
71 }
72 }
73
74 private static void releaseThreadId( int id )
75 {
76 synchronized( threadIdReuseQueue )
77 {
78 threadIdReuseQueue.push( new Integer( id ) );
79 }
80 }
81
82 private final String threadNamePrefix;
83 private final Map buffers = new IdentityHashMap();
84 private final Stack followers = new Stack();
85 private final BlockingSet readySessionBuffers = new BlockingSet();
86 private final Set busySessionBuffers = new HashSet();
87
88 private Worker leader;
89
90 private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
91 private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
92
93 private boolean started;
94 private boolean shuttingDown;
95
96 private int poolSize;
97 private final Object poolSizeLock = new Object();
98
99 /***
100 * Creates a new instance with default thread pool settings.
101 * You'll have to invoke {@link #start()} method to start threads actually.
102 *
103 * @param threadNamePrefix the prefix of the thread names this pool will create.
104 */
105 public BaseThreadPool( String threadNamePrefix )
106 {
107 if( threadNamePrefix == null )
108 {
109 throw new NullPointerException( "threadNamePrefix" );
110 }
111 threadNamePrefix = threadNamePrefix.trim();
112 if( threadNamePrefix.length() == 0 )
113 {
114 throw new IllegalArgumentException( "threadNamePrefix is empty." );
115 }
116 this.threadNamePrefix = threadNamePrefix;
117 }
118
119 public String getThreadNamePrefix()
120 {
121 return threadNamePrefix;
122 }
123
124 public int getPoolSize()
125 {
126 synchronized( poolSizeLock )
127 {
128 return poolSize;
129 }
130 }
131
132 public int getMaximumPoolSize()
133 {
134 return maximumPoolSize;
135 }
136
137 public int getKeepAliveTime()
138 {
139 return keepAliveTime;
140 }
141
142 public void setMaximumPoolSize( int maximumPoolSize )
143 {
144 if( maximumPoolSize <= 0 )
145 throw new IllegalArgumentException();
146 this.maximumPoolSize = maximumPoolSize;
147 }
148
149 public void setKeepAliveTime( int keepAliveTime )
150 {
151 this.keepAliveTime = keepAliveTime;
152 }
153
154 public synchronized void start()
155 {
156 if( started )
157 return;
158
159 shuttingDown = false;
160
161 leader = new Worker();
162 leader.start();
163 leader.lead();
164
165 started = true;
166 }
167
168 public synchronized void stop()
169 {
170 if( !started )
171 return;
172
173 shuttingDown = true;
174 Worker lastLeader = null;
175 for( ;; )
176 {
177 Worker leader = this.leader;
178 if( lastLeader == leader )
179 break;
180
181 while( leader.isAlive() )
182 {
183 leader.interrupt();
184 try
185 {
186 leader.join();
187 }
188 catch( InterruptedException e )
189 {
190 }
191 }
192
193 lastLeader = leader;
194 }
195
196 started = false;
197 }
198
199 private void increasePoolSize()
200 {
201 synchronized( poolSizeLock )
202 {
203 poolSize++;
204 }
205 }
206
207 private void decreasePoolSize()
208 {
209 synchronized( poolSizeLock )
210 {
211 poolSize--;
212 }
213 }
214
215 protected void fireEvent( Object nextFilter, Session session,
216 EventType type, Object data )
217 {
218 final BlockingSet readySessionBuffers = this.readySessionBuffers;
219 final Set busySessionBuffers = this.busySessionBuffers;
220 final Event event = new Event( type, nextFilter, data );
221
222 synchronized( readySessionBuffers )
223 {
224 final SessionBuffer buf = getSessionBuffer( session );
225 final Queue eventQueue = buf.eventQueue;
226
227 synchronized( buf )
228 {
229 eventQueue.push( event );
230 }
231
232 if( !busySessionBuffers.contains( buf ) )
233 {
234 busySessionBuffers.add( buf );
235 readySessionBuffers.add( buf );
236 }
237 }
238 }
239
240 /***
241 * Implement this method to forward events to <tt>nextFilter</tt>.
242 */
243 protected abstract void processEvent( Object nextFilter, Session session,
244 EventType type, Object data );
245
246 private SessionBuffer getSessionBuffer( Session session )
247 {
248 final Map buffers = this.buffers;
249 SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
250 if( buf == null )
251 {
252 synchronized( buffers )
253 {
254 buf = ( SessionBuffer ) buffers.get( session );
255 if( buf == null )
256 {
257 buf = new SessionBuffer( session );
258 buffers.put( session, buf );
259 }
260 }
261 }
262 return buf;
263 }
264
265 private void removeSessionBuffer( SessionBuffer buf )
266 {
267 final Map buffers = this.buffers;
268 final Session session = buf.session;
269 synchronized( buffers )
270 {
271 buffers.remove( session );
272 }
273 }
274
275 private static class SessionBuffer
276 {
277 private final Session session;
278
279 private final Queue eventQueue = new Queue();
280
281 private SessionBuffer( Session session )
282 {
283 this.session = session;
284 }
285 }
286
287 private class Worker extends Thread
288 {
289 private final int id;
290 private final Object promotionLock = new Object();
291 private boolean dead;
292
293 private Worker()
294 {
295 int id = acquireThreadId();
296 this.id = id;
297 this.setName( threadNamePrefix + '-' + id );
298 increasePoolSize();
299 }
300
301 public boolean lead()
302 {
303 final Object promotionLock = this.promotionLock;
304 synchronized( promotionLock )
305 {
306 if( dead )
307 {
308 return false;
309 }
310
311 leader = this;
312 promotionLock.notify();
313 }
314
315 return true;
316 }
317
318 public void run()
319 {
320 for( ;; )
321 {
322 if( !waitForPromotion() )
323 break;
324
325 SessionBuffer buf = fetchBuffer();
326 giveUpLead();
327
328 if( buf == null )
329 {
330 break;
331 }
332
333 processEvents( buf );
334 follow();
335 releaseBuffer( buf );
336 }
337
338 decreasePoolSize();
339 releaseThreadId( id );
340 }
341
342 private SessionBuffer fetchBuffer()
343 {
344 SessionBuffer buf;
345 BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
346 synchronized( readySessionBuffers )
347 {
348 for( ;; )
349 {
350 try
351 {
352 readySessionBuffers.waitForNewItem();
353 }
354 catch( InterruptedException e )
355 {
356 if( shuttingDown )
357 {
358 return null;
359 }
360 else
361 {
362 continue;
363 }
364 }
365
366 Iterator it = readySessionBuffers.iterator();
367 while( it.hasNext() )
368 {
369 buf = ( SessionBuffer ) it.next();
370 it.remove();
371 if( buf != null && !buf.eventQueue.isEmpty() )
372 {
373 return buf;
374 }
375 }
376 }
377 }
378 }
379
380 private void processEvents( SessionBuffer buf )
381 {
382 final Session session = buf.session;
383 final Queue eventQueue = buf.eventQueue;
384 for( ;; )
385 {
386 Event event;
387 synchronized( buf )
388 {
389 event = ( Event ) eventQueue.pop();
390 if( event == null )
391 break;
392 }
393 processEvent( event.getNextFilter(), session,
394 event.getType(), event.getData() );
395 }
396 }
397
398 private void follow()
399 {
400 final Object promotionLock = this.promotionLock;
401 final Stack followers = BaseThreadPool.this.followers;
402 synchronized( promotionLock )
403 {
404 if( this != leader )
405 {
406 synchronized( followers )
407 {
408 followers.push( this );
409 }
410 }
411 }
412 }
413
414 private void releaseBuffer( SessionBuffer buf )
415 {
416 final BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
417 final Set busySessionBuffers = BaseThreadPool.this.busySessionBuffers;
418 final Queue eventQueue = buf.eventQueue;
419
420 synchronized( readySessionBuffers )
421 {
422 if( eventQueue.isEmpty() )
423 {
424 busySessionBuffers.remove( buf );
425 removeSessionBuffer( buf );
426 }
427 else
428 {
429 readySessionBuffers.add( buf );
430 }
431 }
432 }
433
434 private boolean waitForPromotion()
435 {
436 final Object promotionLock = this.promotionLock;
437
438 final long startTime = System.currentTimeMillis();
439 long currentTime = startTime;
440
441 synchronized( promotionLock )
442 {
443 while( this != leader )
444 {
445
446 int keepAliveTime = getKeepAliveTime();
447 if( keepAliveTime > 0 )
448 {
449 keepAliveTime -= ( currentTime - startTime );
450 }
451 else
452 {
453 keepAliveTime = Integer.MAX_VALUE;
454 }
455
456
457 if( keepAliveTime <= 0 )
458 {
459 break;
460 }
461
462
463 try
464 {
465 promotionLock.wait( keepAliveTime );
466 }
467 catch( InterruptedException e )
468 {
469 }
470
471
472 currentTime = System.currentTimeMillis();
473 }
474
475 boolean timeToLead = this == leader;
476
477 if( !timeToLead )
478 {
479
480 synchronized( followers )
481 {
482 followers.remove( this );
483 }
484
485
486 dead = true;
487 }
488
489 return timeToLead;
490 }
491 }
492
493 private void giveUpLead()
494 {
495 final Stack followers = BaseThreadPool.this.followers;
496 Worker worker;
497 do
498 {
499 synchronized( followers )
500 {
501 worker = ( Worker ) followers.pop();
502 }
503
504 if( worker == null )
505 {
506
507
508 if( !shuttingDown
509 && getPoolSize() < getMaximumPoolSize() )
510 {
511 worker = new Worker();
512 worker.lead();
513 worker.start();
514 }
515
516
517
518
519 break;
520 }
521 }
522 while( !worker.lead() );
523 }
524 }
525 }