EMMA Coverage Report (generated Mon Jul 11 13:15:38 KST 2005)
[all classes][org.apache.mina.util]

COVERAGE SUMMARY FOR SOURCE FILE [BaseThreadPool.java]

nameclass, %method, %block, %line, %
BaseThreadPool.java75%  (3/4)92%  (22/24)81%  (550/680)86%  (164.7/191)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class BaseThreadPool100% (1/1)86%  (12/14)82%  (241/294)84%  (70.9/84)
setKeepAliveTime (int): void 0%   (0/1)0%   (0/4)0%   (0/2)
setMaximumPoolSize (int): void 0%   (0/1)0%   (0/10)0%   (0/4)
getPoolSize (): int 100% (1/1)67%  (10/15)67%  (2/3)
decreasePoolSize (): void 100% (1/1)75%  (15/20)93%  (3.7/4)
increasePoolSize (): void 100% (1/1)75%  (15/20)93%  (3.7/4)
removeSessionBuffer (BaseThreadPool$SessionBuffer): void 100% (1/1)78%  (18/23)96%  (5.8/6)
fireEvent (Object, Session, EventType, Object): void 100% (1/1)83%  (50/60)95%  (13.2/14)
getSessionBuffer (Session): BaseThreadPool$SessionBuffer 100% (1/1)88%  (37/42)94%  (9.4/10)
stop (): void 100% (1/1)91%  (30/33)82%  (14/17)
start (): void 100% (1/1)96%  (23/24)88%  (7/8)
<static initializer> 100% (1/1)100% (3/3)100% (1/1)
BaseThreadPool (): void 100% (1/1)100% (34/34)100% (9/9)
getKeepAliveTime (): int 100% (1/1)100% (3/3)100% (1/1)
getMaximumPoolSize (): int 100% (1/1)100% (3/3)100% (1/1)
     
class BaseThreadPool$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class BaseThreadPool$Worker100% (1/1)100% (9/9)79%  (298/375)87%  (89.8/103)
waitForPromotion (): boolean 100% (1/1)58%  (38/66)64%  (10.9/17)
follow (): void 100% (1/1)75%  (30/40)89%  (8/9)
releaseBuffer (BaseThreadPool$SessionBuffer): void 100% (1/1)78%  (31/40)85%  (9.4/11)
lead (): void 100% (1/1)78%  (18/23)96%  (5.8/6)
fetchBuffer (): BaseThreadPool$SessionBuffer 100% (1/1)84%  (48/57)90%  (16.2/18)
giveUpLead (): void 100% (1/1)84%  (54/64)94%  (14.1/15)
processEvents (BaseThreadPool$SessionBuffer): void 100% (1/1)87%  (34/39)94%  (9.4/10)
run (): void 100% (1/1)96%  (24/25)92%  (11/12)
BaseThreadPool$Worker (BaseThreadPool): void 100% (1/1)100% (21/21)100% (5/5)
     
class BaseThreadPool$SessionBuffer100% (1/1)100% (1/1)100% (11/11)100% (4/4)
BaseThreadPool$SessionBuffer (Session): void 100% (1/1)100% (11/11)100% (4/4)

1/*
2 *   @(#) $Id: BaseThreadPool.java 210062 2005-07-11 03:52:38Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.util;
20 
21import java.util.HashSet;
22import java.util.IdentityHashMap;
23import java.util.Iterator;
24import java.util.Map;
25import java.util.Set;
26 
27import org.apache.mina.common.Session;
28 
29/**
30 * A base implementation of Thread-pooling filters.
31 * This filter forwards events to its thread pool.  This is an implementation of
32 * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
33 * thread pool</a> by Douglas C. Schmidt et al.
34 * 
35 * @author Trustin Lee (trustin@apache.org)
36 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
37 */
38public abstract class BaseThreadPool implements ThreadPool
39{
40    /**
41     * Default maximum size of thread pool (2G).
42     */
43    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
44 
45    /**
46     * Default keep-alive time of thread pool (1 min).
47     */
48    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
49 
50    private static volatile int threadId = 0;
51 
52    private final Map buffers = new IdentityHashMap();
53 
54    private final Stack followers = new Stack();
55 
56    private final BlockingSet readySessionBuffers = new BlockingSet();
57 
58    private final Set busySessionBuffers = new HashSet();
59 
60    private Worker leader;
61 
62    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
63 
64    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
65 
66    private boolean started;
67 
68    private boolean shuttingDown;
69 
70    private int poolSize;
71 
72    private final Object poolSizeLock = new Object();
73 
74    /**
75     * Creates a new instance with default thread pool settings.
76     * You'll have to invoke {@link #start()} method to start threads actually.
77     */
78    protected BaseThreadPool()
79    {
80    }
81 
82    public int getPoolSize()
83    {
84        synchronized( poolSizeLock )
85        {
86            return poolSize;
87        }
88    }
89 
90    public int getMaximumPoolSize()
91    {
92        return maximumPoolSize;
93    }
94 
95    public int getKeepAliveTime()
96    {
97        return keepAliveTime;
98    }
99 
100    public void setMaximumPoolSize( int maximumPoolSize )
101    {
102        if( maximumPoolSize <= 0 )
103            throw new IllegalArgumentException();
104        this.maximumPoolSize = maximumPoolSize;
105    }
106 
107    public void setKeepAliveTime( int keepAliveTime )
108    {
109        this.keepAliveTime = keepAliveTime;
110    }
111 
112    public synchronized void start()
113    {
114        if( started )
115            return;
116 
117        shuttingDown = false;
118 
119        leader = new Worker();
120        leader.start();
121        leader.lead();
122 
123        started = true;
124    }
125 
126    public synchronized void stop()
127    {
128        if( !started )
129            return;
130 
131        shuttingDown = true;
132        Worker lastLeader = null;
133        for( ;; )
134        {
135            Worker leader = this.leader;
136            if( lastLeader == leader )
137                break;
138 
139            while( leader.isAlive() )
140            {
141                leader.interrupt();
142                try
143                {
144                    leader.join();
145                }
146                catch( InterruptedException e )
147                {
148                }
149            }
150 
151            lastLeader = leader;
152        }
153 
154        started = false;
155    }
156 
157    private void increasePoolSize()
158    {
159        synchronized( poolSizeLock )
160        {
161            poolSize++;
162        }
163    }
164 
165    private void decreasePoolSize()
166    {
167        synchronized( poolSizeLock )
168        {
169            poolSize--;
170        }
171    }
172 
173    protected void fireEvent( Object nextFilter, Session session,
174                              EventType type, Object data )
175    {
176        final BlockingSet readySessionBuffers = this.readySessionBuffers;
177        final Set busySessionBuffers = this.busySessionBuffers;
178        final Event event = new Event( type, nextFilter, data );
179 
180        synchronized( readySessionBuffers )
181        {
182            final SessionBuffer buf = getSessionBuffer( session );
183            final Queue eventQueue = buf.eventQueue;
184 
185            synchronized( buf )
186            {
187                eventQueue.push( event );
188            }
189 
190            if( !busySessionBuffers.contains( buf ) )
191            {
192                busySessionBuffers.add( buf );
193                readySessionBuffers.add( buf );
194            }
195        }
196    }
197 
198    /**
199     * Implement this method to forward events to <tt>nextFilter</tt>.
200     */
201    protected abstract void processEvent( Object nextFilter, Session session,
202                                          EventType type, Object data );
203 
204    private SessionBuffer getSessionBuffer( Session session )
205    {
206        final Map buffers = this.buffers;
207        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
208        if( buf == null )
209        {
210            synchronized( buffers )
211            {
212                buf = ( SessionBuffer ) buffers.get( session );
213                if( buf == null )
214                {
215                    buf = new SessionBuffer( session );
216                    buffers.put( session, buf );
217                }
218            }
219        }
220        return buf;
221    }
222 
223    private void removeSessionBuffer( SessionBuffer buf )
224    {
225        final Map buffers = this.buffers;
226        final Session session = buf.session;
227        synchronized( buffers )
228        {
229            buffers.remove( session );
230        }
231    }
232 
233    private static class SessionBuffer
234    {
235        private final Session session;
236 
237        private final Queue eventQueue = new Queue();
238 
239        private SessionBuffer( Session session )
240        {
241            this.session = session;
242        }
243    }
244 
245    private class Worker extends Thread
246    {
247        private final Object promotionLock = new Object();
248 
249        private Worker()
250        {
251            super( "IoThreadPool-" + ( threadId++ ) );
252            increasePoolSize();
253        }
254 
255        public void lead()
256        {
257            final Object promotionLock = this.promotionLock;
258            synchronized( promotionLock )
259            {
260                leader = this;
261                promotionLock.notify();
262            }
263        }
264 
265        public void run()
266        {
267            for( ;; )
268            {
269                if( !waitForPromotion() )
270                    break;
271 
272                SessionBuffer buf = fetchBuffer();
273                giveUpLead();
274 
275                if( buf == null )
276                {
277                    break;
278                }
279 
280                processEvents( buf );
281                follow();
282                releaseBuffer( buf );
283            }
284 
285            decreasePoolSize();
286        }
287 
288        private SessionBuffer fetchBuffer()
289        {
290            SessionBuffer buf = null;
291            BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
292            synchronized( readySessionBuffers )
293            {
294                do
295                {
296                    buf = null;
297                    try
298                    {
299                        readySessionBuffers.waitForNewItem();
300                    }
301                    catch( InterruptedException e )
302                    {
303                        break;
304                    }
305 
306                    Iterator it = readySessionBuffers.iterator();
307                    if( !it.hasNext() )
308                    {
309                        // exceeded keepAliveTime
310                        break;
311                    }
312 
313                    do
314                    {
315                        buf = null;
316                        buf = ( SessionBuffer ) it.next();
317                        it.remove();
318                    }
319                    while( buf != null && buf.eventQueue.isEmpty()
320                           && it.hasNext() );
321                }
322                while( buf != null && buf.eventQueue.isEmpty() );
323            }
324 
325            return buf;
326        }
327 
328        private void processEvents( SessionBuffer buf )
329        {
330            final Session session = buf.session;
331            final Queue eventQueue = buf.eventQueue;
332            for( ;; )
333            {
334                Event event;
335                synchronized( buf )
336                {
337                    event = ( Event ) eventQueue.pop();
338                    if( event == null )
339                        break;
340                }
341                processEvent( event.getNextFilter(), session,
342                              event.getType(), event.getData() );
343            }
344        }
345 
346        private void follow()
347        {
348            final Object promotionLock = this.promotionLock;
349            final Stack followers = BaseThreadPool.this.followers;
350            synchronized( promotionLock )
351            {
352                if( this != leader )
353                {
354                    synchronized( followers )
355                    {
356                        followers.push( this );
357                    }
358                }
359            }
360        }
361 
362        private void releaseBuffer( SessionBuffer buf )
363        {
364            final BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
365            final Set busySessionBuffers = BaseThreadPool.this.busySessionBuffers;
366            final Queue eventQueue = buf.eventQueue;
367 
368            synchronized( readySessionBuffers )
369            {
370                if( eventQueue.isEmpty() )
371                {
372                    busySessionBuffers.remove( buf );
373                    removeSessionBuffer( buf );
374                }
375                else
376                {
377                    readySessionBuffers.add( buf );
378                }
379            }
380        }
381 
382        private boolean waitForPromotion()
383        {
384            final Object promotionLock = this.promotionLock;
385 
386            synchronized( promotionLock )
387            {
388                if( this != leader )
389                {
390                    try
391                    {
392                        int keepAliveTime = getKeepAliveTime();
393                        if( keepAliveTime > 0 )
394                        {
395                            promotionLock.wait( keepAliveTime );
396                        }
397                        else
398                        {
399                            promotionLock.wait();
400                        }
401                    }
402                    catch( InterruptedException e )
403                    {
404                    }
405                }
406 
407                boolean timeToLead = this == leader;
408 
409                if( !timeToLead )
410                {
411                    // time to die
412                    synchronized( followers )
413                    {
414                        followers.remove( this );
415                    }
416                }
417 
418                return timeToLead;
419            }
420        }
421 
422        private void giveUpLead()
423        {
424            final Stack followers = BaseThreadPool.this.followers;
425            Worker worker;
426            synchronized( followers )
427            {
428                worker = ( Worker ) followers.pop();
429            }
430 
431            if( worker != null )
432            {
433                worker.lead();
434            }
435            else
436            {
437                if( !shuttingDown )
438                {
439                    synchronized( BaseThreadPool.this )
440                    {
441                        if( !shuttingDown
442                            && getPoolSize() < getMaximumPoolSize() )
443                        {
444                            worker = new Worker();
445                            worker.start();
446                            worker.lead();
447                        }
448                    }
449                }
450            }
451        }
452    }
453}

[all classes][org.apache.mina.util]
EMMA 2.0.4217 (C) Vladimir Roubtsov