View Javadoc

1   /*
2    *   @(#) $Id: BaseThreadPool.java 264677 2005-08-30 02:44:35Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.util;
20  
21  import java.util.HashSet;
22  import java.util.IdentityHashMap;
23  import java.util.Iterator;
24  import java.util.Map;
25  import java.util.Set;
26  
27  import org.apache.mina.common.Session;
28  
29  /***
30   * A base implementation of Thread-pooling filters.
31   * This filter forwards events to its thread pool.  This is an implementation of
32   * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
33   * thread pool</a> by Douglas C. Schmidt et al.
34   * 
35   * @author Trustin Lee (trustin@apache.org)
36   * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $
37   */
38  public abstract class BaseThreadPool implements ThreadPool
39  {
/**
 * Default maximum size of the thread pool: {@link Integer#MAX_VALUE}
 * (about 2G), which in practice means "no limit".
 */
public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;

/**
 * Default keep-alive time of the thread pool, in milliseconds (1 minute).
 */
public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
49  
50      /***
51       * A queue which contains {@link Integer}s which represents reusable
52       * thread IDs.  {@link Worker} first checks this queue and then
53       * uses {@link #threadId} when no reusable thread ID is available.
54       */
55      private static final Queue threadIdReuseQueue = new Queue();
56      private static int threadId = 0;
57      
58      private static int acquireThreadId()
59      {
60          synchronized( threadIdReuseQueue )
61          {
62              Integer id = ( Integer ) threadIdReuseQueue.pop();
63              if( id == null )
64              {
65                  return ++ threadId;
66              }
67              else
68              {
69                  return id.intValue();
70              }
71          }
72      }
73      
74      private static void releaseThreadId( int id )
75      {
76          synchronized( threadIdReuseQueue )
77          {
78              threadIdReuseQueue.push( new Integer( id ) );
79          }
80      }
81  
82      private final String threadNamePrefix;
83      private final Map buffers = new IdentityHashMap();
84      private final Stack followers = new Stack();
85      private final BlockingSet readySessionBuffers = new BlockingSet();
86      private final Set busySessionBuffers = new HashSet();
87  
88      private Worker leader;
89  
90      private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
91      private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
92  
93      private boolean started;
94      private boolean shuttingDown;
95  
96      private int poolSize;
97      private final Object poolSizeLock = new Object();
98  
99      /***
100      * Creates a new instance with default thread pool settings.
101      * You'll have to invoke {@link #start()} method to start threads actually.
102      *
103      * @param threadNamePrefix the prefix of the thread names this pool will create.
104      */
105     public BaseThreadPool( String threadNamePrefix )
106     {
107         if( threadNamePrefix == null )
108         {
109             throw new NullPointerException( "threadNamePrefix" );
110         }
111         threadNamePrefix = threadNamePrefix.trim();
112         if( threadNamePrefix.length() == 0 )
113         {
114             throw new IllegalArgumentException( "threadNamePrefix is empty." );
115         }
116         this.threadNamePrefix = threadNamePrefix;
117     }
118     
119     public String getThreadNamePrefix()
120     {
121         return threadNamePrefix;
122     }
123 
124     public int getPoolSize()
125     {
126         synchronized( poolSizeLock )
127         {
128             return poolSize;
129         }
130     }
131 
132     public int getMaximumPoolSize()
133     {
134         return maximumPoolSize;
135     }
136 
137     public int getKeepAliveTime()
138     {
139         return keepAliveTime;
140     }
141 
142     public void setMaximumPoolSize( int maximumPoolSize )
143     {
144         if( maximumPoolSize <= 0 )
145             throw new IllegalArgumentException();
146         this.maximumPoolSize = maximumPoolSize;
147     }
148 
149     public void setKeepAliveTime( int keepAliveTime )
150     {
151         this.keepAliveTime = keepAliveTime;
152     }
153 
154     public synchronized void start()
155     {
156         if( started )
157             return;
158 
159         shuttingDown = false;
160 
161         leader = new Worker();
162         leader.start();
163         leader.lead();
164 
165         started = true;
166     }
167 
168     public synchronized void stop()
169     {
170         if( !started )
171             return;
172 
173         shuttingDown = true;
174         Worker lastLeader = null;
175         for( ;; )
176         {
177             Worker leader = this.leader;
178             if( lastLeader == leader )
179                 break;
180 
181             while( leader.isAlive() )
182             {
183                 leader.interrupt();
184                 try
185                 {
186                     leader.join();
187                 }
188                 catch( InterruptedException e )
189                 {
190                 }
191             }
192 
193             lastLeader = leader;
194         }
195 
196         started = false;
197     }
198 
199     private void increasePoolSize()
200     {
201         synchronized( poolSizeLock )
202         {
203             poolSize++;
204         }
205     }
206 
207     private void decreasePoolSize()
208     {
209         synchronized( poolSizeLock )
210         {
211             poolSize--;
212         }
213     }
214 
215     protected void fireEvent( Object nextFilter, Session session,
216                               EventType type, Object data )
217     {
218         final BlockingSet readySessionBuffers = this.readySessionBuffers;
219         final Set busySessionBuffers = this.busySessionBuffers;
220         final Event event = new Event( type, nextFilter, data );
221 
222         synchronized( readySessionBuffers )
223         {
224             final SessionBuffer buf = getSessionBuffer( session );
225             final Queue eventQueue = buf.eventQueue;
226 
227             synchronized( buf )
228             {
229                 eventQueue.push( event );
230             }
231 
232             if( !busySessionBuffers.contains( buf ) )
233             {
234                 busySessionBuffers.add( buf );
235                 readySessionBuffers.add( buf );
236             }
237         }
238     }
239 
/**
 * Implement this method to forward events to <tt>nextFilter</tt>.
 * It is invoked by the pool's worker threads, once per queued event,
 * with the session the event was queued for and the values obtained
 * from the queued event.
 *
 * @param nextFilter the filter the event is to be forwarded to
 * @param session    the session the event belongs to
 * @param type       the type of the event
 * @param data       the data that accompanies the event
 */
protected abstract void processEvent( Object nextFilter, Session session,
                                      EventType type, Object data );
245 
246     private SessionBuffer getSessionBuffer( Session session )
247     {
248         final Map buffers = this.buffers;
249         SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
250         if( buf == null )
251         {
252             synchronized( buffers )
253             {
254                 buf = ( SessionBuffer ) buffers.get( session );
255                 if( buf == null )
256                 {
257                     buf = new SessionBuffer( session );
258                     buffers.put( session, buf );
259                 }
260             }
261         }
262         return buf;
263     }
264 
265     private void removeSessionBuffer( SessionBuffer buf )
266     {
267         final Map buffers = this.buffers;
268         final Session session = buf.session;
269         synchronized( buffers )
270         {
271             buffers.remove( session );
272         }
273     }
274 
275     private static class SessionBuffer
276     {
277         private final Session session;
278 
279         private final Queue eventQueue = new Queue();
280 
281         private SessionBuffer( Session session )
282         {
283             this.session = session;
284         }
285     }
286 
287     private class Worker extends Thread
288     {
/** Thread ID used in this worker's thread name. */
private final int id;

/** Monitor this worker waits on for promotion; {@link #dead} is only touched while holding it. */
private final Object promotionLock = new Object();

/** Set once this worker has given up waiting for promotion. */
private boolean dead;

/**
 * Creates a worker, names its thread
 * <tt>&lt;threadNamePrefix&gt;-&lt;id&gt;</tt> and counts it in the pool
 * size.  The thread is not started here.
 */
private Worker()
{
    this.id = acquireThreadId();
    setName( threadNamePrefix + '-' + this.id );
    increasePoolSize();
}
300 
301         public boolean lead()
302         {
303             final Object promotionLock = this.promotionLock;
304             synchronized( promotionLock )
305             {
306                 if( dead )
307                 {
308                     return false;
309                 }
310 
311                 leader = this;
312                 promotionLock.notify();
313             }
314             
315             return true;
316         }
317 
318         public void run()
319         {
320             for( ;; )
321             {
322                 if( !waitForPromotion() )
323                     break;
324 
325                 SessionBuffer buf = fetchBuffer();
326                 giveUpLead();
327 
328                 if( buf == null )
329                 {
330                     break;
331                 }
332 
333                 processEvents( buf );
334                 follow();
335                 releaseBuffer( buf );
336             }
337 
338             decreasePoolSize();
339             releaseThreadId( id );
340         }
341 
342         private SessionBuffer fetchBuffer()
343         {
344             SessionBuffer buf;
345             BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
346             synchronized( readySessionBuffers )
347             {
348                 for( ;; )
349                 {
350                     try
351                     {
352                         readySessionBuffers.waitForNewItem();
353                     }
354                     catch( InterruptedException e )
355                     {
356                         if( shuttingDown )
357                         {
358                             return null;
359                         }
360                         else
361                         {
362                             continue;
363                         }
364                     }
365 
366                     Iterator it = readySessionBuffers.iterator();
367                     while( it.hasNext() )
368                     {
369                         buf = ( SessionBuffer ) it.next();
370                         it.remove();
371                         if( buf != null && !buf.eventQueue.isEmpty() )
372                         {
373                             return buf;
374                         }
375                     }
376                 }
377             }
378         }
379 
380         private void processEvents( SessionBuffer buf )
381         {
382             final Session session = buf.session;
383             final Queue eventQueue = buf.eventQueue;
384             for( ;; )
385             {
386                 Event event;
387                 synchronized( buf )
388                 {
389                     event = ( Event ) eventQueue.pop();
390                     if( event == null )
391                         break;
392                 }
393                 processEvent( event.getNextFilter(), session,
394                               event.getType(), event.getData() );
395             }
396         }
397 
398         private void follow()
399         {
400             final Object promotionLock = this.promotionLock;
401             final Stack followers = BaseThreadPool.this.followers;
402             synchronized( promotionLock )
403             {
404                 if( this != leader )
405                 {
406                     synchronized( followers )
407                     {
408                         followers.push( this );
409                     }
410                 }
411             }
412         }
413 
414         private void releaseBuffer( SessionBuffer buf )
415         {
416             final BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
417             final Set busySessionBuffers = BaseThreadPool.this.busySessionBuffers;
418             final Queue eventQueue = buf.eventQueue;
419 
420             synchronized( readySessionBuffers )
421             {
422                 if( eventQueue.isEmpty() )
423                 {
424                     busySessionBuffers.remove( buf );
425                     removeSessionBuffer( buf );
426                 }
427                 else
428                 {
429                     readySessionBuffers.add( buf );
430                 }
431             }
432         }
433 
434         private boolean waitForPromotion()
435         {
436             final Object promotionLock = this.promotionLock;
437 
438             final long startTime = System.currentTimeMillis();
439             long currentTime = startTime;
440             
441             synchronized( promotionLock )
442             {
443                 while( this != leader )
444                 {
445                     // Calculate remaining keep-alive time
446                     int keepAliveTime = getKeepAliveTime();
447                     if( keepAliveTime > 0 )
448                     {
449                         keepAliveTime -= ( currentTime - startTime );
450                     }
451                     else
452                     {
453                         keepAliveTime = Integer.MAX_VALUE;
454                     }
455                     
456                     // Break the loop if there's no remaining keep-alive time.
457                     if( keepAliveTime <= 0 )
458                     {
459                         break;
460                     }
461 
462                     // Wait for promotion
463                     try
464                     {
465                         promotionLock.wait( keepAliveTime );
466                     }
467                     catch( InterruptedException e )
468                     {
469                     }
470 
471                     // Update currentTime for the next iteration
472                     currentTime = System.currentTimeMillis();
473                 }
474 
475                 boolean timeToLead = this == leader;
476 
477                 if( !timeToLead )
478                 {
479                     // time to die
480                     synchronized( followers )
481                     {
482                         followers.remove( this );
483                     }
484 
485                     // Mark as dead explicitly when we've got promotionLock.
486                     dead = true;
487                 }
488 
489                 return timeToLead;
490             }
491         }
492 
493         private void giveUpLead()
494         {
495             final Stack followers = BaseThreadPool.this.followers;
496             Worker worker;
497             do
498             {
499                 synchronized( followers )
500                 {
501                     worker = ( Worker ) followers.pop();
502                 }
503 
504                 if( worker == null )
505                 {
506                     // Increase the number of threads if we
507                     // are not shutting down and we can increase the number.
508                     if( !shuttingDown
509                         && getPoolSize() < getMaximumPoolSize() )
510                     {
511                         worker = new Worker();
512                         worker.lead();
513                         worker.start();
514                     }
515 
516                     // This loop should end because:
517                     // 1) lead() is called already,
518                     // 2) or it is shutting down and there's no more threads left.
519                     break;
520                 }
521             }
522             while( !worker.lead() );
523         }
524     }
525 }