EMMA Coverage Report (generated Wed Jun 08 12:10:57 KST 2005)
[all classes][org.apache.mina.util]

COVERAGE SUMMARY FOR SOURCE FILE [BaseThreadPool.java]

name    class, %    method, %    block, %    line, %
BaseThreadPool.java75%  (3/4)92%  (22/24)81%  (550/680)86%  (164.7/191)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name    class, %    method, %    block, %    line, %
     
class BaseThreadPool$1    0%   (0/1)    100% (0/0)    100% (0/0)    100% (0/0)
     
class BaseThreadPool$Worker100% (1/1)100% (9/9)79%  (298/375)87%  (89.8/103)
waitForPromotion (): boolean 100% (1/1)58%  (38/66)64%  (10.9/17)
follow (): void 100% (1/1)75%  (30/40)89%  (8/9)
releaseBuffer (BaseThreadPool$SessionBuffer): void 100% (1/1)78%  (31/40)85%  (9.4/11)
lead (): void 100% (1/1)78%  (18/23)96%  (5.8/6)
fetchBuffer (): BaseThreadPool$SessionBuffer 100% (1/1)84%  (48/57)90%  (16.2/18)
giveUpLead (): void 100% (1/1)84%  (54/64)94%  (14.1/15)
processEvents (BaseThreadPool$SessionBuffer): void 100% (1/1)87%  (34/39)94%  (9.4/10)
run (): void 100% (1/1)96%  (24/25)92%  (11/12)
BaseThreadPool$Worker (BaseThreadPool): void 100% (1/1)100% (21/21)100% (5/5)
     
class BaseThreadPool100% (1/1)86%  (12/14)82%  (241/294)84%  (70.9/84)
setKeepAliveTime (int): void 0%   (0/1)0%   (0/4)0%   (0/2)
setMaximumPoolSize (int): void 0%   (0/1)0%   (0/10)0%   (0/4)
getPoolSize (): int 100% (1/1)67%  (10/15)67%  (2/3)
decreasePoolSize (): void 100% (1/1)75%  (15/20)93%  (3.7/4)
increasePoolSize (): void 100% (1/1)75%  (15/20)93%  (3.7/4)
removeSessionBuffer (BaseThreadPool$SessionBuffer): void 100% (1/1)78%  (18/23)96%  (5.8/6)
fireEvent (Object, Session, EventType, Object): void 100% (1/1)83%  (50/60)95%  (13.2/14)
getSessionBuffer (Session): BaseThreadPool$SessionBuffer 100% (1/1)88%  (37/42)94%  (9.4/10)
stop (): void 100% (1/1)91%  (30/33)82%  (14/17)
start (): void 100% (1/1)96%  (23/24)88%  (7/8)
<static initializer> 100% (1/1)100% (3/3)100% (1/1)
BaseThreadPool (): void 100% (1/1)100% (34/34)100% (9/9)
getKeepAliveTime (): int 100% (1/1)100% (3/3)100% (1/1)
getMaximumPoolSize (): int 100% (1/1)100% (3/3)100% (1/1)
     
class BaseThreadPool$SessionBuffer100% (1/1)100% (1/1)100% (11/11)100% (4/4)
BaseThreadPool$SessionBuffer (Session): void 100% (1/1)100% (11/11)100% (4/4)

1/*
2 *   @(#) $Id: BaseThreadPool.java 164519 2005-04-25 02:20:46Z trustin $
3 *
4 *   Copyright 2004 The Apache Software Foundation
5 *
6 *   Licensed under the Apache License, Version 2.0 (the "License");
7 *   you may not use this file except in compliance with the License.
8 *   You may obtain a copy of the License at
9 *
10 *       http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *   Unless required by applicable law or agreed to in writing, software
13 *   distributed under the License is distributed on an "AS IS" BASIS,
14 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *   See the License for the specific language governing permissions and
16 *   limitations under the License.
17 *
18 */
19package org.apache.mina.util;
20 
21import java.util.HashSet;
22import java.util.IdentityHashMap;
23import java.util.Iterator;
24import java.util.Map;
25import java.util.Set;
26 
27import org.apache.mina.common.Session;
28 
29/**
30 * A base implementation of Thread-pooling filters.
31 * This filter forwards events to its thread pool.  This is an implementation of
32 * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
33 * thread pool</a> by Douglas C. Schmidt et al.
34 * 
35 * @author Trustin Lee (trustin@apache.org)
36 * @version $Rev: 164519 $, $Date: 2005-04-25 11:20:46 +0900 $
37 */
38public abstract class BaseThreadPool implements ThreadPool
39{
40    /**
41     * Default maximum size of thread pool (2G).
42     */
43    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
44 
45    /**
46     * Default keep-alive time of thread pool (1 min).
47     */
48    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
49 
50    private static volatile int threadId = 0;
51 
52    private final Map buffers = new IdentityHashMap();
53 
54    private final Stack followers = new Stack();
55 
56    private final BlockingSet readySessionBuffers = new BlockingSet();
57 
58    private final Set busySessionBuffers = new HashSet();
59 
60    private Worker leader;
61 
62    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
63 
64    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
65 
66    private boolean started;
67 
68    private boolean shuttingDown;
69 
70    private int poolSize;
71 
72    private final Object poolSizeLock = new Object();
73 
74    /**
75     * Creates a new instance with default thread pool settings.
76     * You'll have to invoke {@link #start()} method to start threads actually.
77     */
78    protected BaseThreadPool()
79    {
80    }
81 
82    public int getPoolSize()
83    {
84        synchronized( poolSizeLock )
85        {
86            return poolSize;
87        }
88    }
89 
90    public int getMaximumPoolSize()
91    {
92        return maximumPoolSize;
93    }
94 
95    public int getKeepAliveTime()
96    {
97        return keepAliveTime;
98    }
99 
100    public void setMaximumPoolSize( int maximumPoolSize )
101    {
102        if( maximumPoolSize <= 0 )
103            throw new IllegalArgumentException();
104        this.maximumPoolSize = maximumPoolSize;
105    }
106 
107    public void setKeepAliveTime( int keepAliveTime )
108    {
109        this.keepAliveTime = keepAliveTime;
110    }
111 
112    public synchronized void start()
113    {
114        if( started )
115            return;
116 
117        shuttingDown = false;
118 
119        leader = new Worker();
120        leader.start();
121        leader.lead();
122 
123        started = true;
124    }
125 
126    public synchronized void stop()
127    {
128        if( !started )
129            return;
130 
131        shuttingDown = true;
132        Worker lastLeader = null;
133        for( ;; )
134        {
135            Worker leader = this.leader;
136            if( lastLeader == leader )
137                break;
138 
139            while( leader.isAlive() )
140            {
141                leader.interrupt();
142                try
143                {
144                    leader.join();
145                }
146                catch( InterruptedException e )
147                {
148                }
149            }
150 
151            lastLeader = leader;
152        }
153 
154        started = false;
155    }
156 
157    private void increasePoolSize()
158    {
159        synchronized( poolSizeLock )
160        {
161            poolSize++;
162        }
163    }
164 
165    private void decreasePoolSize()
166    {
167        synchronized( poolSizeLock )
168        {
169            poolSize--;
170        }
171    }
172 
173    protected void fireEvent( Object nextFilter, Session session,
174                              EventType type, Object data )
175    {
176        final BlockingSet readySessionBuffers = this.readySessionBuffers;
177        final Set busySessionBuffers = this.busySessionBuffers;
178        final SessionBuffer buf = getSessionBuffer( session );
179        final Queue eventQueue = buf.eventQueue;
180        final Event event = new Event( type, nextFilter, data );
181 
182        synchronized( buf )
183        {
184            eventQueue.push( event );
185        }
186 
187        synchronized( readySessionBuffers )
188        {
189            if( !busySessionBuffers.contains( buf ) )
190            {
191                busySessionBuffers.add( buf );
192                readySessionBuffers.add( buf );
193            }
194        }
195    }
196 
197    /**
198     * Implement this method to forward events to <tt>nextFilter</tt>.
199     */
200    protected abstract void processEvent( Object nextFilter, Session session,
201                                          EventType type, Object data );
202 
203    private SessionBuffer getSessionBuffer( Session session )
204    {
205        final Map buffers = this.buffers;
206        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
207        if( buf == null )
208        {
209            synchronized( buffers )
210            {
211                buf = ( SessionBuffer ) buffers.get( session );
212                if( buf == null )
213                {
214                    buf = new SessionBuffer( session );
215                    buffers.put( session, buf );
216                }
217            }
218        }
219        return buf;
220    }
221 
222    private void removeSessionBuffer( SessionBuffer buf )
223    {
224        final Map buffers = this.buffers;
225        final Session session = buf.session;
226        synchronized( buffers )
227        {
228            buffers.remove( session );
229        }
230    }
231 
232    private static class SessionBuffer
233    {
234        private final Session session;
235 
236        private final Queue eventQueue = new Queue();
237 
238        private SessionBuffer( Session session )
239        {
240            this.session = session;
241        }
242    }
243 
244    private class Worker extends Thread
245    {
246        private final Object promotionLock = new Object();
247 
248        private Worker()
249        {
250            super( "IoThreadPool-" + ( threadId++ ) );
251            increasePoolSize();
252        }
253 
254        public void lead()
255        {
256            final Object promotionLock = this.promotionLock;
257            synchronized( promotionLock )
258            {
259                leader = this;
260                promotionLock.notify();
261            }
262        }
263 
264        public void run()
265        {
266            for( ;; )
267            {
268                if( !waitForPromotion() )
269                    break;
270 
271                SessionBuffer buf = fetchBuffer();
272                giveUpLead();
273 
274                if( buf == null )
275                {
276                    break;
277                }
278 
279                processEvents( buf );
280                follow();
281                releaseBuffer( buf );
282            }
283 
284            decreasePoolSize();
285        }
286 
287        private SessionBuffer fetchBuffer()
288        {
289            SessionBuffer buf = null;
290            BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
291            synchronized( readySessionBuffers )
292            {
293                do
294                {
295                    buf = null;
296                    try
297                    {
298                        readySessionBuffers.waitForNewItem();
299                    }
300                    catch( InterruptedException e )
301                    {
302                        break;
303                    }
304 
305                    Iterator it = readySessionBuffers.iterator();
306                    if( !it.hasNext() )
307                    {
308                        // exceeded keepAliveTime
309                        break;
310                    }
311 
312                    do
313                    {
314                        buf = null;
315                        buf = ( SessionBuffer ) it.next();
316                        it.remove();
317                    }
318                    while( buf != null && buf.eventQueue.isEmpty()
319                           && it.hasNext() );
320                }
321                while( buf != null && buf.eventQueue.isEmpty() );
322            }
323 
324            return buf;
325        }
326 
327        private void processEvents( SessionBuffer buf )
328        {
329            final Session session = buf.session;
330            final Queue eventQueue = buf.eventQueue;
331            for( ;; )
332            {
333                Event event;
334                synchronized( buf )
335                {
336                    event = ( Event ) eventQueue.pop();
337                    if( event == null )
338                        break;
339                }
340                processEvent( event.getNextFilter(), session,
341                              event.getType(), event.getData() );
342            }
343        }
344 
345        private void follow()
346        {
347            final Object promotionLock = this.promotionLock;
348            final Stack followers = BaseThreadPool.this.followers;
349            synchronized( promotionLock )
350            {
351                if( this != leader )
352                {
353                    synchronized( followers )
354                    {
355                        followers.push( this );
356                    }
357                }
358            }
359        }
360 
361        private void releaseBuffer( SessionBuffer buf )
362        {
363            final BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
364            final Set busySessionBuffers = BaseThreadPool.this.busySessionBuffers;
365            final Queue eventQueue = buf.eventQueue;
366 
367            synchronized( readySessionBuffers )
368            {
369                busySessionBuffers.remove( buf );
370                if( eventQueue.isEmpty() )
371                {
372                    removeSessionBuffer( buf );
373                }
374                else
375                {
376                    readySessionBuffers.add( buf );
377                }
378            }
379        }
380 
381        private boolean waitForPromotion()
382        {
383            final Object promotionLock = this.promotionLock;
384 
385            synchronized( promotionLock )
386            {
387                if( this != leader )
388                {
389                    try
390                    {
391                        int keepAliveTime = getKeepAliveTime();
392                        if( keepAliveTime > 0 )
393                        {
394                            promotionLock.wait( keepAliveTime );
395                        }
396                        else
397                        {
398                            promotionLock.wait();
399                        }
400                    }
401                    catch( InterruptedException e )
402                    {
403                    }
404                }
405 
406                boolean timeToLead = this == leader;
407 
408                if( !timeToLead )
409                {
410                    // time to die
411                    synchronized( followers )
412                    {
413                        followers.remove( this );
414                    }
415                }
416 
417                return timeToLead;
418            }
419        }
420 
421        private void giveUpLead()
422        {
423            final Stack followers = BaseThreadPool.this.followers;
424            Worker worker;
425            synchronized( followers )
426            {
427                worker = ( Worker ) followers.pop();
428            }
429 
430            if( worker != null )
431            {
432                worker.lead();
433            }
434            else
435            {
436                if( !shuttingDown )
437                {
438                    synchronized( BaseThreadPool.this )
439                    {
440                        if( !shuttingDown
441                            && getPoolSize() < getMaximumPoolSize() )
442                        {
443                            worker = new Worker();
444                            worker.start();
445                            worker.lead();
446                        }
447                    }
448                }
449            }
450        }
451    }
452}

[all classes][org.apache.mina.util]
EMMA 2.0.4217 (C) Vladimir Roubtsov