View Javadoc

1   /*
2    *   @(#) $Id: BaseThreadPool.java 357871 2005-12-20 01:56:40Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.util;
20  
21  import java.util.ArrayList;
22  import java.util.IdentityHashMap;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  
28  import org.apache.mina.common.Session;
29  
30  /***
31   * A base implementation of Thread-pooling filters.
32   * This filter forwards events to its thread pool.  This is an implementation of
33   * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
34   * thread pool</a> by Douglas C. Schmidt et al.
35   * 
36   * @author The Apache Directory Project (dev@directory.apache.org)
37   * @version $Rev: 357871 $, $Date: 2005-12-20 10:56:40 +0900 (Tue, 20 Dec 2005) $
38   */
39  public abstract class BaseThreadPool implements ThreadPool
40  {
41      /***
42       * Default maximum size of thread pool: {@link Integer#MAX_VALUE} (about 2G).
        * This is the value {@link #getMaximumPoolSize()} returns until
        * {@link #setMaximumPoolSize(int)} is called.
43       */
44      public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
45  
46      /***
47       * Default keep-alive time of thread pool in milliseconds (1 min).
        * This is the value {@link #getKeepAliveTime()} returns until
        * {@link #setKeepAliveTime(int)} is called.
48       */
49      public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
50  
51      /***
52       * A queue which contains {@link Integer}s which represent reusable
53       * thread IDs.  {@link Worker} first checks this queue and then
54       * uses {@link #threadId} when no reusable thread ID is available.
        * Both this queue and {@link #threadId} are only accessed while
        * holding this queue's monitor.
55       */
56      private static final Queue threadIdReuseQueue = new Queue();
        // Most recently issued fresh ID; pre-incremented on use, so the first ID handed out is 1.
57      private static int threadId = 0;
58      
59      private static int acquireThreadId()
60      {
61          synchronized( threadIdReuseQueue )
62          {
63              Integer id = ( Integer ) threadIdReuseQueue.pop();
64              if( id == null )
65              {
66                  return ++ threadId;
67              }
68              else
69              {
70                  return id.intValue();
71              }
72          }
73      }
74      
75      private static void releaseThreadId( int id )
76      {
77          synchronized( threadIdReuseQueue )
78          {
79              threadIdReuseQueue.push( new Integer( id ) );
80          }
81      }
82  
83      private final String threadNamePrefix;
84      private final Map buffers = new IdentityHashMap();
85      private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
86      private final Set allSessionBuffers = new IdentityHashSet();
87  
88      private Worker leader;
89      private final Stack followers = new Stack();
90      private final Set allWorkers = new IdentityHashSet();
91  
92      private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
93      private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
94  
95      private boolean started;
96      private boolean shuttingDown;
97  
98      private int poolSize;
99      private final Object poolSizeLock = new Object();
100 
101     /***
102      * Creates a new instance with default thread pool settings.
103      * You'll have to invoke {@link #start()} method to start threads actually.
104      *
105      * @param threadNamePrefix the prefix of the thread names this pool will create.
106      */
107     public BaseThreadPool( String threadNamePrefix )
108     {
109         if( threadNamePrefix == null )
110         {
111             throw new NullPointerException( "threadNamePrefix" );
112         }
113         threadNamePrefix = threadNamePrefix.trim();
114         if( threadNamePrefix.length() == 0 )
115         {
116             throw new IllegalArgumentException( "threadNamePrefix is empty." );
117         }
118         this.threadNamePrefix = threadNamePrefix;
119     }
120     
121     public String getThreadNamePrefix()
122     {
123         return threadNamePrefix;
124     }
125 
126     public int getPoolSize()
127     {
128         synchronized( poolSizeLock )
129         {
130             return poolSize;
131         }
132     }
133 
134     public int getMaximumPoolSize()
135     {
136         return maximumPoolSize;
137     }
138 
139     public int getKeepAliveTime()
140     {
141         return keepAliveTime;
142     }
143 
144     public void setMaximumPoolSize( int maximumPoolSize )
145     {
146         if( maximumPoolSize <= 0 )
147             throw new IllegalArgumentException();
148         this.maximumPoolSize = maximumPoolSize;
149     }
150 
151     public void setKeepAliveTime( int keepAliveTime )
152     {
153         this.keepAliveTime = keepAliveTime;
154     }
155 
156     public synchronized void start()
157     {
158         if( started )
159             return;
160 
161         shuttingDown = false;
162 
163         leader = new Worker();
164         leader.start();
165         leader.lead();
166 
167         started = true;
168     }
169 
170     public synchronized void stop()
171     {
172         if( !started )
173             return;
174 
175         shuttingDown = true;
176         while( getPoolSize() != 0 )
177         {
178             List allWorkers;
179             synchronized( poolSizeLock )
180             {
181                 allWorkers = new ArrayList( this.allWorkers );
182             }
183             
184             for( Iterator i = allWorkers.iterator(); i.hasNext(); )
185             {
186                 Worker worker = ( Worker ) i.next();
187                 while( worker.isAlive() )
188                 {
189                     worker.interrupt();
190                     try
191                     {
192                         // This timeout will help us from 
193                         // infinite lock-up and interrupt workers again.
194                         worker.join( 100 ); 
195                     }
196                     catch( InterruptedException e )
197                     {
198                     }
199                 }
200             }
201         }
202 
203         this.allSessionBuffers.clear();
204         this.unfetchedSessionBuffers.clear();
205         this.buffers.clear();
206         this.followers.clear();
207         this.leader = null;
208 
209         started = false;
210     }
211 
212     private void increasePoolSize( Worker worker )
213     {
214         synchronized( poolSizeLock )
215         {
216             poolSize++;
217             allWorkers.add( worker );
218         }
219     }
220 
221     private void decreasePoolSize( Worker worker )
222     {
223         synchronized( poolSizeLock )
224         {
225             poolSize--;
226             allWorkers.remove( worker );
227         }
228     }
229 
230     protected void fireEvent( Object nextFilter, Session session,
231                               EventType type, Object data )
232     {
233         final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
234         final Set allSessionBuffers = this.allSessionBuffers;
235         final Event event = new Event( type, nextFilter, data );
236 
237         synchronized( unfetchedSessionBuffers )
238         {
239             final SessionBuffer buf = getSessionBuffer( session );
240             final Queue eventQueue = buf.eventQueue;
241 
242             synchronized( buf )
243             {
244                 eventQueue.push( event );
245             }
246 
247             if( !allSessionBuffers.contains( buf ) )
248             {
249                 allSessionBuffers.add( buf );
250                 unfetchedSessionBuffers.push( buf );
251             }
252         }
253     }
254 
255     /***
256      * Implement this method to forward events to <tt>nextFilter</tt>.
         * A worker thread invokes it once for every event queued through
         * {@link #fireEvent(Object, Session, EventType, Object)}, passing the
         * values stored in that event, and does so without holding the lock
         * of the session's buffer.
257      */
258     protected abstract void processEvent( Object nextFilter, Session session,
259                                           EventType type, Object data );
260 
261     
262     /***
263      * Implement this method to fetch (or pop) a {@link SessionBuffer} from
264      * the given <tt>unfetchedSessionBuffers</tt>.  The default implementation
265      * simply pops the buffer from it.  You could prioritize the fetch order.
266      * 
267      * @return A non-null {@link SessionBuffer}
268      */
269     protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
270     {
271         return ( SessionBuffer ) unfetchedSessionBuffers.pop();
272     }
273 
274     private SessionBuffer getSessionBuffer( Session session )
275     {
276         final Map buffers = this.buffers;
277         SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
278         if( buf == null )
279         {
280             synchronized( buffers )
281             {
282                 buf = ( SessionBuffer ) buffers.get( session );
283                 if( buf == null )
284                 {
285                     buf = new SessionBuffer( session );
286                     buffers.put( session, buf );
287                 }
288             }
289         }
290         return buf;
291     }
292 
293     private void removeSessionBuffer( SessionBuffer buf )
294     {
295         final Map buffers = this.buffers;
296         final Session session = buf.session;
297         synchronized( buffers )
298         {
299             buffers.remove( session );
300         }
301     }
302 
303     protected static class SessionBuffer
304     {
305         private final Session session;
306 
307         private final Queue eventQueue = new Queue();
308 
309         private SessionBuffer( Session session )
310         {
311             this.session = session;
312         }
313         
314         public Session getSession()
315         {
316             return session;
317         }
318         
319         public Queue getEventQueue()
320         {
321             return eventQueue;
322         }
323     }
324 
325     private class Worker extends Thread
326     {
327         private final int id; // obtained from acquireThreadId(); part of the thread name, handed back via releaseThreadId() when run() ends
328         private final Object promotionLock = new Object(); // monitor this worker waits on in waitForPromotion() and that lead() notifies
329         private boolean dead; // set under promotionLock once this worker has decided to exit; lead() then refuses the promotion
330 
/**
 * Creates a worker: acquires a thread ID, names the thread
 * <tt>&lt;prefix&gt;-&lt;id&gt;</tt> and registers it with the pool.
 * The thread itself is not started here.
 */
private Worker()
{
    this.id = acquireThreadId();
    setName( threadNamePrefix + '-' + this.id );
    increasePoolSize( this );
}
338 
339         public boolean lead()
340         {
341             final Object promotionLock = this.promotionLock;
342             synchronized( promotionLock )
343             {
344                 if( dead )
345                 {
346                     return false;
347                 }
348 
349                 leader = this;
350                 promotionLock.notify();
351             }
352             
353             return true;
354         }
355 
356         public void run()
357         {
358             for( ;; )
359             {
360                 if( !waitForPromotion() )
361                     break;
362 
363                 SessionBuffer buf = fetchBuffer();
364                 giveUpLead();
365 
366                 if( buf == null )
367                 {
368                     break;
369                 }
370 
371                 processEvents( buf );
372                 follow();
373                 releaseBuffer( buf );
374             }
375 
376             decreasePoolSize( this );
377             releaseThreadId( id );
378         }
379 
380         private SessionBuffer fetchBuffer()
381         {
382             BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
383             synchronized( unfetchedSessionBuffers )
384             {
385                 while( !shuttingDown )
386                 {
387                     try
388                     {
389                         unfetchedSessionBuffers.waitForNewItem();
390                     }
391                     catch( InterruptedException e )
392                     {
393                         continue;
394                     }
395 
396                     return BaseThreadPool.this.fetchSessionBuffer( unfetchedSessionBuffers );
397                 }
398             }
399             
400             return null;
401         }
402 
403         private void processEvents( SessionBuffer buf )
404         {
405             final Session session = buf.session;
406             final Queue eventQueue = buf.eventQueue;
407             for( ;; )
408             {
409                 Event event;
410                 synchronized( buf )
411                 {
412                     event = ( Event ) eventQueue.pop();
413                     if( event == null )
414                         break;
415                 }
416                 processEvent( event.getNextFilter(), session,
417                               event.getType(), event.getData() );
418             }
419         }
420 
421         private void follow()
422         {
423             final Object promotionLock = this.promotionLock;
424             final Stack followers = BaseThreadPool.this.followers;
425             synchronized( promotionLock )
426             {
427                 if( this != leader )
428                 {
429                     synchronized( followers )
430                     {
431                         followers.push( this );
432                     }
433                 }
434             }
435         }
436 
437         private void releaseBuffer( SessionBuffer buf )
438         {
439             final BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
440             final Set allSessionBuffers = BaseThreadPool.this.allSessionBuffers;
441             final Queue eventQueue = buf.eventQueue;
442 
443             synchronized( unfetchedSessionBuffers )
444             {
445                 if( eventQueue.isEmpty() )
446                 {
447                     allSessionBuffers.remove( buf );
448                     removeSessionBuffer( buf );
449                 }
450                 else
451                 {
452                     unfetchedSessionBuffers.push( buf );
453                 }
454             }
455         }
456 
457         private boolean waitForPromotion()
458         {
459             final Object promotionLock = this.promotionLock;
460 
461             final long startTime = System.currentTimeMillis();
462             long currentTime = startTime;
463             
464             synchronized( promotionLock )
465             {
466                 while( this != leader && !shuttingDown )
467                 {
468                     // Calculate remaining keep-alive time
469                     int keepAliveTime = getKeepAliveTime();
470                     if( keepAliveTime > 0 )
471                     {
472                         keepAliveTime -= ( currentTime - startTime );
473                     }
474                     else
475                     {
476                         keepAliveTime = Integer.MAX_VALUE;
477                     }
478                     
479                     // Break the loop if there's no remaining keep-alive time.
480                     if( keepAliveTime <= 0 )
481                     {
482                         break;
483                     }
484 
485                     // Wait for promotion
486                     try
487                     {
488                         promotionLock.wait( keepAliveTime );
489                     }
490                     catch( InterruptedException e )
491                     {
492                     }
493 
494                     // Update currentTime for the next iteration
495                     currentTime = System.currentTimeMillis();
496                 }
497 
498                 boolean timeToLead = this == leader && !shuttingDown;
499 
500                 if( !timeToLead )
501                 {
502                     // time to die
503                     synchronized( followers )
504                     {
505                         followers.remove( this );
506                     }
507 
508                     // Mark as dead explicitly when we've got promotionLock.
509                     dead = true;
510                 }
511 
512                 return timeToLead;
513             }
514         }
515 
516         private void giveUpLead()
517         {
518             final Stack followers = BaseThreadPool.this.followers;
519             Worker worker;
520             do
521             {
522                 synchronized( followers )
523                 {
524                     worker = ( Worker ) followers.pop();
525                 }
526 
527                 if( worker == null )
528                 {
529                     // Increase the number of threads if we
530                     // are not shutting down and we can increase the number.
531                     if( !shuttingDown
532                         && getPoolSize() < getMaximumPoolSize() )
533                     {
534                         worker = new Worker();
535                         worker.lead();
536                         worker.start();
537                     }
538 
539                     // This loop should end because:
540                     // 1) lead() is called already,
541                     // 2) or it is shutting down and there's no more threads left.
542                     break;
543                 }
544             }
545             while( !worker.lead() );
546         }
547     }
548 }