1 | /* |
2 | * @(#) $Id: BaseThreadPool.java 164519 2005-04-25 02:20:46Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.util; |
20 | |
21 | import java.util.HashSet; |
22 | import java.util.IdentityHashMap; |
23 | import java.util.Iterator; |
24 | import java.util.Map; |
25 | import java.util.Set; |
26 | |
27 | import org.apache.mina.common.Session; |
28 | |
29 | /** |
30 | * A base implementation of Thread-pooling filters. |
31 | * This filter forwards events to its thread pool. This is an implementation of |
32 | * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers |
33 | * thread pool</a> by Douglas C. Schmidt et al. |
34 | * |
35 | * @author Trustin Lee (trustin@apache.org) |
36 | * @version $Rev: 164519 $, $Date: 2005-04-25 11:20:46 +0900 $ |
37 | */ |
38 | public abstract class BaseThreadPool implements ThreadPool |
39 | { |
40 | /** |
41 | * Default maximum size of thread pool (2G). |
42 | */ |
43 | public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE; |
44 | |
45 | /** |
46 | * Default keep-alive time of thread pool (1 min). |
47 | */ |
48 | public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000; |
49 | |
50 | private static volatile int threadId = 0; |
51 | |
52 | private final Map buffers = new IdentityHashMap(); |
53 | |
54 | private final Stack followers = new Stack(); |
55 | |
56 | private final BlockingSet readySessionBuffers = new BlockingSet(); |
57 | |
58 | private final Set busySessionBuffers = new HashSet(); |
59 | |
60 | private Worker leader; |
61 | |
62 | private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE; |
63 | |
64 | private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME; |
65 | |
66 | private boolean started; |
67 | |
68 | private boolean shuttingDown; |
69 | |
70 | private int poolSize; |
71 | |
72 | private final Object poolSizeLock = new Object(); |
73 | |
74 | /** |
75 | * Creates a new instance with default thread pool settings. |
76 | * You'll have to invoke {@link #start()} method to start threads actually. |
77 | */ |
78 | protected BaseThreadPool() |
79 | { |
80 | } |
81 | |
82 | public int getPoolSize() |
83 | { |
84 | synchronized( poolSizeLock ) |
85 | { |
86 | return poolSize; |
87 | } |
88 | } |
89 | |
90 | public int getMaximumPoolSize() |
91 | { |
92 | return maximumPoolSize; |
93 | } |
94 | |
95 | public int getKeepAliveTime() |
96 | { |
97 | return keepAliveTime; |
98 | } |
99 | |
100 | public void setMaximumPoolSize( int maximumPoolSize ) |
101 | { |
102 | if( maximumPoolSize <= 0 ) |
103 | throw new IllegalArgumentException(); |
104 | this.maximumPoolSize = maximumPoolSize; |
105 | } |
106 | |
107 | public void setKeepAliveTime( int keepAliveTime ) |
108 | { |
109 | this.keepAliveTime = keepAliveTime; |
110 | } |
111 | |
112 | public synchronized void start() |
113 | { |
114 | if( started ) |
115 | return; |
116 | |
117 | shuttingDown = false; |
118 | |
119 | leader = new Worker(); |
120 | leader.start(); |
121 | leader.lead(); |
122 | |
123 | started = true; |
124 | } |
125 | |
126 | public synchronized void stop() |
127 | { |
128 | if( !started ) |
129 | return; |
130 | |
131 | shuttingDown = true; |
132 | Worker lastLeader = null; |
133 | for( ;; ) |
134 | { |
135 | Worker leader = this.leader; |
136 | if( lastLeader == leader ) |
137 | break; |
138 | |
139 | while( leader.isAlive() ) |
140 | { |
141 | leader.interrupt(); |
142 | try |
143 | { |
144 | leader.join(); |
145 | } |
146 | catch( InterruptedException e ) |
147 | { |
148 | } |
149 | } |
150 | |
151 | lastLeader = leader; |
152 | } |
153 | |
154 | started = false; |
155 | } |
156 | |
157 | private void increasePoolSize() |
158 | { |
159 | synchronized( poolSizeLock ) |
160 | { |
161 | poolSize++; |
162 | } |
163 | } |
164 | |
165 | private void decreasePoolSize() |
166 | { |
167 | synchronized( poolSizeLock ) |
168 | { |
169 | poolSize--; |
170 | } |
171 | } |
172 | |
173 | protected void fireEvent( Object nextFilter, Session session, |
174 | EventType type, Object data ) |
175 | { |
176 | final BlockingSet readySessionBuffers = this.readySessionBuffers; |
177 | final Set busySessionBuffers = this.busySessionBuffers; |
178 | final SessionBuffer buf = getSessionBuffer( session ); |
179 | final Queue eventQueue = buf.eventQueue; |
180 | final Event event = new Event( type, nextFilter, data ); |
181 | |
182 | synchronized( buf ) |
183 | { |
184 | eventQueue.push( event ); |
185 | } |
186 | |
187 | synchronized( readySessionBuffers ) |
188 | { |
189 | if( !busySessionBuffers.contains( buf ) ) |
190 | { |
191 | busySessionBuffers.add( buf ); |
192 | readySessionBuffers.add( buf ); |
193 | } |
194 | } |
195 | } |
196 | |
197 | /** |
198 | * Implement this method to forward events to <tt>nextFilter</tt>. |
199 | */ |
200 | protected abstract void processEvent( Object nextFilter, Session session, |
201 | EventType type, Object data ); |
202 | |
203 | private SessionBuffer getSessionBuffer( Session session ) |
204 | { |
205 | final Map buffers = this.buffers; |
206 | SessionBuffer buf = ( SessionBuffer ) buffers.get( session ); |
207 | if( buf == null ) |
208 | { |
209 | synchronized( buffers ) |
210 | { |
211 | buf = ( SessionBuffer ) buffers.get( session ); |
212 | if( buf == null ) |
213 | { |
214 | buf = new SessionBuffer( session ); |
215 | buffers.put( session, buf ); |
216 | } |
217 | } |
218 | } |
219 | return buf; |
220 | } |
221 | |
222 | private void removeSessionBuffer( SessionBuffer buf ) |
223 | { |
224 | final Map buffers = this.buffers; |
225 | final Session session = buf.session; |
226 | synchronized( buffers ) |
227 | { |
228 | buffers.remove( session ); |
229 | } |
230 | } |
231 | |
232 | private static class SessionBuffer |
233 | { |
234 | private final Session session; |
235 | |
236 | private final Queue eventQueue = new Queue(); |
237 | |
238 | private SessionBuffer( Session session ) |
239 | { |
240 | this.session = session; |
241 | } |
242 | } |
243 | |
244 | private class Worker extends Thread |
245 | { |
246 | private final Object promotionLock = new Object(); |
247 | |
248 | private Worker() |
249 | { |
250 | super( "IoThreadPool-" + ( threadId++ ) ); |
251 | increasePoolSize(); |
252 | } |
253 | |
254 | public void lead() |
255 | { |
256 | final Object promotionLock = this.promotionLock; |
257 | synchronized( promotionLock ) |
258 | { |
259 | leader = this; |
260 | promotionLock.notify(); |
261 | } |
262 | } |
263 | |
264 | public void run() |
265 | { |
266 | for( ;; ) |
267 | { |
268 | if( !waitForPromotion() ) |
269 | break; |
270 | |
271 | SessionBuffer buf = fetchBuffer(); |
272 | giveUpLead(); |
273 | |
274 | if( buf == null ) |
275 | { |
276 | break; |
277 | } |
278 | |
279 | processEvents( buf ); |
280 | follow(); |
281 | releaseBuffer( buf ); |
282 | } |
283 | |
284 | decreasePoolSize(); |
285 | } |
286 | |
287 | private SessionBuffer fetchBuffer() |
288 | { |
289 | SessionBuffer buf = null; |
290 | BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers; |
291 | synchronized( readySessionBuffers ) |
292 | { |
293 | do |
294 | { |
295 | buf = null; |
296 | try |
297 | { |
298 | readySessionBuffers.waitForNewItem(); |
299 | } |
300 | catch( InterruptedException e ) |
301 | { |
302 | break; |
303 | } |
304 | |
305 | Iterator it = readySessionBuffers.iterator(); |
306 | if( !it.hasNext() ) |
307 | { |
308 | // exceeded keepAliveTime |
309 | break; |
310 | } |
311 | |
312 | do |
313 | { |
314 | buf = null; |
315 | buf = ( SessionBuffer ) it.next(); |
316 | it.remove(); |
317 | } |
318 | while( buf != null && buf.eventQueue.isEmpty() |
319 | && it.hasNext() ); |
320 | } |
321 | while( buf != null && buf.eventQueue.isEmpty() ); |
322 | } |
323 | |
324 | return buf; |
325 | } |
326 | |
327 | private void processEvents( SessionBuffer buf ) |
328 | { |
329 | final Session session = buf.session; |
330 | final Queue eventQueue = buf.eventQueue; |
331 | for( ;; ) |
332 | { |
333 | Event event; |
334 | synchronized( buf ) |
335 | { |
336 | event = ( Event ) eventQueue.pop(); |
337 | if( event == null ) |
338 | break; |
339 | } |
340 | processEvent( event.getNextFilter(), session, |
341 | event.getType(), event.getData() ); |
342 | } |
343 | } |
344 | |
345 | private void follow() |
346 | { |
347 | final Object promotionLock = this.promotionLock; |
348 | final Stack followers = BaseThreadPool.this.followers; |
349 | synchronized( promotionLock ) |
350 | { |
351 | if( this != leader ) |
352 | { |
353 | synchronized( followers ) |
354 | { |
355 | followers.push( this ); |
356 | } |
357 | } |
358 | } |
359 | } |
360 | |
361 | private void releaseBuffer( SessionBuffer buf ) |
362 | { |
363 | final BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers; |
364 | final Set busySessionBuffers = BaseThreadPool.this.busySessionBuffers; |
365 | final Queue eventQueue = buf.eventQueue; |
366 | |
367 | synchronized( readySessionBuffers ) |
368 | { |
369 | busySessionBuffers.remove( buf ); |
370 | if( eventQueue.isEmpty() ) |
371 | { |
372 | removeSessionBuffer( buf ); |
373 | } |
374 | else |
375 | { |
376 | readySessionBuffers.add( buf ); |
377 | } |
378 | } |
379 | } |
380 | |
381 | private boolean waitForPromotion() |
382 | { |
383 | final Object promotionLock = this.promotionLock; |
384 | |
385 | synchronized( promotionLock ) |
386 | { |
387 | if( this != leader ) |
388 | { |
389 | try |
390 | { |
391 | int keepAliveTime = getKeepAliveTime(); |
392 | if( keepAliveTime > 0 ) |
393 | { |
394 | promotionLock.wait( keepAliveTime ); |
395 | } |
396 | else |
397 | { |
398 | promotionLock.wait(); |
399 | } |
400 | } |
401 | catch( InterruptedException e ) |
402 | { |
403 | } |
404 | } |
405 | |
406 | boolean timeToLead = this == leader; |
407 | |
408 | if( !timeToLead ) |
409 | { |
410 | // time to die |
411 | synchronized( followers ) |
412 | { |
413 | followers.remove( this ); |
414 | } |
415 | } |
416 | |
417 | return timeToLead; |
418 | } |
419 | } |
420 | |
421 | private void giveUpLead() |
422 | { |
423 | final Stack followers = BaseThreadPool.this.followers; |
424 | Worker worker; |
425 | synchronized( followers ) |
426 | { |
427 | worker = ( Worker ) followers.pop(); |
428 | } |
429 | |
430 | if( worker != null ) |
431 | { |
432 | worker.lead(); |
433 | } |
434 | else |
435 | { |
436 | if( !shuttingDown ) |
437 | { |
438 | synchronized( BaseThreadPool.this ) |
439 | { |
440 | if( !shuttingDown |
441 | && getPoolSize() < getMaximumPoolSize() ) |
442 | { |
443 | worker = new Worker(); |
444 | worker.start(); |
445 | worker.lead(); |
446 | } |
447 | } |
448 | } |
449 | } |
450 | } |
451 | } |
452 | } |