%line | %branch | |||||||||
---|---|---|---|---|---|---|---|---|---|---|
org.apache.jcs.engine.PooledCacheEventQueue$RemoveAllEvent |
|
|
1 | package org.apache.jcs.engine; |
|
2 | ||
3 | /* |
|
4 | * Licensed to the Apache Software Foundation (ASF) under one |
|
5 | * or more contributor license agreements. See the NOTICE file |
|
6 | * distributed with this work for additional information |
|
7 | * regarding copyright ownership. The ASF licenses this file |
|
8 | * to you under the Apache License, Version 2.0 (the |
|
9 | * "License"); you may not use this file except in compliance |
|
10 | * with the License. You may obtain a copy of the License at |
|
11 | * |
|
12 | * http://www.apache.org/licenses/LICENSE-2.0 |
|
13 | * |
|
14 | * Unless required by applicable law or agreed to in writing, |
|
15 | * software distributed under the License is distributed on an |
|
16 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
|
17 | * KIND, either express or implied. See the License for the |
|
18 | * specific language governing permissions and limitations |
|
19 | * under the License. |
|
20 | */ |
|
21 | ||
22 | import java.io.IOException; |
|
23 | import java.io.Serializable; |
|
24 | import java.util.ArrayList; |
|
25 | ||
26 | import org.apache.commons.logging.Log; |
|
27 | import org.apache.commons.logging.LogFactory; |
|
28 | import org.apache.jcs.engine.behavior.ICacheElement; |
|
29 | import org.apache.jcs.engine.behavior.ICacheEventQueue; |
|
30 | import org.apache.jcs.engine.behavior.ICacheListener; |
|
31 | import org.apache.jcs.engine.stats.StatElement; |
|
32 | import org.apache.jcs.engine.stats.Stats; |
|
33 | import org.apache.jcs.engine.stats.behavior.IStatElement; |
|
34 | import org.apache.jcs.engine.stats.behavior.IStats; |
|
35 | import org.apache.jcs.utils.threadpool.ThreadPool; |
|
36 | import org.apache.jcs.utils.threadpool.ThreadPoolManager; |
|
37 | ||
38 | import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; |
|
39 | ||
40 | /** |
|
41 | * An event queue is used to propagate ordered cache events to one and only one target listener. |
|
42 | * <p> |
|
43 | * This is a modified version of the experimental version. It uses a PooledExecutor and a |
|
44 | * BoundedBuffer to queue up events and execute them as threads become available. |
|
45 | * <p> |
|
46 | * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing |
|
47 | * more than a few threads at them will serve no purpose other than to saturate the IO interface. In |
|
48 | * light of this, having one thread per region seems unnecessary. This may prove to be false. |
|
49 | * <p> |
|
50 | * @author Aaron Smuts |
|
51 | * @author Travis Savo <tsavo@ifilm.com> |
|
52 | */ |
|
53 | public class PooledCacheEventQueue |
|
54 | implements ICacheEventQueue |
|
55 | { |
|
56 | private static final int queueType = POOLED_QUEUE_TYPE; |
|
57 | ||
58 | private static final Log log = LogFactory.getLog( PooledCacheEventQueue.class ); |
|
59 | ||
60 | // time to wait for an event before snuffing the background thread |
|
61 | // if the queue is empty. |
|
62 | // make configurable later |
|
63 | private int waitToDieMillis = 10000; |
|
64 | ||
65 | private ICacheListener listener; |
|
66 | ||
67 | private long listenerId; |
|
68 | ||
69 | private String cacheName; |
|
70 | ||
71 | private int maxFailure; |
|
72 | ||
73 | // in milliseconds |
|
74 | private int waitBeforeRetry; |
|
75 | ||
76 | private boolean destroyed = true; |
|
77 | ||
78 | private boolean working = true; |
|
79 | ||
80 | // The Thread Pool to execute events with. |
|
81 | private ThreadPool pool = null; |
|
82 | ||
83 | /** |
|
84 | * Constructor for the PooledCacheEventQueue object |
|
85 | * <p> |
|
86 | * @param listener |
|
87 | * @param listenerId |
|
88 | * @param cacheName |
|
89 | * @param maxFailure |
|
90 | * @param waitBeforeRetry |
|
91 | * @param threadPoolName |
|
92 | */ |
|
93 | public PooledCacheEventQueue( ICacheListener listener, long listenerId, String cacheName, int maxFailure, |
|
94 | int waitBeforeRetry, String threadPoolName ) |
|
95 | { |
|
96 | if ( listener == null ) |
|
97 | { |
|
98 | throw new IllegalArgumentException( "listener must not be null" ); |
|
99 | } |
|
100 | ||
101 | this.listener = listener; |
|
102 | this.listenerId = listenerId; |
|
103 | this.cacheName = cacheName; |
|
104 | this.maxFailure = maxFailure <= 0 ? 3 : maxFailure; |
|
105 | this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry; |
|
106 | ||
107 | // this will share the same pool with other event queues by default. |
|
108 | if ( threadPoolName == null ) |
|
109 | { |
|
110 | threadPoolName = "cache_event_queue"; |
|
111 | } |
|
112 | pool = ThreadPoolManager.getInstance().getPool( threadPoolName ); |
|
113 | ||
114 | if ( log.isDebugEnabled() ) |
|
115 | { |
|
116 | log.debug( "Constructed: " + this ); |
|
117 | } |
|
118 | } |
|
119 | ||
120 | /* |
|
121 | * (non-Javadoc) |
|
122 | * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getQueueType() |
|
123 | */ |
|
124 | public int getQueueType() |
|
125 | { |
|
126 | return queueType; |
|
127 | } |
|
128 | ||
129 | /** |
|
130 | * Event Q is empty. |
|
131 | */ |
|
132 | public synchronized void stopProcessing() |
|
133 | { |
|
134 | destroyed = true; |
|
135 | } |
|
136 | ||
137 | /** |
|
138 | * Returns the time to wait for events before killing the background thread. |
|
139 | * <p> |
|
140 | * @return the time to wait before shutting down in ms. |
|
141 | */ |
|
142 | public int getWaitToDieMillis() |
|
143 | { |
|
144 | return waitToDieMillis; |
|
145 | } |
|
146 | ||
147 | /** |
|
148 | * Sets the time to wait for events before killing the background thread. |
|
149 | * <p> |
|
150 | * @param wtdm |
|
151 | */ |
|
152 | public void setWaitToDieMillis( int wtdm ) |
|
153 | { |
|
154 | waitToDieMillis = wtdm; |
|
155 | } |
|
156 | ||
157 | /** |
|
158 | * @return String info. |
|
159 | */ |
|
160 | public String toString() |
|
161 | { |
|
162 | return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]"; |
|
163 | } |
|
164 | ||
165 | /** |
|
166 | * @return true if not destroyed. |
|
167 | */ |
|
168 | public boolean isAlive() |
|
169 | { |
|
170 | return ( !destroyed ); |
|
171 | } |
|
172 | ||
173 | /** |
|
174 | * @param aState |
|
175 | */ |
|
176 | public void setAlive( boolean aState ) |
|
177 | { |
|
178 | destroyed = !aState; |
|
179 | } |
|
180 | ||
181 | /** |
|
182 | * @return The listenerId value |
|
183 | */ |
|
184 | public long getListenerId() |
|
185 | { |
|
186 | return listenerId; |
|
187 | } |
|
188 | ||
189 | /** |
|
190 | * Destroy the queue. Interrupt all threads. |
|
191 | */ |
|
192 | public synchronized void destroy() |
|
193 | { |
|
194 | if ( !destroyed ) |
|
195 | { |
|
196 | destroyed = true; |
|
197 | // TODO decide whether to shutdown or interrupt |
|
198 | // pool.getPool().shutdownNow(); |
|
199 | pool.getPool().interruptAll(); |
|
200 | if ( log.isInfoEnabled() ) |
|
201 | { |
|
202 | log.info( "Cache event queue destroyed: " + this ); |
|
203 | } |
|
204 | } |
|
205 | } |
|
206 | ||
207 | /** |
|
208 | * Constructs a PutEvent for the object and passes it to the event queue. |
|
209 | * <p> |
|
210 | * @param ce The feature to be added to the PutEvent attribute |
|
211 | * @exception IOException |
|
212 | */ |
|
213 | public synchronized void addPutEvent( ICacheElement ce ) |
|
214 | throws IOException |
|
215 | { |
|
216 | if ( isWorking() ) |
|
217 | { |
|
218 | put( new PutEvent( ce ) ); |
|
219 | } |
|
220 | else |
|
221 | { |
|
222 | if ( log.isWarnEnabled() ) |
|
223 | { |
|
224 | log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." ); |
|
225 | } |
|
226 | } |
|
227 | } |
|
228 | ||
229 | /** |
|
230 | * @param key The feature to be added to the RemoveEvent attribute |
|
231 | * @exception IOException |
|
232 | */ |
|
233 | public synchronized void addRemoveEvent( Serializable key ) |
|
234 | throws IOException |
|
235 | { |
|
236 | if ( isWorking() ) |
|
237 | { |
|
238 | put( new RemoveEvent( key ) ); |
|
239 | } |
|
240 | else |
|
241 | { |
|
242 | if ( log.isWarnEnabled() ) |
|
243 | { |
|
244 | log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." ); |
|
245 | } |
|
246 | } |
|
247 | } |
|
248 | ||
249 | /** |
|
250 | * @exception IOException |
|
251 | */ |
|
252 | public synchronized void addRemoveAllEvent() |
|
253 | throws IOException |
|
254 | { |
|
255 | if ( isWorking() ) |
|
256 | { |
|
257 | put( new RemoveAllEvent() ); |
|
258 | } |
|
259 | else |
|
260 | { |
|
261 | if ( log.isWarnEnabled() ) |
|
262 | { |
|
263 | log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." ); |
|
264 | } |
|
265 | } |
|
266 | } |
|
267 | ||
268 | /** |
|
269 | * @exception IOException |
|
270 | */ |
|
271 | public synchronized void addDisposeEvent() |
|
272 | throws IOException |
|
273 | { |
|
274 | if ( isWorking() ) |
|
275 | { |
|
276 | put( new DisposeEvent() ); |
|
277 | } |
|
278 | else |
|
279 | { |
|
280 | if ( log.isWarnEnabled() ) |
|
281 | { |
|
282 | log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." ); |
|
283 | } |
|
284 | } |
|
285 | } |
|
286 | ||
287 | /** |
|
288 | * Adds an event to the queue. |
|
289 | * <p> |
|
290 | * @param event |
|
291 | */ |
|
292 | private void put( AbstractCacheEvent event ) |
|
293 | { |
|
294 | try |
|
295 | { |
|
296 | pool.execute( event ); |
|
297 | } |
|
298 | catch ( InterruptedException e ) |
|
299 | { |
|
300 | log.error( e ); |
|
301 | } |
|
302 | } |
|
303 | ||
304 | /** |
|
305 | * @return Statistics info |
|
306 | */ |
|
307 | public String getStats() |
|
308 | { |
|
309 | return getStatistics().toString(); |
|
310 | } |
|
311 | ||
312 | /* |
|
313 | * (non-Javadoc) |
|
314 | * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics() |
|
315 | */ |
|
316 | public IStats getStatistics() |
|
317 | { |
|
318 | IStats stats = new Stats(); |
|
319 | stats.setTypeName( "Pooled Cache Event Queue" ); |
|
320 | ||
321 | ArrayList elems = new ArrayList(); |
|
322 | ||
323 | IStatElement se = null; |
|
324 | ||
325 | se = new StatElement(); |
|
326 | se.setName( "Working" ); |
|
327 | se.setData( "" + this.working ); |
|
328 | elems.add( se ); |
|
329 | ||
330 | se = new StatElement(); |
|
331 | se.setName( "Destroyed" ); |
|
332 | se.setData( "" + this.isAlive() ); |
|
333 | elems.add( se ); |
|
334 | ||
335 | se = new StatElement(); |
|
336 | se.setName( "Empty" ); |
|
337 | se.setData( "" + this.isEmpty() ); |
|
338 | elems.add( se ); |
|
339 | ||
340 | if ( pool.getQueue() != null ) |
|
341 | { |
|
342 | if ( pool.getQueue() instanceof BoundedBuffer ) |
|
343 | { |
|
344 | BoundedBuffer bb = (BoundedBuffer) pool.getQueue(); |
|
345 | se = new StatElement(); |
|
346 | se.setName( "Queue Size" ); |
|
347 | se.setData( "" + bb.size() ); |
|
348 | elems.add( se ); |
|
349 | ||
350 | se = new StatElement(); |
|
351 | se.setName( "Queue Capacity" ); |
|
352 | se.setData( "" + bb.capacity() ); |
|
353 | elems.add( se ); |
|
354 | } |
|
355 | } |
|
356 | ||
357 | se = new StatElement(); |
|
358 | se.setName( "Pool Size" ); |
|
359 | se.setData( "" + pool.getPool().getPoolSize() ); |
|
360 | elems.add( se ); |
|
361 | ||
362 | se = new StatElement(); |
|
363 | se.setName( "Maximum Pool Size" ); |
|
364 | se.setData( "" + pool.getPool().getMaximumPoolSize() ); |
|
365 | elems.add( se ); |
|
366 | ||
367 | // get an array and put them in the Stats object |
|
368 | IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[elems.size()] ); |
|
369 | stats.setStatElements( ses ); |
|
370 | ||
371 | return stats; |
|
372 | } |
|
373 | ||
374 | // /////////////////////////// Inner classes ///////////////////////////// |
|
375 | ||
376 | /** |
|
377 | * Retries before declaring failure. |
|
378 | * <p> |
|
379 | * @author asmuts |
|
380 | * @created January 15, 2002 |
|
381 | */ |
|
382 | private abstract class AbstractCacheEvent |
|
383 | implements Runnable |
|
384 | { |
|
385 | int failures = 0; |
|
386 | ||
387 | boolean done = false; |
|
388 | ||
389 | /** |
|
390 | * Main processing method for the AbstractCacheEvent object. It calls the abstract doRun |
|
391 | * method that all concrete instances must implement. |
|
392 | */ |
|
393 | public void run() |
|
394 | { |
|
395 | try |
|
396 | { |
|
397 | doRun(); |
|
398 | } |
|
399 | catch ( IOException e ) |
|
400 | { |
|
401 | if ( log.isWarnEnabled() ) |
|
402 | { |
|
403 | log.warn( e ); |
|
404 | } |
|
405 | if ( ++failures >= maxFailure ) |
|
406 | { |
|
407 | if ( log.isWarnEnabled() ) |
|
408 | { |
|
409 | log.warn( "Error while running event from Queue: " + this |
|
410 | + ". Dropping Event and marking Event Queue as non-functional." ); |
|
411 | } |
|
412 | setWorking( false ); |
|
413 | setAlive( false ); |
|
414 | return; |
|
415 | } |
|
416 | if ( log.isInfoEnabled() ) |
|
417 | { |
|
418 | log.info( "Error while running event from Queue: " + this + ". Retrying..." ); |
|
419 | } |
|
420 | try |
|
421 | { |
|
422 | Thread.sleep( waitBeforeRetry ); |
|
423 | run(); |
|
424 | } |
|
425 | catch ( InterruptedException ie ) |
|
426 | { |
|
427 | if ( log.isErrorEnabled() ) |
|
428 | { |
|
429 | log.warn( "Interrupted while sleeping for retry on event " + this + "." ); |
|
430 | } |
|
431 | setWorking( false ); |
|
432 | setAlive( false ); |
|
433 | } |
|
434 | } |
|
435 | } |
|
436 | ||
437 | /** |
|
438 | * @exception IOException |
|
439 | */ |
|
440 | protected abstract void doRun() |
|
441 | throws IOException; |
|
442 | } |
|
443 | ||
444 | /** |
|
445 | * An event that puts an item to a ICacheListener |
|
446 | * <p> |
|
447 | * @author asmuts |
|
448 | * @created January 15, 2002 |
|
449 | */ |
|
450 | private class PutEvent |
|
451 | extends AbstractCacheEvent |
|
452 | { |
|
453 | private ICacheElement ice; |
|
454 | ||
455 | /** |
|
456 | * Constructor for the PutEvent object |
|
457 | * @param ice |
|
458 | * @exception IOException |
|
459 | */ |
|
460 | PutEvent( ICacheElement ice ) |
|
461 | throws IOException |
|
462 | { |
|
463 | this.ice = ice; |
|
464 | } |
|
465 | ||
466 | /** |
|
467 | * Tells the ICacheListener to handle the put. |
|
468 | * <p> |
|
469 | * @exception IOException |
|
470 | */ |
|
471 | protected void doRun() |
|
472 | throws IOException |
|
473 | { |
|
474 | listener.handlePut( ice ); |
|
475 | } |
|
476 | ||
477 | public String toString() |
|
478 | { |
|
479 | return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " ) |
|
480 | .append( ice.getVal() ).toString(); |
|
481 | } |
|
482 | ||
483 | } |
|
484 | ||
485 | /** |
|
486 | * An event that knows how to call remove on an ICacheListener |
|
487 | * <p> |
|
488 | * @author asmuts |
|
489 | * @created January 15, 2002 |
|
490 | */ |
|
491 | private class RemoveEvent |
|
492 | extends AbstractCacheEvent |
|
493 | { |
|
494 | private Serializable key; |
|
495 | ||
496 | /** |
|
497 | * Constructor for the RemoveEvent object |
|
498 | * @param key |
|
499 | * @exception IOException |
|
500 | */ |
|
501 | RemoveEvent( Serializable key ) |
|
502 | throws IOException |
|
503 | { |
|
504 | this.key = key; |
|
505 | } |
|
506 | ||
507 | /** |
|
508 | * Calls remove on the listener. |
|
509 | * <p> |
|
510 | * @exception IOException |
|
511 | */ |
|
512 | protected void doRun() |
|
513 | throws IOException |
|
514 | { |
|
515 | listener.handleRemove( cacheName, key ); |
|
516 | } |
|
517 | ||
518 | /* |
|
519 | * (non-Javadoc) |
|
520 | * @see java.lang.Object#toString() |
|
521 | */ |
|
522 | public String toString() |
|
523 | { |
|
524 | return new StringBuffer( "RemoveEvent for " ).append( key ).toString(); |
|
525 | } |
|
526 | ||
527 | } |
|
528 | ||
529 | /** |
|
530 | * An event that knows how to call remove all on an ICacheListener |
|
531 | * <p> |
|
532 | * @author asmuts |
|
533 | * @created January 15, 2002 |
|
534 | */ |
|
535 | 0 | private class RemoveAllEvent |
536 | extends AbstractCacheEvent |
|
537 | { |
|
538 | /** |
|
539 | * Call removeAll on the listener. |
|
540 | * <p> |
|
541 | * @exception IOException |
|
542 | */ |
|
543 | protected void doRun() |
|
544 | throws IOException |
|
545 | { |
|
546 | 0 | listener.handleRemoveAll( cacheName ); |
547 | 0 | } |
548 | ||
549 | /* |
|
550 | * (non-Javadoc) |
|
551 | * @see java.lang.Object#toString() |
|
552 | */ |
|
553 | public String toString() |
|
554 | { |
|
555 | 0 | return "RemoveAllEvent"; |
556 | } |
|
557 | ||
558 | } |
|
559 | ||
560 | /** |
|
561 | * The Event put into the queue for dispose requests. |
|
562 | * <p> |
|
563 | * @author asmuts |
|
564 | * @created January 15, 2002 |
|
565 | */ |
|
566 | private class DisposeEvent |
|
567 | extends AbstractCacheEvent |
|
568 | { |
|
569 | /** |
|
570 | * Called when the event gets to the end of the queue |
|
571 | * <p> |
|
572 | * @exception IOException |
|
573 | */ |
|
574 | protected void doRun() |
|
575 | throws IOException |
|
576 | { |
|
577 | listener.handleDispose( cacheName ); |
|
578 | } |
|
579 | ||
580 | public String toString() |
|
581 | { |
|
582 | return "DisposeEvent"; |
|
583 | } |
|
584 | } |
|
585 | ||
586 | /** |
|
587 | * @return whether or not the queue is functional |
|
588 | */ |
|
589 | public boolean isWorking() |
|
590 | { |
|
591 | return working; |
|
592 | } |
|
593 | ||
594 | /** |
|
595 | * @param isWorkingArg whether the queue is functional |
|
596 | */ |
|
597 | public void setWorking( boolean isWorkingArg ) |
|
598 | { |
|
599 | working = isWorkingArg; |
|
600 | } |
|
601 | ||
602 | /** |
|
603 | * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't |
|
604 | * determine the size, we return true. |
|
605 | * <p> |
|
606 | * @return whether or not there are items in the queue |
|
607 | */ |
|
608 | public boolean isEmpty() |
|
609 | { |
|
610 | if ( pool.getQueue() == null ) |
|
611 | { |
|
612 | return pool.getQueue().peek() == null; |
|
613 | } |
|
614 | else |
|
615 | { |
|
616 | if ( pool.getQueue() instanceof BoundedBuffer ) |
|
617 | { |
|
618 | BoundedBuffer bb = (BoundedBuffer) pool.getQueue(); |
|
619 | return bb.size() == 0; |
|
620 | } |
|
621 | else |
|
622 | { |
|
623 | return true; |
|
624 | } |
|
625 | } |
|
626 | } |
|
627 | ||
628 | /** |
|
629 | * Returns the number of elements in the queue. If the queue cannot determine the size |
|
630 | * accurately it will return 1. |
|
631 | * <p> |
|
632 | * @return number of items in the queue. |
|
633 | */ |
|
634 | public int size() |
|
635 | { |
|
636 | if ( pool.getQueue() == null ) |
|
637 | { |
|
638 | return pool.getQueue().peek() == null ? 0 : 1; |
|
639 | } |
|
640 | else |
|
641 | { |
|
642 | if ( pool.getQueue() instanceof BoundedBuffer ) |
|
643 | { |
|
644 | BoundedBuffer bb = (BoundedBuffer) pool.getQueue(); |
|
645 | return bb.size(); |
|
646 | } |
|
647 | else |
|
648 | { |
|
649 | return 1; |
|
650 | } |
|
651 | } |
|
652 | } |
|
653 | } |
This report is generated by jcoverage, Maven and Maven JCoverage Plugin. |