EMMA Coverage Report (generated Fri Oct 21 16:16:13 KST 2005)
[all classes][org.apache.mina.util]

COVERAGE SUMMARY FOR SOURCE FILE [BaseThreadPool.java]

name | class, % | method, % | block, % | line, %
BaseThreadPool.java | 75%  (3/4) | 83%  (25/30) | 81%  (636/784) | 85%  (184.5/218)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class BaseThreadPool | 100% (1/1) | 83%  (15/18) | 80%  (297/373) | 83%  (85.6/103)
getThreadNamePrefix (): String 0%   (0/1)0%   (0/3)0%   (0/1)
setKeepAliveTime (int): void 0%   (0/1)0%   (0/4)0%   (0/2)
setMaximumPoolSize (int): void 0%   (0/1)0%   (0/10)0%   (0/4)
getPoolSize (): int 100% (1/1)67%  (10/15)67%  (2/3)
releaseThreadId (int): void 100% (1/1)74%  (14/19)93%  (3.7/4)
decreasePoolSize (): void 100% (1/1)75%  (15/20)93%  (3.7/4)
increasePoolSize (): void 100% (1/1)75%  (15/20)93%  (3.7/4)
removeSessionBuffer (BaseThreadPool$SessionBuffer): void 100% (1/1)78%  (18/23)96%  (5.8/6)
BaseThreadPool (String): void 100% (1/1)82%  (45/55)87%  (13/15)
acquireThreadId (): int 100% (1/1)82%  (23/28)83%  (5/6)
fireEvent (Object, Session, EventType, Object): void 100% (1/1)83%  (49/59)95%  (13.2/14)
getSessionBuffer (Session): BaseThreadPool$SessionBuffer 100% (1/1)88%  (37/42)94%  (9.4/10)
stop (): void 100% (1/1)91%  (30/33)82%  (14/17)
start (): void 100% (1/1)96%  (24/25)88%  (7/8)
<static initializer> 100% (1/1)100% (7/7)100% (2/2)
fetchSessionBuffer (Queue): BaseThreadPool$SessionBuffer 100% (1/1)100% (4/4)100% (1/1)
getKeepAliveTime (): int 100% (1/1)100% (3/3)100% (1/1)
getMaximumPoolSize (): int 100% (1/1)100% (3/3)100% (1/1)
     
class BaseThreadPool$1 | 0%   (0/1) | 100% (0/0) | 100% (0/0) | 100% (0/0)
     
class BaseThreadPool$SessionBuffer | 100% (1/1) | 33%  (1/3) | 65%  (11/17) | 67%  (4/6)
getEventQueue (): Queue 0%   (0/1)0%   (0/3)0%   (0/1)
getSession (): Session 0%   (0/1)0%   (0/3)0%   (0/1)
BaseThreadPool$SessionBuffer (Session): void 100% (1/1)100% (11/11)100% (4/4)
     
class BaseThreadPool$Worker | 100% (1/1) | 100% (9/9) | 83%  (328/394) | 87%  (94.9/109)
lead (): boolean 100% (1/1)71%  (22/31)83%  (6.7/8)
follow (): void 100% (1/1)75%  (30/40)89%  (8/9)
fetchBuffer (): BaseThreadPool$SessionBuffer 100% (1/1)76%  (31/41)75%  (9/12)
releaseBuffer (BaseThreadPool$SessionBuffer): void 100% (1/1)79%  (31/39)85%  (9.4/11)
waitForPromotion (): boolean 100% (1/1)80%  (77/96)77%  (20.7/27)
processEvents (BaseThreadPool$SessionBuffer): void 100% (1/1)87%  (34/39)94%  (9.4/10)
giveUpLead (): void 100% (1/1)90%  (44/49)97%  (11.7/12)
BaseThreadPool$Worker (BaseThreadPool): void 100% (1/1)100% (31/31)100% (7/7)
run (): void 100% (1/1)100% (28/28)100% (13/13)

1/*
2 *   @(#) $Id: BaseThreadPool.java 327113 2005-10-21 06:59:15Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.util;
20 
21import java.util.HashSet;
22import java.util.IdentityHashMap;
23import java.util.Map;
24import java.util.Set;
25 
26import org.apache.mina.common.Session;
27 
28/**
29 * A base implementation of Thread-pooling filters.
30 * This filter forwards events to its thread pool.  This is an implementation of
31 * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
32 * thread pool</a> by Douglas C. Schmidt et al.
33 * 
34 * @author The Apache Directory Project (dev@directory.apache.org)
35 * @version $Rev: 327113 $, $Date: 2005-10-21 15:59:15 +0900 $
36 */
37public abstract class BaseThreadPool implements ThreadPool
38{
39    /**
40     * Default maximum size of thread pool (2G).
41     */
42    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
43 
44    /**
45     * Default keep-alive time of thread pool (1 min).
46     */
47    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
48 
49    /**
50     * A queue which contains {@link Integer}s which represents reusable
51     * thread IDs.  {@link Worker} first checks this queue and then
52     * uses {@link #threadId} when no reusable thread ID is available.
53     */
54    private static final Queue threadIdReuseQueue = new Queue();
55    private static int threadId = 0;
56    
57    private static int acquireThreadId()
58    {
59        synchronized( threadIdReuseQueue )
60        {
61            Integer id = ( Integer ) threadIdReuseQueue.pop();
62            if( id == null )
63            {
64                return ++ threadId;
65            }
66            else
67            {
68                return id.intValue();
69            }
70        }
71    }
72    
73    private static void releaseThreadId( int id )
74    {
75        synchronized( threadIdReuseQueue )
76        {
77            threadIdReuseQueue.push( new Integer( id ) );
78        }
79    }
80 
81    private final String threadNamePrefix;
82    private final Map buffers = new IdentityHashMap();
83    private final Stack followers = new Stack();
84    private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
85    private final Set allSessionBuffers = new HashSet();
86 
87    private Worker leader;
88 
89    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
90    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
91 
92    private boolean started;
93    private boolean shuttingDown;
94 
95    private int poolSize;
96    private final Object poolSizeLock = new Object();
97 
98    /**
99     * Creates a new instance with default thread pool settings.
100     * You'll have to invoke {@link #start()} method to start threads actually.
101     *
102     * @param threadNamePrefix the prefix of the thread names this pool will create.
103     */
104    public BaseThreadPool( String threadNamePrefix )
105    {
106        if( threadNamePrefix == null )
107        {
108            throw new NullPointerException( "threadNamePrefix" );
109        }
110        threadNamePrefix = threadNamePrefix.trim();
111        if( threadNamePrefix.length() == 0 )
112        {
113            throw new IllegalArgumentException( "threadNamePrefix is empty." );
114        }
115        this.threadNamePrefix = threadNamePrefix;
116    }
117    
118    public String getThreadNamePrefix()
119    {
120        return threadNamePrefix;
121    }
122 
123    public int getPoolSize()
124    {
125        synchronized( poolSizeLock )
126        {
127            return poolSize;
128        }
129    }
130 
131    public int getMaximumPoolSize()
132    {
133        return maximumPoolSize;
134    }
135 
136    public int getKeepAliveTime()
137    {
138        return keepAliveTime;
139    }
140 
141    public void setMaximumPoolSize( int maximumPoolSize )
142    {
143        if( maximumPoolSize <= 0 )
144            throw new IllegalArgumentException();
145        this.maximumPoolSize = maximumPoolSize;
146    }
147 
148    public void setKeepAliveTime( int keepAliveTime )
149    {
150        this.keepAliveTime = keepAliveTime;
151    }
152 
153    public synchronized void start()
154    {
155        if( started )
156            return;
157 
158        shuttingDown = false;
159 
160        leader = new Worker();
161        leader.start();
162        leader.lead();
163 
164        started = true;
165    }
166 
167    public synchronized void stop()
168    {
169        if( !started )
170            return;
171 
172        shuttingDown = true;
173        Worker lastLeader = null;
174        for( ;; )
175        {
176            Worker leader = this.leader;
177            if( lastLeader == leader )
178                break;
179 
180            while( leader.isAlive() )
181            {
182                leader.interrupt();
183                try
184                {
185                    leader.join();
186                }
187                catch( InterruptedException e )
188                {
189                }
190            }
191 
192            lastLeader = leader;
193        }
194 
195        started = false;
196    }
197 
198    private void increasePoolSize()
199    {
200        synchronized( poolSizeLock )
201        {
202            poolSize++;
203        }
204    }
205 
206    private void decreasePoolSize()
207    {
208        synchronized( poolSizeLock )
209        {
210            poolSize--;
211        }
212    }
213 
214    protected void fireEvent( Object nextFilter, Session session,
215                              EventType type, Object data )
216    {
217        final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
218        final Set allSessionBuffers = this.allSessionBuffers;
219        final Event event = new Event( type, nextFilter, data );
220 
221        synchronized( unfetchedSessionBuffers )
222        {
223            final SessionBuffer buf = getSessionBuffer( session );
224            final Queue eventQueue = buf.eventQueue;
225 
226            synchronized( buf )
227            {
228                eventQueue.push( event );
229            }
230 
231            if( !allSessionBuffers.contains( buf ) )
232            {
233                allSessionBuffers.add( buf );
234                unfetchedSessionBuffers.push( buf );
235            }
236        }
237    }
238 
239    /**
240     * Implement this method to forward events to <tt>nextFilter</tt>.
241     */
242    protected abstract void processEvent( Object nextFilter, Session session,
243                                          EventType type, Object data );
244 
245    
246    /**
247     * Implement this method to fetch (or pop) a {@link SessionBuffer} from
248     * the given <tt>unfetchedSessionBuffers</tt>.  The default implementation
249     * simply pops the buffer from it.  You could prioritize the fetch order.
250     * 
251     * @return A non-null {@link SessionBuffer}
252     */
253    protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
254    {
255        return ( SessionBuffer ) unfetchedSessionBuffers.pop();
256    }
257 
258    private SessionBuffer getSessionBuffer( Session session )
259    {
260        final Map buffers = this.buffers;
261        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
262        if( buf == null )
263        {
264            synchronized( buffers )
265            {
266                buf = ( SessionBuffer ) buffers.get( session );
267                if( buf == null )
268                {
269                    buf = new SessionBuffer( session );
270                    buffers.put( session, buf );
271                }
272            }
273        }
274        return buf;
275    }
276 
277    private void removeSessionBuffer( SessionBuffer buf )
278    {
279        final Map buffers = this.buffers;
280        final Session session = buf.session;
281        synchronized( buffers )
282        {
283            buffers.remove( session );
284        }
285    }
286 
287    protected static class SessionBuffer
288    {
289        private final Session session;
290 
291        private final Queue eventQueue = new Queue();
292 
293        private SessionBuffer( Session session )
294        {
295            this.session = session;
296        }
297        
298        public Session getSession()
299        {
300            return session;
301        }
302        
303        public Queue getEventQueue()
304        {
305            return eventQueue;
306        }
307    }
308 
309    private class Worker extends Thread
310    {
311        private final int id;
312        private final Object promotionLock = new Object();
313        private boolean dead;
314 
315        private Worker()
316        {
317            int id = acquireThreadId();
318            this.id = id;
319            this.setName( threadNamePrefix + '-' + id );
320            increasePoolSize();
321        }
322 
323        public boolean lead()
324        {
325            final Object promotionLock = this.promotionLock;
326            synchronized( promotionLock )
327            {
328                if( dead )
329                {
330                    return false;
331                }
332 
333                leader = this;
334                promotionLock.notify();
335            }
336            
337            return true;
338        }
339 
340        public void run()
341        {
342            for( ;; )
343            {
344                if( !waitForPromotion() )
345                    break;
346 
347                SessionBuffer buf = fetchBuffer();
348                giveUpLead();
349 
350                if( buf == null )
351                {
352                    break;
353                }
354 
355                processEvents( buf );
356                follow();
357                releaseBuffer( buf );
358            }
359 
360            decreasePoolSize();
361            releaseThreadId( id );
362        }
363 
364        private SessionBuffer fetchBuffer()
365        {
366            BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
367            synchronized( unfetchedSessionBuffers )
368            {
369                for( ;; )
370                {
371                    if( shuttingDown )
372                    {
373                        return null;
374                    }
375 
376                    try
377                    {
378                        unfetchedSessionBuffers.waitForNewItem();
379                    }
380                    catch( InterruptedException e )
381                    {
382                        if( shuttingDown )
383                        {
384                            return null;
385                        }
386                        else
387                        {
388                            continue;
389                        }
390                    }
391 
392                    return BaseThreadPool.this.fetchSessionBuffer( unfetchedSessionBuffers );
393                }
394            }
395        }
396 
397        private void processEvents( SessionBuffer buf )
398        {
399            final Session session = buf.session;
400            final Queue eventQueue = buf.eventQueue;
401            for( ;; )
402            {
403                Event event;
404                synchronized( buf )
405                {
406                    event = ( Event ) eventQueue.pop();
407                    if( event == null )
408                        break;
409                }
410                processEvent( event.getNextFilter(), session,
411                              event.getType(), event.getData() );
412            }
413        }
414 
415        private void follow()
416        {
417            final Object promotionLock = this.promotionLock;
418            final Stack followers = BaseThreadPool.this.followers;
419            synchronized( promotionLock )
420            {
421                if( this != leader )
422                {
423                    synchronized( followers )
424                    {
425                        followers.push( this );
426                    }
427                }
428            }
429        }
430 
431        private void releaseBuffer( SessionBuffer buf )
432        {
433            final BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
434            final Set allSessionBuffers = BaseThreadPool.this.allSessionBuffers;
435            final Queue eventQueue = buf.eventQueue;
436 
437            synchronized( unfetchedSessionBuffers )
438            {
439                if( eventQueue.isEmpty() )
440                {
441                    allSessionBuffers.remove( buf );
442                    removeSessionBuffer( buf );
443                }
444                else
445                {
446                    unfetchedSessionBuffers.push( buf );
447                }
448            }
449        }
450 
451        private boolean waitForPromotion()
452        {
453            final Object promotionLock = this.promotionLock;
454 
455            final long startTime = System.currentTimeMillis();
456            long currentTime = startTime;
457            
458            synchronized( promotionLock )
459            {
460                while( this != leader )
461                {
462                    // Calculate remaining keep-alive time
463                    int keepAliveTime = getKeepAliveTime();
464                    if( keepAliveTime > 0 )
465                    {
466                        keepAliveTime -= ( currentTime - startTime );
467                    }
468                    else
469                    {
470                        keepAliveTime = Integer.MAX_VALUE;
471                    }
472                    
473                    // Break the loop if there's no remaining keep-alive time.
474                    if( keepAliveTime <= 0 )
475                    {
476                        break;
477                    }
478 
479                    // Wait for promotion
480                    try
481                    {
482                        promotionLock.wait( keepAliveTime );
483                    }
484                    catch( InterruptedException e )
485                    {
486                        if( shuttingDown )
487                        {
488                            break;
489                        }
490                    }
491 
492                    // Update currentTime for the next iteration
493                    currentTime = System.currentTimeMillis();
494                }
495 
496                boolean timeToLead = this == leader && !shuttingDown;
497 
498                if( !timeToLead )
499                {
500                    // time to die
501                    synchronized( followers )
502                    {
503                        followers.remove( this );
504                    }
505 
506                    // Mark as dead explicitly when we've got promotionLock.
507                    dead = true;
508                }
509 
510                return timeToLead;
511            }
512        }
513 
514        private void giveUpLead()
515        {
516            final Stack followers = BaseThreadPool.this.followers;
517            Worker worker;
518            do
519            {
520                synchronized( followers )
521                {
522                    worker = ( Worker ) followers.pop();
523                }
524 
525                if( worker == null )
526                {
527                    // Increase the number of threads if we
528                    // are not shutting down and we can increase the number.
529                    if( !shuttingDown
530                        && getPoolSize() < getMaximumPoolSize() )
531                    {
532                        worker = new Worker();
533                        worker.lead();
534                        worker.start();
535                    }
536 
537                    // This loop should end because:
538                    // 1) lead() is called already,
539                    // 2) or it is shutting down and there's no more threads left.
540                    break;
541                }
542            }
543            while( !worker.lead() );
544        }
545    }
546}

[all classes][org.apache.mina.util]
EMMA 2.0.4217 (C) Vladimir Roubtsov