Coverage report

  %line %branch
org.apache.jcs.engine.PooledCacheEventQueue$RemoveAllEvent
0% 
0% 

 1  
 package org.apache.jcs.engine;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.io.IOException;
 23  
 import java.io.Serializable;
 24  
 import java.util.ArrayList;
 25  
 
 26  
 import org.apache.commons.logging.Log;
 27  
 import org.apache.commons.logging.LogFactory;
 28  
 import org.apache.jcs.engine.behavior.ICacheElement;
 29  
 import org.apache.jcs.engine.behavior.ICacheEventQueue;
 30  
 import org.apache.jcs.engine.behavior.ICacheListener;
 31  
 import org.apache.jcs.engine.stats.StatElement;
 32  
 import org.apache.jcs.engine.stats.Stats;
 33  
 import org.apache.jcs.engine.stats.behavior.IStatElement;
 34  
 import org.apache.jcs.engine.stats.behavior.IStats;
 35  
 import org.apache.jcs.utils.threadpool.ThreadPool;
 36  
 import org.apache.jcs.utils.threadpool.ThreadPoolManager;
 37  
 
 38  
 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
 39  
 
 40  
 /**
 41  
  * An event queue is used to propagate ordered cache events to one and only one target listener.
 42  
  * <p>
 43  
  * This is a modified version of the experimental version. It uses a PooledExecutor and a
 44  
  * BoundedBuffer to queue up events and execute them as threads become available.
 45  
  * <p>
 46  
  * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing
 47  
  * more than a few threads at them will serve no purpose other than to saturate the IO interface. In
 48  
  * light of this, having one thread per region seems unnecessary. This may prove to be false.
 49  
  * <p>
 50  
  * @author Aaron Smuts
 51  
  * @author Travis Savo <tsavo@ifilm.com>
 52  
  */
 53  
 public class PooledCacheEventQueue
 54  
     implements ICacheEventQueue
 55  
 {
 56  
     private static final int queueType = POOLED_QUEUE_TYPE;
 57  
 
 58  
     private static final Log log = LogFactory.getLog( PooledCacheEventQueue.class );
 59  
 
 60  
     // time to wait for an event before snuffing the background thread
 61  
     // if the queue is empty.
 62  
     // make configurable later
 63  
     private int waitToDieMillis = 10000;
 64  
 
 65  
     private ICacheListener listener;
 66  
 
 67  
     private long listenerId;
 68  
 
 69  
     private String cacheName;
 70  
 
 71  
     private int maxFailure;
 72  
 
 73  
     // in milliseconds
 74  
     private int waitBeforeRetry;
 75  
 
 76  
     private boolean destroyed = true;
 77  
 
 78  
     private boolean working = true;
 79  
 
 80  
     // The Thread Pool to execute events with.
 81  
     private ThreadPool pool = null;
 82  
 
 83  
     /**
 84  
      * Constructor for the CacheEventQueue object
 85  
      * <p>
 86  
      * @param listener
 87  
      * @param listenerId
 88  
      * @param cacheName
 89  
      * @param maxFailure
 90  
      * @param waitBeforeRetry
 91  
      * @param threadPoolName
 92  
      */
 93  
     public PooledCacheEventQueue( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
 94  
                                  int waitBeforeRetry, String threadPoolName )
 95  
     {
 96  
         if ( listener == null )
 97  
         {
 98  
             throw new IllegalArgumentException( "listener must not be null" );
 99  
         }
 100  
 
 101  
         this.listener = listener;
 102  
         this.listenerId = listenerId;
 103  
         this.cacheName = cacheName;
 104  
         this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
 105  
         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
 106  
 
 107  
         // this will share the same pool with other event queues by default.
 108  
         if ( threadPoolName == null )
 109  
         {
 110  
             threadPoolName = "cache_event_queue";
 111  
         }
 112  
         pool = ThreadPoolManager.getInstance().getPool( threadPoolName );
 113  
 
 114  
         if ( log.isDebugEnabled() )
 115  
         {
 116  
             log.debug( "Constructed: " + this );
 117  
         }
 118  
     }
 119  
 
 120  
     /*
 121  
      * (non-Javadoc)
 122  
      * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getQueueType()
 123  
      */
 124  
     public int getQueueType()
 125  
     {
 126  
         return queueType;
 127  
     }
 128  
 
 129  
     /**
 130  
      * Event Q is empty.
 131  
      */
 132  
     public synchronized void stopProcessing()
 133  
     {
 134  
         destroyed = true;
 135  
     }
 136  
 
 137  
     /**
 138  
      * Returns the time to wait for events before killing the background thread.
 139  
      * <p>
 140  
      * @return the time to wait before shutting down in ms.
 141  
      */
 142  
     public int getWaitToDieMillis()
 143  
     {
 144  
         return waitToDieMillis;
 145  
     }
 146  
 
 147  
     /**
 148  
      * Sets the time to wait for events before killing the background thread.
 149  
      * <p>
 150  
      * @param wtdm
 151  
      */
 152  
     public void setWaitToDieMillis( int wtdm )
 153  
     {
 154  
         waitToDieMillis = wtdm;
 155  
     }
 156  
 
 157  
     /**
 158  
      * @return String info.
 159  
      */
 160  
     public String toString()
 161  
     {
 162  
         return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
 163  
     }
 164  
 
 165  
     /**
 166  
      * @return true if not destroyed.
 167  
      */
 168  
     public boolean isAlive()
 169  
     {
 170  
         return ( !destroyed );
 171  
     }
 172  
 
 173  
     /**
 174  
      * @param aState
 175  
      */
 176  
     public void setAlive( boolean aState )
 177  
     {
 178  
         destroyed = !aState;
 179  
     }
 180  
 
 181  
     /**
 182  
      * @return The listenerId value
 183  
      */
 184  
     public long getListenerId()
 185  
     {
 186  
         return listenerId;
 187  
     }
 188  
 
 189  
     /**
 190  
      * Destroy the queue. Interrupt all threads.
 191  
      */
 192  
     public synchronized void destroy()
 193  
     {
 194  
         if ( !destroyed )
 195  
         {
 196  
             destroyed = true;
 197  
             // TODO decide whether to shutdown or interrupt
 198  
             // pool.getPool().shutdownNow();
 199  
             pool.getPool().interruptAll();
 200  
             if ( log.isInfoEnabled() )
 201  
             {
 202  
                 log.info( "Cache event queue destroyed: " + this );
 203  
             }
 204  
         }
 205  
     }
 206  
 
 207  
     /**
 208  
      * Constructs a PutEvent for the object and passes it to the event queue.
 209  
      * <p>
 210  
      * @param ce The feature to be added to the PutEvent attribute
 211  
      * @exception IOException
 212  
      */
 213  
     public synchronized void addPutEvent( ICacheElement ce )
 214  
         throws IOException
 215  
     {
 216  
         if ( isWorking() )
 217  
         {
 218  
             put( new PutEvent( ce ) );
 219  
         }
 220  
         else
 221  
         {
 222  
             if ( log.isWarnEnabled() )
 223  
             {
 224  
                 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
 225  
             }
 226  
         }
 227  
     }
 228  
 
 229  
     /**
 230  
      * @param key The feature to be added to the RemoveEvent attribute
 231  
      * @exception IOException
 232  
      */
 233  
     public synchronized void addRemoveEvent( Serializable key )
 234  
         throws IOException
 235  
     {
 236  
         if ( isWorking() )
 237  
         {
 238  
             put( new RemoveEvent( key ) );
 239  
         }
 240  
         else
 241  
         {
 242  
             if ( log.isWarnEnabled() )
 243  
             {
 244  
                 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
 245  
             }
 246  
         }
 247  
     }
 248  
 
 249  
     /**
 250  
      * @exception IOException
 251  
      */
 252  
     public synchronized void addRemoveAllEvent()
 253  
         throws IOException
 254  
     {
 255  
         if ( isWorking() )
 256  
         {
 257  
             put( new RemoveAllEvent() );
 258  
         }
 259  
         else
 260  
         {
 261  
             if ( log.isWarnEnabled() )
 262  
             {
 263  
                 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
 264  
             }
 265  
         }
 266  
     }
 267  
 
 268  
     /**
 269  
      * @exception IOException
 270  
      */
 271  
     public synchronized void addDisposeEvent()
 272  
         throws IOException
 273  
     {
 274  
         if ( isWorking() )
 275  
         {
 276  
             put( new DisposeEvent() );
 277  
         }
 278  
         else
 279  
         {
 280  
             if ( log.isWarnEnabled() )
 281  
             {
 282  
                 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
 283  
             }
 284  
         }
 285  
     }
 286  
 
 287  
     /**
 288  
      * Adds an event to the queue.
 289  
      * <p>
 290  
      * @param event
 291  
      */
 292  
     private void put( AbstractCacheEvent event )
 293  
     {
 294  
         try
 295  
         {
 296  
             pool.execute( event );
 297  
         }
 298  
         catch ( InterruptedException e )
 299  
         {
 300  
             log.error( e );
 301  
         }
 302  
     }
 303  
 
 304  
     /**
 305  
      * @return Statistics info
 306  
      */
 307  
     public String getStats()
 308  
     {
 309  
         return getStatistics().toString();
 310  
     }
 311  
 
 312  
     /*
 313  
      * (non-Javadoc)
 314  
      * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics()
 315  
      */
 316  
     public IStats getStatistics()
 317  
     {
 318  
         IStats stats = new Stats();
 319  
         stats.setTypeName( "Pooled Cache Event Queue" );
 320  
 
 321  
         ArrayList elems = new ArrayList();
 322  
 
 323  
         IStatElement se = null;
 324  
 
 325  
         se = new StatElement();
 326  
         se.setName( "Working" );
 327  
         se.setData( "" + this.working );
 328  
         elems.add( se );
 329  
 
 330  
         se = new StatElement();
 331  
         se.setName( "Destroyed" );
 332  
         se.setData( "" + this.isAlive() );
 333  
         elems.add( se );
 334  
 
 335  
         se = new StatElement();
 336  
         se.setName( "Empty" );
 337  
         se.setData( "" + this.isEmpty() );
 338  
         elems.add( se );
 339  
 
 340  
         if ( pool.getQueue() != null )
 341  
         {
 342  
             if ( pool.getQueue() instanceof BoundedBuffer )
 343  
             {
 344  
                 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
 345  
                 se = new StatElement();
 346  
                 se.setName( "Queue Size" );
 347  
                 se.setData( "" + bb.size() );
 348  
                 elems.add( se );
 349  
 
 350  
                 se = new StatElement();
 351  
                 se.setName( "Queue Capacity" );
 352  
                 se.setData( "" + bb.capacity() );
 353  
                 elems.add( se );
 354  
             }
 355  
         }
 356  
 
 357  
         se = new StatElement();
 358  
         se.setName( "Pool Size" );
 359  
         se.setData( "" + pool.getPool().getPoolSize() );
 360  
         elems.add( se );
 361  
 
 362  
         se = new StatElement();
 363  
         se.setName( "Maximum Pool Size" );
 364  
         se.setData( "" + pool.getPool().getMaximumPoolSize() );
 365  
         elems.add( se );
 366  
 
 367  
         // get an array and put them in the Stats object
 368  
         IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[elems.size()] );
 369  
         stats.setStatElements( ses );
 370  
 
 371  
         return stats;
 372  
     }
 373  
 
 374  
     // /////////////////////////// Inner classes /////////////////////////////
 375  
 
 376  
     /**
 377  
      * Retries before declaring failure.
 378  
      * <p>
 379  
      * @author asmuts
 380  
      * @created January 15, 2002
 381  
      */
 382  
     private abstract class AbstractCacheEvent
 383  
         implements Runnable
 384  
     {
 385  
         int failures = 0;
 386  
 
 387  
         boolean done = false;
 388  
 
 389  
         /**
 390  
          * Main processing method for the AbstractCacheEvent object. It calls the abstract doRun
 391  
          * method that all concrete instances must implement.
 392  
          */
 393  
         public void run()
 394  
         {
 395  
             try
 396  
             {
 397  
                 doRun();
 398  
             }
 399  
             catch ( IOException e )
 400  
             {
 401  
                 if ( log.isWarnEnabled() )
 402  
                 {
 403  
                     log.warn( e );
 404  
                 }
 405  
                 if ( ++failures >= maxFailure )
 406  
                 {
 407  
                     if ( log.isWarnEnabled() )
 408  
                     {
 409  
                         log.warn( "Error while running event from Queue: " + this
 410  
                             + ". Dropping Event and marking Event Queue as non-functional." );
 411  
                     }
 412  
                     setWorking( false );
 413  
                     setAlive( false );
 414  
                     return;
 415  
                 }
 416  
                 if ( log.isInfoEnabled() )
 417  
                 {
 418  
                     log.info( "Error while running event from Queue: " + this + ". Retrying..." );
 419  
                 }
 420  
                 try
 421  
                 {
 422  
                     Thread.sleep( waitBeforeRetry );
 423  
                     run();
 424  
                 }
 425  
                 catch ( InterruptedException ie )
 426  
                 {
 427  
                     if ( log.isErrorEnabled() )
 428  
                     {
 429  
                         log.warn( "Interrupted while sleeping for retry on event " + this + "." );
 430  
                     }
 431  
                     setWorking( false );
 432  
                     setAlive( false );
 433  
                 }
 434  
             }
 435  
         }
 436  
 
 437  
         /**
 438  
          * @exception IOException
 439  
          */
 440  
         protected abstract void doRun()
 441  
             throws IOException;
 442  
     }
 443  
 
 444  
     /**
 445  
      * An event that puts an item to a ICacheListener
 446  
      * <p>
 447  
      * @author asmuts
 448  
      * @created January 15, 2002
 449  
      */
 450  
     private class PutEvent
 451  
         extends AbstractCacheEvent
 452  
     {
 453  
         private ICacheElement ice;
 454  
 
 455  
         /**
 456  
          * Constructor for the PutEvent object
 457  
          * @param ice
 458  
          * @exception IOException
 459  
          */
 460  
         PutEvent( ICacheElement ice )
 461  
             throws IOException
 462  
         {
 463  
             this.ice = ice;
 464  
         }
 465  
 
 466  
         /**
 467  
          * Tells the ICacheListener to handle the put.
 468  
          * <p>
 469  
          * @exception IOException
 470  
          */
 471  
         protected void doRun()
 472  
             throws IOException
 473  
         {
 474  
             listener.handlePut( ice );
 475  
         }
 476  
 
 477  
         public String toString()
 478  
         {
 479  
             return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
 480  
                 .append( ice.getVal() ).toString();
 481  
         }
 482  
 
 483  
     }
 484  
 
 485  
     /**
 486  
      * An event that knows how to call remove on an ICacheListener
 487  
      * <p>
 488  
      * @author asmuts
 489  
      * @created January 15, 2002
 490  
      */
 491  
     private class RemoveEvent
 492  
         extends AbstractCacheEvent
 493  
     {
 494  
         private Serializable key;
 495  
 
 496  
         /**
 497  
          * Constructor for the RemoveEvent object
 498  
          * @param key
 499  
          * @exception IOException
 500  
          */
 501  
         RemoveEvent( Serializable key )
 502  
             throws IOException
 503  
         {
 504  
             this.key = key;
 505  
         }
 506  
 
 507  
         /**
 508  
          * Calls remove on the listener.
 509  
          * <p>
 510  
          * @exception IOException
 511  
          */
 512  
         protected void doRun()
 513  
             throws IOException
 514  
         {
 515  
             listener.handleRemove( cacheName, key );
 516  
         }
 517  
 
 518  
         /*
 519  
          * (non-Javadoc)
 520  
          * @see java.lang.Object#toString()
 521  
          */
 522  
         public String toString()
 523  
         {
 524  
             return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
 525  
         }
 526  
 
 527  
     }
 528  
 
 529  
     /**
 530  
      * An event that knows how to call remove all on an ICacheListener
 531  
      * <p>
 532  
      * @author asmuts
 533  
      * @created January 15, 2002
 534  
      */
 535  0
     private class RemoveAllEvent
 536  
         extends AbstractCacheEvent
 537  
     {
 538  
         /**
 539  
          * Call removeAll on the listener.
 540  
          * <p>
 541  
          * @exception IOException
 542  
          */
 543  
         protected void doRun()
 544  
             throws IOException
 545  
         {
 546  0
             listener.handleRemoveAll( cacheName );
 547  0
         }
 548  
 
 549  
         /*
 550  
          * (non-Javadoc)
 551  
          * @see java.lang.Object#toString()
 552  
          */
 553  
         public String toString()
 554  
         {
 555  0
             return "RemoveAllEvent";
 556  
         }
 557  
 
 558  
     }
 559  
 
 560  
     /**
 561  
      * The Event put into the queue for dispose requests.
 562  
      * <p>
 563  
      * @author asmuts
 564  
      * @created January 15, 2002
 565  
      */
 566  
     private class DisposeEvent
 567  
         extends AbstractCacheEvent
 568  
     {
 569  
         /**
 570  
          * Called when the event gets to the end of the queue
 571  
          * <p>
 572  
          * @exception IOException
 573  
          */
 574  
         protected void doRun()
 575  
             throws IOException
 576  
         {
 577  
             listener.handleDispose( cacheName );
 578  
         }
 579  
 
 580  
         public String toString()
 581  
         {
 582  
             return "DisposeEvent";
 583  
         }
 584  
     }
 585  
 
 586  
     /**
 587  
      * @return whether or not the queue is functional
 588  
      */
 589  
     public boolean isWorking()
 590  
     {
 591  
         return working;
 592  
     }
 593  
 
 594  
     /**
 595  
      * @param isWorkingArg whether the queue is functional
 596  
      */
 597  
     public void setWorking( boolean isWorkingArg )
 598  
     {
 599  
         working = isWorkingArg;
 600  
     }
 601  
 
 602  
     /**
 603  
      * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't
 604  
      * determine the size, we return true.
 605  
      * <p>
 606  
      * @return whether or not there are items in the queue
 607  
      */
 608  
     public boolean isEmpty()
 609  
     {
 610  
         if ( pool.getQueue() == null )
 611  
         {
 612  
             return pool.getQueue().peek() == null;
 613  
         }
 614  
         else
 615  
         {
 616  
             if ( pool.getQueue() instanceof BoundedBuffer )
 617  
             {
 618  
                 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
 619  
                 return bb.size() == 0;
 620  
             }
 621  
             else
 622  
             {
 623  
                 return true;
 624  
             }
 625  
         }
 626  
     }
 627  
 
 628  
     /**
 629  
      * Returns the number of elements in the queue. If the queue cannot determine the size
 630  
      * accurately it will return 1.
 631  
      * <p>
 632  
      * @return number of items in the queue.
 633  
      */
 634  
     public int size()
 635  
     {
 636  
         if ( pool.getQueue() == null )
 637  
         {
 638  
             return pool.getQueue().peek() == null ? 0 : 1;
 639  
         }
 640  
         else
 641  
         {
 642  
             if ( pool.getQueue() instanceof BoundedBuffer )
 643  
             {
 644  
                 BoundedBuffer bb = (BoundedBuffer) pool.getQueue();
 645  
                 return bb.size();
 646  
             }
 647  
             else
 648  
             {
 649  
                 return 1;
 650  
             }
 651  
         }
 652  
     }
 653  
 }

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.