1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.util;
20
21 import java.util.ArrayList;
22 import java.util.IdentityHashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27
28 import org.apache.mina.common.Session;
29
30 /***
31 * A base implementation of Thread-pooling filters.
32 * This filter forwards events to its thread pool. This is an implementation of
33 * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
34 * thread pool</a> by Douglas C. Schmidt et al.
35 *
36 * @author The Apache Directory Project (dev@directory.apache.org)
37 * @version $Rev: 357871 $, $Date: 2005-12-20 10:56:40 +0900 (Tue, 20 Dec 2005) $
38 */
39 public abstract class BaseThreadPool implements ThreadPool
40 {
41 /***
42 * Default maximum size of thread pool (2G).
43 */
44 public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
45
46 /***
47 * Default keep-alive time of thread pool (1 min).
48 */
49 public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
50
51 /***
52 * A queue which contains {@link Integer}s which represents reusable
53 * thread IDs. {@link Worker} first checks this queue and then
54 * uses {@link #threadId} when no reusable thread ID is available.
55 */
56 private static final Queue threadIdReuseQueue = new Queue();
57 private static int threadId = 0;
58
59 private static int acquireThreadId()
60 {
61 synchronized( threadIdReuseQueue )
62 {
63 Integer id = ( Integer ) threadIdReuseQueue.pop();
64 if( id == null )
65 {
66 return ++ threadId;
67 }
68 else
69 {
70 return id.intValue();
71 }
72 }
73 }
74
75 private static void releaseThreadId( int id )
76 {
77 synchronized( threadIdReuseQueue )
78 {
79 threadIdReuseQueue.push( new Integer( id ) );
80 }
81 }
82
83 private final String threadNamePrefix;
84 private final Map buffers = new IdentityHashMap();
85 private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
86 private final Set allSessionBuffers = new IdentityHashSet();
87
88 private Worker leader;
89 private final Stack followers = new Stack();
90 private final Set allWorkers = new IdentityHashSet();
91
92 private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
93 private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
94
95 private boolean started;
96 private boolean shuttingDown;
97
98 private int poolSize;
99 private final Object poolSizeLock = new Object();
100
101 /***
102 * Creates a new instance with default thread pool settings.
103 * You'll have to invoke {@link #start()} method to start threads actually.
104 *
105 * @param threadNamePrefix the prefix of the thread names this pool will create.
106 */
107 public BaseThreadPool( String threadNamePrefix )
108 {
109 if( threadNamePrefix == null )
110 {
111 throw new NullPointerException( "threadNamePrefix" );
112 }
113 threadNamePrefix = threadNamePrefix.trim();
114 if( threadNamePrefix.length() == 0 )
115 {
116 throw new IllegalArgumentException( "threadNamePrefix is empty." );
117 }
118 this.threadNamePrefix = threadNamePrefix;
119 }
120
121 public String getThreadNamePrefix()
122 {
123 return threadNamePrefix;
124 }
125
126 public int getPoolSize()
127 {
128 synchronized( poolSizeLock )
129 {
130 return poolSize;
131 }
132 }
133
134 public int getMaximumPoolSize()
135 {
136 return maximumPoolSize;
137 }
138
139 public int getKeepAliveTime()
140 {
141 return keepAliveTime;
142 }
143
144 public void setMaximumPoolSize( int maximumPoolSize )
145 {
146 if( maximumPoolSize <= 0 )
147 throw new IllegalArgumentException();
148 this.maximumPoolSize = maximumPoolSize;
149 }
150
151 public void setKeepAliveTime( int keepAliveTime )
152 {
153 this.keepAliveTime = keepAliveTime;
154 }
155
156 public synchronized void start()
157 {
158 if( started )
159 return;
160
161 shuttingDown = false;
162
163 leader = new Worker();
164 leader.start();
165 leader.lead();
166
167 started = true;
168 }
169
170 public synchronized void stop()
171 {
172 if( !started )
173 return;
174
175 shuttingDown = true;
176 while( getPoolSize() != 0 )
177 {
178 List allWorkers;
179 synchronized( poolSizeLock )
180 {
181 allWorkers = new ArrayList( this.allWorkers );
182 }
183
184 for( Iterator i = allWorkers.iterator(); i.hasNext(); )
185 {
186 Worker worker = ( Worker ) i.next();
187 while( worker.isAlive() )
188 {
189 worker.interrupt();
190 try
191 {
192
193
194 worker.join( 100 );
195 }
196 catch( InterruptedException e )
197 {
198 }
199 }
200 }
201 }
202
203 this.allSessionBuffers.clear();
204 this.unfetchedSessionBuffers.clear();
205 this.buffers.clear();
206 this.followers.clear();
207 this.leader = null;
208
209 started = false;
210 }
211
212 private void increasePoolSize( Worker worker )
213 {
214 synchronized( poolSizeLock )
215 {
216 poolSize++;
217 allWorkers.add( worker );
218 }
219 }
220
221 private void decreasePoolSize( Worker worker )
222 {
223 synchronized( poolSizeLock )
224 {
225 poolSize--;
226 allWorkers.remove( worker );
227 }
228 }
229
230 protected void fireEvent( Object nextFilter, Session session,
231 EventType type, Object data )
232 {
233 final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
234 final Set allSessionBuffers = this.allSessionBuffers;
235 final Event event = new Event( type, nextFilter, data );
236
237 synchronized( unfetchedSessionBuffers )
238 {
239 final SessionBuffer buf = getSessionBuffer( session );
240 final Queue eventQueue = buf.eventQueue;
241
242 synchronized( buf )
243 {
244 eventQueue.push( event );
245 }
246
247 if( !allSessionBuffers.contains( buf ) )
248 {
249 allSessionBuffers.add( buf );
250 unfetchedSessionBuffers.push( buf );
251 }
252 }
253 }
254
255 /***
256 * Implement this method to forward events to <tt>nextFilter</tt>.
257 */
258 protected abstract void processEvent( Object nextFilter, Session session,
259 EventType type, Object data );
260
261
262 /***
263 * Implement this method to fetch (or pop) a {@link SessionBuffer} from
264 * the given <tt>unfetchedSessionBuffers</tt>. The default implementation
265 * simply pops the buffer from it. You could prioritize the fetch order.
266 *
267 * @return A non-null {@link SessionBuffer}
268 */
269 protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
270 {
271 return ( SessionBuffer ) unfetchedSessionBuffers.pop();
272 }
273
274 private SessionBuffer getSessionBuffer( Session session )
275 {
276 final Map buffers = this.buffers;
277 SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
278 if( buf == null )
279 {
280 synchronized( buffers )
281 {
282 buf = ( SessionBuffer ) buffers.get( session );
283 if( buf == null )
284 {
285 buf = new SessionBuffer( session );
286 buffers.put( session, buf );
287 }
288 }
289 }
290 return buf;
291 }
292
293 private void removeSessionBuffer( SessionBuffer buf )
294 {
295 final Map buffers = this.buffers;
296 final Session session = buf.session;
297 synchronized( buffers )
298 {
299 buffers.remove( session );
300 }
301 }
302
303 protected static class SessionBuffer
304 {
305 private final Session session;
306
307 private final Queue eventQueue = new Queue();
308
309 private SessionBuffer( Session session )
310 {
311 this.session = session;
312 }
313
314 public Session getSession()
315 {
316 return session;
317 }
318
319 public Queue getEventQueue()
320 {
321 return eventQueue;
322 }
323 }
324
325 private class Worker extends Thread
326 {
327 private final int id;
328 private final Object promotionLock = new Object();
329 private boolean dead;
330
331 private Worker()
332 {
333 int id = acquireThreadId();
334 this.id = id;
335 this.setName( threadNamePrefix + '-' + id );
336 increasePoolSize( this );
337 }
338
339 public boolean lead()
340 {
341 final Object promotionLock = this.promotionLock;
342 synchronized( promotionLock )
343 {
344 if( dead )
345 {
346 return false;
347 }
348
349 leader = this;
350 promotionLock.notify();
351 }
352
353 return true;
354 }
355
356 public void run()
357 {
358 for( ;; )
359 {
360 if( !waitForPromotion() )
361 break;
362
363 SessionBuffer buf = fetchBuffer();
364 giveUpLead();
365
366 if( buf == null )
367 {
368 break;
369 }
370
371 processEvents( buf );
372 follow();
373 releaseBuffer( buf );
374 }
375
376 decreasePoolSize( this );
377 releaseThreadId( id );
378 }
379
380 private SessionBuffer fetchBuffer()
381 {
382 BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
383 synchronized( unfetchedSessionBuffers )
384 {
385 while( !shuttingDown )
386 {
387 try
388 {
389 unfetchedSessionBuffers.waitForNewItem();
390 }
391 catch( InterruptedException e )
392 {
393 continue;
394 }
395
396 return BaseThreadPool.this.fetchSessionBuffer( unfetchedSessionBuffers );
397 }
398 }
399
400 return null;
401 }
402
403 private void processEvents( SessionBuffer buf )
404 {
405 final Session session = buf.session;
406 final Queue eventQueue = buf.eventQueue;
407 for( ;; )
408 {
409 Event event;
410 synchronized( buf )
411 {
412 event = ( Event ) eventQueue.pop();
413 if( event == null )
414 break;
415 }
416 processEvent( event.getNextFilter(), session,
417 event.getType(), event.getData() );
418 }
419 }
420
421 private void follow()
422 {
423 final Object promotionLock = this.promotionLock;
424 final Stack followers = BaseThreadPool.this.followers;
425 synchronized( promotionLock )
426 {
427 if( this != leader )
428 {
429 synchronized( followers )
430 {
431 followers.push( this );
432 }
433 }
434 }
435 }
436
437 private void releaseBuffer( SessionBuffer buf )
438 {
439 final BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
440 final Set allSessionBuffers = BaseThreadPool.this.allSessionBuffers;
441 final Queue eventQueue = buf.eventQueue;
442
443 synchronized( unfetchedSessionBuffers )
444 {
445 if( eventQueue.isEmpty() )
446 {
447 allSessionBuffers.remove( buf );
448 removeSessionBuffer( buf );
449 }
450 else
451 {
452 unfetchedSessionBuffers.push( buf );
453 }
454 }
455 }
456
457 private boolean waitForPromotion()
458 {
459 final Object promotionLock = this.promotionLock;
460
461 final long startTime = System.currentTimeMillis();
462 long currentTime = startTime;
463
464 synchronized( promotionLock )
465 {
466 while( this != leader && !shuttingDown )
467 {
468
469 int keepAliveTime = getKeepAliveTime();
470 if( keepAliveTime > 0 )
471 {
472 keepAliveTime -= ( currentTime - startTime );
473 }
474 else
475 {
476 keepAliveTime = Integer.MAX_VALUE;
477 }
478
479
480 if( keepAliveTime <= 0 )
481 {
482 break;
483 }
484
485
486 try
487 {
488 promotionLock.wait( keepAliveTime );
489 }
490 catch( InterruptedException e )
491 {
492 }
493
494
495 currentTime = System.currentTimeMillis();
496 }
497
498 boolean timeToLead = this == leader && !shuttingDown;
499
500 if( !timeToLead )
501 {
502
503 synchronized( followers )
504 {
505 followers.remove( this );
506 }
507
508
509 dead = true;
510 }
511
512 return timeToLead;
513 }
514 }
515
516 private void giveUpLead()
517 {
518 final Stack followers = BaseThreadPool.this.followers;
519 Worker worker;
520 do
521 {
522 synchronized( followers )
523 {
524 worker = ( Worker ) followers.pop();
525 }
526
527 if( worker == null )
528 {
529
530
531 if( !shuttingDown
532 && getPoolSize() < getMaximumPoolSize() )
533 {
534 worker = new Worker();
535 worker.lead();
536 worker.start();
537 }
538
539
540
541
542 break;
543 }
544 }
545 while( !worker.lead() );
546 }
547 }
548 }