View Javadoc

1   /*
2    *   @(#) $Id: BaseThreadPool.java 327113 2005-10-21 06:59:15Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.util;
20  
21  import java.util.HashSet;
22  import java.util.IdentityHashMap;
23  import java.util.Map;
24  import java.util.Set;
25  
26  import org.apache.mina.common.Session;
27  
28  /***
29   * A base implementation of Thread-pooling filters.
30   * This filter forwards events to its thread pool.  This is an implementation of
31   * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
32   * thread pool</a> by Douglas C. Schmidt et al.
33   * 
34   * @author The Apache Directory Project (dev@directory.apache.org)
35   * @version $Rev: 327113 $, $Date: 2005-10-21 15:59:15 +0900 $
36   */
37  public abstract class BaseThreadPool implements ThreadPool
38  {
39      /***
40       * Default maximum size of thread pool (2G).
41       */
42      public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
43  
44      /***
45       * Default keep-alive time of thread pool (1 min).
46       */
47      public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
48  
49      /***
50       * A queue which contains {@link Integer}s which represents reusable
51       * thread IDs.  {@link Worker} first checks this queue and then
52       * uses {@link #threadId} when no reusable thread ID is available.
53       */
54      private static final Queue threadIdReuseQueue = new Queue();
55      private static int threadId = 0;
56      
57      private static int acquireThreadId()
58      {
59          synchronized( threadIdReuseQueue )
60          {
61              Integer id = ( Integer ) threadIdReuseQueue.pop();
62              if( id == null )
63              {
64                  return ++ threadId;
65              }
66              else
67              {
68                  return id.intValue();
69              }
70          }
71      }
72      
73      private static void releaseThreadId( int id )
74      {
75          synchronized( threadIdReuseQueue )
76          {
77              threadIdReuseQueue.push( new Integer( id ) );
78          }
79      }
80  
81      private final String threadNamePrefix;
82      private final Map buffers = new IdentityHashMap();
83      private final Stack followers = new Stack();
84      private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
85      private final Set allSessionBuffers = new HashSet();
86  
87      private Worker leader;
88  
89      private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
90      private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
91  
92      private boolean started;
93      private boolean shuttingDown;
94  
95      private int poolSize;
96      private final Object poolSizeLock = new Object();
97  
98      /***
99       * Creates a new instance with default thread pool settings.
100      * You'll have to invoke {@link #start()} method to start threads actually.
101      *
102      * @param threadNamePrefix the prefix of the thread names this pool will create.
103      */
104     public BaseThreadPool( String threadNamePrefix )
105     {
106         if( threadNamePrefix == null )
107         {
108             throw new NullPointerException( "threadNamePrefix" );
109         }
110         threadNamePrefix = threadNamePrefix.trim();
111         if( threadNamePrefix.length() == 0 )
112         {
113             throw new IllegalArgumentException( "threadNamePrefix is empty." );
114         }
115         this.threadNamePrefix = threadNamePrefix;
116     }
117     
118     public String getThreadNamePrefix()
119     {
120         return threadNamePrefix;
121     }
122 
123     public int getPoolSize()
124     {
125         synchronized( poolSizeLock )
126         {
127             return poolSize;
128         }
129     }
130 
131     public int getMaximumPoolSize()
132     {
133         return maximumPoolSize;
134     }
135 
136     public int getKeepAliveTime()
137     {
138         return keepAliveTime;
139     }
140 
141     public void setMaximumPoolSize( int maximumPoolSize )
142     {
143         if( maximumPoolSize <= 0 )
144             throw new IllegalArgumentException();
145         this.maximumPoolSize = maximumPoolSize;
146     }
147 
148     public void setKeepAliveTime( int keepAliveTime )
149     {
150         this.keepAliveTime = keepAliveTime;
151     }
152 
153     public synchronized void start()
154     {
155         if( started )
156             return;
157 
158         shuttingDown = false;
159 
160         leader = new Worker();
161         leader.start();
162         leader.lead();
163 
164         started = true;
165     }
166 
167     public synchronized void stop()
168     {
169         if( !started )
170             return;
171 
172         shuttingDown = true;
173         Worker lastLeader = null;
174         for( ;; )
175         {
176             Worker leader = this.leader;
177             if( lastLeader == leader )
178                 break;
179 
180             while( leader.isAlive() )
181             {
182                 leader.interrupt();
183                 try
184                 {
185                     leader.join();
186                 }
187                 catch( InterruptedException e )
188                 {
189                 }
190             }
191 
192             lastLeader = leader;
193         }
194 
195         started = false;
196     }
197 
198     private void increasePoolSize()
199     {
200         synchronized( poolSizeLock )
201         {
202             poolSize++;
203         }
204     }
205 
206     private void decreasePoolSize()
207     {
208         synchronized( poolSizeLock )
209         {
210             poolSize--;
211         }
212     }
213 
214     protected void fireEvent( Object nextFilter, Session session,
215                               EventType type, Object data )
216     {
217         final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
218         final Set allSessionBuffers = this.allSessionBuffers;
219         final Event event = new Event( type, nextFilter, data );
220 
221         synchronized( unfetchedSessionBuffers )
222         {
223             final SessionBuffer buf = getSessionBuffer( session );
224             final Queue eventQueue = buf.eventQueue;
225 
226             synchronized( buf )
227             {
228                 eventQueue.push( event );
229             }
230 
231             if( !allSessionBuffers.contains( buf ) )
232             {
233                 allSessionBuffers.add( buf );
234                 unfetchedSessionBuffers.push( buf );
235             }
236         }
237     }
238 
239     /***
240      * Implement this method to forward events to <tt>nextFilter</tt>.
241      */
242     protected abstract void processEvent( Object nextFilter, Session session,
243                                           EventType type, Object data );
244 
245     
246     /***
247      * Implement this method to fetch (or pop) a {@link SessionBuffer} from
248      * the given <tt>unfetchedSessionBuffers</tt>.  The default implementation
249      * simply pops the buffer from it.  You could prioritize the fetch order.
250      * 
251      * @return A non-null {@link SessionBuffer}
252      */
253     protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
254     {
255         return ( SessionBuffer ) unfetchedSessionBuffers.pop();
256     }
257 
258     private SessionBuffer getSessionBuffer( Session session )
259     {
260         final Map buffers = this.buffers;
261         SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
262         if( buf == null )
263         {
264             synchronized( buffers )
265             {
266                 buf = ( SessionBuffer ) buffers.get( session );
267                 if( buf == null )
268                 {
269                     buf = new SessionBuffer( session );
270                     buffers.put( session, buf );
271                 }
272             }
273         }
274         return buf;
275     }
276 
277     private void removeSessionBuffer( SessionBuffer buf )
278     {
279         final Map buffers = this.buffers;
280         final Session session = buf.session;
281         synchronized( buffers )
282         {
283             buffers.remove( session );
284         }
285     }
286 
287     protected static class SessionBuffer
288     {
289         private final Session session;
290 
291         private final Queue eventQueue = new Queue();
292 
293         private SessionBuffer( Session session )
294         {
295             this.session = session;
296         }
297         
298         public Session getSession()
299         {
300             return session;
301         }
302         
303         public Queue getEventQueue()
304         {
305             return eventQueue;
306         }
307     }
308 
309     private class Worker extends Thread
310     {
311         private final int id;
312         private final Object promotionLock = new Object();
313         private boolean dead;
314 
315         private Worker()
316         {
317             int id = acquireThreadId();
318             this.id = id;
319             this.setName( threadNamePrefix + '-' + id );
320             increasePoolSize();
321         }
322 
323         public boolean lead()
324         {
325             final Object promotionLock = this.promotionLock;
326             synchronized( promotionLock )
327             {
328                 if( dead )
329                 {
330                     return false;
331                 }
332 
333                 leader = this;
334                 promotionLock.notify();
335             }
336             
337             return true;
338         }
339 
340         public void run()
341         {
342             for( ;; )
343             {
344                 if( !waitForPromotion() )
345                     break;
346 
347                 SessionBuffer buf = fetchBuffer();
348                 giveUpLead();
349 
350                 if( buf == null )
351                 {
352                     break;
353                 }
354 
355                 processEvents( buf );
356                 follow();
357                 releaseBuffer( buf );
358             }
359 
360             decreasePoolSize();
361             releaseThreadId( id );
362         }
363 
364         private SessionBuffer fetchBuffer()
365         {
366             BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
367             synchronized( unfetchedSessionBuffers )
368             {
369                 for( ;; )
370                 {
371                     if( shuttingDown )
372                     {
373                         return null;
374                     }
375 
376                     try
377                     {
378                         unfetchedSessionBuffers.waitForNewItem();
379                     }
380                     catch( InterruptedException e )
381                     {
382                         if( shuttingDown )
383                         {
384                             return null;
385                         }
386                         else
387                         {
388                             continue;
389                         }
390                     }
391 
392                     return BaseThreadPool.this.fetchSessionBuffer( unfetchedSessionBuffers );
393                 }
394             }
395         }
396 
397         private void processEvents( SessionBuffer buf )
398         {
399             final Session session = buf.session;
400             final Queue eventQueue = buf.eventQueue;
401             for( ;; )
402             {
403                 Event event;
404                 synchronized( buf )
405                 {
406                     event = ( Event ) eventQueue.pop();
407                     if( event == null )
408                         break;
409                 }
410                 processEvent( event.getNextFilter(), session,
411                               event.getType(), event.getData() );
412             }
413         }
414 
415         private void follow()
416         {
417             final Object promotionLock = this.promotionLock;
418             final Stack followers = BaseThreadPool.this.followers;
419             synchronized( promotionLock )
420             {
421                 if( this != leader )
422                 {
423                     synchronized( followers )
424                     {
425                         followers.push( this );
426                     }
427                 }
428             }
429         }
430 
431         private void releaseBuffer( SessionBuffer buf )
432         {
433             final BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
434             final Set allSessionBuffers = BaseThreadPool.this.allSessionBuffers;
435             final Queue eventQueue = buf.eventQueue;
436 
437             synchronized( unfetchedSessionBuffers )
438             {
439                 if( eventQueue.isEmpty() )
440                 {
441                     allSessionBuffers.remove( buf );
442                     removeSessionBuffer( buf );
443                 }
444                 else
445                 {
446                     unfetchedSessionBuffers.push( buf );
447                 }
448             }
449         }
450 
451         private boolean waitForPromotion()
452         {
453             final Object promotionLock = this.promotionLock;
454 
455             final long startTime = System.currentTimeMillis();
456             long currentTime = startTime;
457             
458             synchronized( promotionLock )
459             {
460                 while( this != leader )
461                 {
462                     // Calculate remaining keep-alive time
463                     int keepAliveTime = getKeepAliveTime();
464                     if( keepAliveTime > 0 )
465                     {
466                         keepAliveTime -= ( currentTime - startTime );
467                     }
468                     else
469                     {
470                         keepAliveTime = Integer.MAX_VALUE;
471                     }
472                     
473                     // Break the loop if there's no remaining keep-alive time.
474                     if( keepAliveTime <= 0 )
475                     {
476                         break;
477                     }
478 
479                     // Wait for promotion
480                     try
481                     {
482                         promotionLock.wait( keepAliveTime );
483                     }
484                     catch( InterruptedException e )
485                     {
486                         if( shuttingDown )
487                         {
488                             break;
489                         }
490                     }
491 
492                     // Update currentTime for the next iteration
493                     currentTime = System.currentTimeMillis();
494                 }
495 
496                 boolean timeToLead = this == leader && !shuttingDown;
497 
498                 if( !timeToLead )
499                 {
500                     // time to die
501                     synchronized( followers )
502                     {
503                         followers.remove( this );
504                     }
505 
506                     // Mark as dead explicitly when we've got promotionLock.
507                     dead = true;
508                 }
509 
510                 return timeToLead;
511             }
512         }
513 
514         private void giveUpLead()
515         {
516             final Stack followers = BaseThreadPool.this.followers;
517             Worker worker;
518             do
519             {
520                 synchronized( followers )
521                 {
522                     worker = ( Worker ) followers.pop();
523                 }
524 
525                 if( worker == null )
526                 {
527                     // Increase the number of threads if we
528                     // are not shutting down and we can increase the number.
529                     if( !shuttingDown
530                         && getPoolSize() < getMaximumPoolSize() )
531                     {
532                         worker = new Worker();
533                         worker.lead();
534                         worker.start();
535                     }
536 
537                     // This loop should end because:
538                     // 1) lead() is called already,
539                     // 2) or it is shutting down and there's no more threads left.
540                     break;
541                 }
542             }
543             while( !worker.lead() );
544         }
545     }
546 }