View Javadoc

1   package org.apache.jcs.engine;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.ArrayList;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.jcs.engine.behavior.ICacheElement;
29  import org.apache.jcs.engine.behavior.ICacheEventQueue;
30  import org.apache.jcs.engine.behavior.ICacheListener;
31  import org.apache.jcs.engine.stats.StatElement;
32  import org.apache.jcs.engine.stats.Stats;
33  import org.apache.jcs.engine.stats.behavior.IStatElement;
34  import org.apache.jcs.engine.stats.behavior.IStats;
35  
36  /***
37   * An event queue is used to propagate ordered cache events to one and only one target listener.
38   * <p>
39   * This is a modified version of the experimental version. It should lazy initilaize the processor
40   * thread, and kill the thread if the queue goes emtpy for a specified period, now set to 1 minute.
41   * If something comes in after that a new processor thread should be created.
42   */
43  public class CacheEventQueue
44      implements ICacheEventQueue
45  {
46      /*** The logger. */
47      private static final Log log = LogFactory.getLog( CacheEventQueue.class );
48  
49      /*** The type of queue -- there are pooled and single */
50      private static final int queueType = SINGLE_QUEUE_TYPE;
51  
52      /*** default */
53      private static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
54  
55      /***
56       * time to wait for an event before snuffing the background thread if the queue is empty. make
57       * configurable later
58       */
59      private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
60  
61      /***
62       * When the events are pulled off the queue, the tell the listener to handle the specific event
63       * type. The work is done by the listener.
64       */
65      private ICacheListener listener;
66  
67      /*** Id of the listener registed with this queue */
68      private long listenerId;
69  
70      /*** The cache region name, if applicable. */
71      private String cacheName;
72  
73      /*** Maximum number of failures before we buy the farm. */
74      private int maxFailure;
75  
76      /*** in milliseconds */
77      private int waitBeforeRetry;
78  
79      /*** this is true if there is no worker thread. */
80      private boolean destroyed = true;
81  
82      /***
83       * This means that the queue is functional. If we reached the max number of failures, the queue
84       * is marked as non functional and will never work again.
85       */
86      private boolean working = true;
87  
88      /*** the thread that works the queue. */
89      private Thread processorThread;
90  
91      /*** sync */
92      private Object queueLock = new Object();
93  
94      /*** the head of the queue */
95      private Node head = new Node();
96  
97      /*** the end of the queue */
98      private Node tail = head;
99  
100     /*** Number of items in the queue */
101     private int size = 0;
102 
103     /***
104      * Constructs with the specified listener and the cache name.
105      * <p>
106      * @param listener
107      * @param listenerId
108      * @param cacheName
109      */
110     public CacheEventQueue( ICacheListener listener, long listenerId, String cacheName )
111     {
112         this( listener, listenerId, cacheName, 10, 500 );
113     }
114 
115     /***
116      * Constructor for the CacheEventQueue object
117      * <p>
118      * @param listener
119      * @param listenerId
120      * @param cacheName
121      * @param maxFailure
122      * @param waitBeforeRetry
123      */
124     public CacheEventQueue( ICacheListener listener, long listenerId, String cacheName, int maxFailure,
125                             int waitBeforeRetry )
126     {
127         if ( listener == null )
128         {
129             throw new IllegalArgumentException( "listener must not be null" );
130         }
131 
132         this.listener = listener;
133         this.listenerId = listenerId;
134         this.cacheName = cacheName;
135         this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
136         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
137 
138         if ( log.isDebugEnabled() )
139         {
140             log.debug( "Constructed: " + this );
141         }
142     }
143 
144     /***
145      * What type of queue is this.
146      * <p>
147      * (non-Javadoc)
148      * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getQueueType()
149      */
150     public int getQueueType()
151     {
152         return queueType;
153     }
154 
155     /***
156      * Kill the processor thread and indicate that the queue is detroyed and no longer alive, but it
157      * can still be working.
158      */
159     public synchronized void stopProcessing()
160     {
161         destroyed = true;
162         processorThread = null;
163     }
164 
165     /***
166      * Returns the time to wait for events before killing the background thread.
167      * @return int
168      */
169     public int getWaitToDieMillis()
170     {
171         return waitToDieMillis;
172     }
173 
174     /***
175      * Sets the time to wait for events before killing the background thread.
176      * <p>
177      * @param wtdm the ms for the q to sit idle.
178      */
179     public void setWaitToDieMillis( int wtdm )
180     {
181         waitToDieMillis = wtdm;
182     }
183 
184     /***
185      * Creates a brief string identifying the listener and the region.
186      * <p>
187      * @return String debugging info.
188      */
189     public String toString()
190     {
191         return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" + cacheName + "]";
192     }
193 
194     /***
195      * If they queue has an active thread it is considered alive.
196      * <p>
197      * @return The alive value
198      */
199     public synchronized boolean isAlive()
200     {
201         return ( !destroyed );
202     }
203 
204     /***
205      * Sets whether the queue is actively processing -- if there are working threads.
206      * <p>
207      * @param aState
208      */
209     public synchronized void setAlive( boolean aState )
210     {
211         destroyed = !aState;
212     }
213 
214     /***
215      * @return The listenerId value
216      */
217     public long getListenerId()
218     {
219         return listenerId;
220     }
221 
222     /***
223      * Event Q is emtpy.
224      * <p>
225      * Calling destroy interupts the processor thread.
226      */
227     public synchronized void destroy()
228     {
229         if ( !destroyed )
230         {
231             destroyed = true;
232 
233             if ( log.isInfoEnabled() )
234             {
235                 log.info( "Destroying queue, stats =  " + getStatistics() );
236             }
237 
238             // sychronize on queue so the thread will not wait forever,
239             // and then interrupt the QueueProcessor
240 
241             if ( processorThread != null )
242             {
243                 synchronized ( queueLock )
244                 {
245                     processorThread.interrupt();
246                 }
247             }
248             processorThread = null;
249 
250             if ( log.isInfoEnabled() )
251             {
252                 log.info( "Cache event queue destroyed: " + this );
253             }
254         }
255         else
256         {
257             if ( log.isInfoEnabled() )
258             {
259                 log.info( "Destroy was called after queue was destroyed.  Doing nothing.  Stats =  " + getStatistics() );
260             }
261         }
262     }
263 
264     /***
265      * This adds a put event ot the queue. When it is processed, the element will be put to the
266      * listener.
267      * <p>
268      * @param ce The feature to be added to the PutEvent attribute
269      * @exception IOException
270      */
271     public synchronized void addPutEvent( ICacheElement ce )
272         throws IOException
273     {
274         if ( isWorking() )
275         {
276             put( new PutEvent( ce ) );
277         }
278         else
279         {
280             if ( log.isWarnEnabled() )
281             {
282                 log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
283             }
284         }
285     }
286 
287     /***
288      * This adds a remove event to the queue. When processed the listener's remove method will be
289      * called for the key.
290      * <p>
291      * @param key The feature to be added to the RemoveEvent attribute
292      * @exception IOException
293      */
294     public synchronized void addRemoveEvent( Serializable key )
295         throws IOException
296     {
297         if ( isWorking() )
298         {
299             put( new RemoveEvent( key ) );
300         }
301         else
302         {
303             if ( log.isWarnEnabled() )
304             {
305                 log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
306             }
307         }
308     }
309 
310     /***
311      * This adds a remove all event to the queue. When it is processed, all elements will be removed
312      * from the cache.
313      * <p>
314      * @exception IOException
315      */
316     public synchronized void addRemoveAllEvent()
317         throws IOException
318     {
319         if ( isWorking() )
320         {
321             put( new RemoveAllEvent() );
322         }
323         else
324         {
325             if ( log.isWarnEnabled() )
326             {
327                 log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
328             }
329         }
330     }
331 
332     /***
333      * @exception IOException
334      */
335     public synchronized void addDisposeEvent()
336         throws IOException
337     {
338         if ( isWorking() )
339         {
340             put( new DisposeEvent() );
341         }
342         else
343         {
344             if ( log.isWarnEnabled() )
345             {
346                 log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
347             }
348         }
349     }
350 
351     /***
352      * Adds an event to the queue.
353      * <p>
354      * @param event
355      */
356     private void put( AbstractCacheEvent event )
357     {
358         Node newNode = new Node();
359         if ( log.isDebugEnabled() )
360         {
361             log.debug( "Event entering Queue for " + cacheName + ": " + event );
362         }
363 
364         newNode.event = event;
365 
366         synchronized ( queueLock )
367         {
368             size++;
369             tail.next = newNode;
370             tail = newNode;
371             if ( isWorking() )
372             {
373                 if ( !isAlive() )
374                 {
375                     destroyed = false;
376                     processorThread = new QProcessor( this );
377                     processorThread.start();
378                     if ( log.isInfoEnabled() )
379                     {
380                         log.info( "Cache event queue created: " + this );
381                     }
382                 }
383                 else
384                 {
385                     queueLock.notify();
386                 }
387             }
388         }
389     }
390 
391     /***
392      * Returns the next cache event from the queue or null if there are no events in the queue.
393      * <p>
394      * We have an empty node at the head and the tail. When we take an item from the queue we move
395      * the next node to the head and then clear the value from that node. This value is returned.
396      * <p>
397      * When the queue is empty the head node is the same as the tail node.
398      * <p>
399      * @return An event to process.
400      */
401     private AbstractCacheEvent take()
402     {
403         synchronized ( queueLock )
404         {
405             // wait until there is something to read
406             if ( head == tail )
407             {
408                 return null;
409             }
410 
411             Node node = head.next;
412 
413             AbstractCacheEvent value = node.event;
414 
415             if ( log.isDebugEnabled() )
416             {
417                 log.debug( "head.event = " + head.event );
418                 log.debug( "node.event = " + node.event );
419             }
420 
421             // Node becomes the new head (head is always empty)
422 
423             node.event = null;
424             head = node;
425 
426             size--;
427             return value;
428         }
429     }
430 
431     /***
432      * This method returns semi structured data on this queue.
433      * <p>
434      * (non-Javadoc)
435      * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics()
436      */
437     public IStats getStatistics()
438     {
439         IStats stats = new Stats();
440         stats.setTypeName( "Cache Event Queue" );
441 
442         ArrayList elems = new ArrayList();
443 
444         IStatElement se = null;
445 
446         se = new StatElement();
447         se.setName( "Working" );
448         se.setData( "" + this.working );
449         elems.add( se );
450 
451         se = new StatElement();
452         se.setName( "Alive" );
453         se.setData( "" + this.isAlive() );
454         elems.add( se );
455 
456         se = new StatElement();
457         se.setName( "Empty" );
458         se.setData( "" + this.isEmpty() );
459         elems.add( se );
460 
461         int size = 0;
462         synchronized ( queueLock )
463         {
464             // wait until there is something to read
465             if ( head == tail )
466             {
467                 size = 0;
468             }
469             else
470             {
471                 Node n = head;
472                 while ( n != null )
473                 {
474                     n = n.next;
475                     size++;
476                 }
477             }
478 
479             se = new StatElement();
480             se.setName( "Size" );
481             se.setData( "" + size );
482             elems.add( se );
483         }
484 
485         // get an array and put them in the Stats object
486         IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] );
487         stats.setStatElements( ses );
488 
489         return stats;
490     }
491 
492     // /////////////////////////// Inner classes /////////////////////////////
493 
494     /*** The queue is composed of nodes. */
495     private static class Node
496     {
497         /*** Next node in the singly linked list. */
498         Node next = null;
499 
500         /*** The payload. */
501         CacheEventQueue.AbstractCacheEvent event = null;
502     }
503 
504     /***
505      * This is the thread that works the queue.
506      * <p>
507      * @author asmuts
508      * @created January 15, 2002
509      */
510     private class QProcessor
511         extends Thread
512     {
513         /*** The queue to work */
514         CacheEventQueue queue;
515 
516         /***
517          * Constructor for the QProcessor object
518          * <p>
519          * @param aQueue the event queue to take items from.
520          */
521         QProcessor( CacheEventQueue aQueue )
522         {
523             super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
524 
525             setDaemon( true );
526             queue = aQueue;
527         }
528 
529         /***
530          * Main processing method for the QProcessor object.
531          * <p>
532          * Waits for a specified time (waitToDieMillis) for something to come in and if no new
533          * events come in during that period the run method can exit and the thread is dereferenced.
534          */
535         public void run()
536         {
537             AbstractCacheEvent event = null;
538 
539             while ( queue.isAlive() )
540             {
541                 event = queue.take();
542 
543                 if ( log.isDebugEnabled() )
544                 {
545                     log.debug( "Event from queue = " + event );
546                 }
547 
548                 if ( event == null )
549                 {
550                     synchronized ( queueLock )
551                     {
552                         try
553                         {
554                             queueLock.wait( queue.getWaitToDieMillis() );
555                         }
556                         catch ( InterruptedException e )
557                         {
558                             log.warn( "Interrupted while waiting for another event to come in before we die." );
559                             return;
560                         }
561                         event = queue.take();
562                         if ( log.isDebugEnabled() )
563                         {
564                             log.debug( "Event from queue after sleep = " + event );
565                         }
566                     }
567                     if ( event == null )
568                     {
569                         queue.stopProcessing();
570                     }
571                 }
572 
573                 if ( queue.isWorking() && queue.isAlive() && event != null )
574                 {
575                     event.run();
576                 }
577             }
578             if ( log.isDebugEnabled() )
579             {
580                 log.debug( "QProcessor exiting for " + queue );
581             }
582         }
583     }
584 
585     /***
586      * Retries before declaring failure.
587      * <p>
588      * @author asmuts
589      * @created January 15, 2002
590      */
591     private abstract class AbstractCacheEvent
592         implements Runnable
593     {
594         /*** Number of failures encountered processing this event. */
595         int failures = 0;
596 
597         /*** Have we finished the job */
598         boolean done = false;
599 
600         /***
601          * Main processing method for the AbstractCacheEvent object
602          */
603         public void run()
604         {
605             try
606             {
607                 doRun();
608             }
609             catch ( IOException e )
610             {
611                 if ( log.isWarnEnabled() )
612                 {
613                     log.warn( e );
614                 }
615                 if ( ++failures >= maxFailure )
616                 {
617                     if ( log.isWarnEnabled() )
618                     {
619                         log.warn( "Error while running event from Queue: " + this
620                             + ". Dropping Event and marking Event Queue as non-functional." );
621                     }
622                     setWorking( false );
623                     setAlive( false );
624                     return;
625                 }
626                 if ( log.isInfoEnabled() )
627                 {
628                     log.info( "Error while running event from Queue: " + this + ". Retrying..." );
629                 }
630                 try
631                 {
632                     Thread.sleep( waitBeforeRetry );
633                     run();
634                 }
635                 catch ( InterruptedException ie )
636                 {
637                     if ( log.isErrorEnabled() )
638                     {
639                         log.warn( "Interrupted while sleeping for retry on event " + this + "." );
640                     }
641                     // TODO consider if this is best. maybe we shoudl just
642                     // destroy
643                     setWorking( false );
644                     setAlive( false );
645                 }
646             }
647         }
648 
649         /***
650          * @exception IOException
651          */
652         protected abstract void doRun()
653             throws IOException;
654     }
655 
656     /***
657      * An element should be put in the cache.
658      * <p>
659      * @author asmuts
660      * @created January 15, 2002
661      */
662     private class PutEvent
663         extends AbstractCacheEvent
664     {
665         /*** The element to put to the listener */
666         private ICacheElement ice;
667 
668         /***
669          * Constructor for the PutEvent object.
670          * <p>
671          * @param ice
672          * @exception IOException
673          */
674         PutEvent( ICacheElement ice )
675             throws IOException
676         {
677             this.ice = ice;
678         }
679 
680         /***
681          * Call put on the listener.
682          * <p>
683          * @exception IOException
684          */
685         protected void doRun()
686             throws IOException
687         {
688             listener.handlePut( ice );
689         }
690 
691         /***
692          * For debugging.
693          * <p>
694          * @return Info on the key and value.
695          */
696         public String toString()
697         {
698             return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() ).append( " value: " )
699                 .append( ice.getVal() ).toString();
700         }
701 
702     }
703 
704     /***
705      * An element should be removed from the cache.
706      * <p>
707      * @author asmuts
708      * @created January 15, 2002
709      */
710     private class RemoveEvent
711         extends AbstractCacheEvent
712     {
713         /*** The key to remove from the listener */
714         private Serializable key;
715 
716         /***
717          * Constructor for the RemoveEvent object
718          * <p>
719          * @param key
720          * @exception IOException
721          */
722         RemoveEvent( Serializable key )
723             throws IOException
724         {
725             this.key = key;
726         }
727 
728         /***
729          * Call remove on the listener.
730          * <p>
731          * @exception IOException
732          */
733         protected void doRun()
734             throws IOException
735         {
736             listener.handleRemove( cacheName, key );
737         }
738 
739         /***
740          * For debugging.
741          * <p>
742          * @return Info on the key to remove.
743          */
744         public String toString()
745         {
746             return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
747         }
748 
749     }
750 
751     /***
752      * All elements should be removed from the cache when this event is processed.
753      * <p>
754      * @author asmuts
755      * @created January 15, 2002
756      */
757     private class RemoveAllEvent
758         extends AbstractCacheEvent
759     {
760         /***
761          * Call removeAll on the listener.
762          * <p>
763          * @exception IOException
764          */
765         protected void doRun()
766             throws IOException
767         {
768             listener.handleRemoveAll( cacheName );
769         }
770 
771         /***
772          * For debugging.
773          * <p>
774          * @return The name of the event.
775          */
776         public String toString()
777         {
778             return "RemoveAllEvent";
779         }
780 
781     }
782 
783     /***
784      * The cache should be disposed when this event is processed.
785      * <p>
786      * @author asmuts
787      * @created January 15, 2002
788      */
789     private class DisposeEvent
790         extends AbstractCacheEvent
791     {
792         /***
793          * Called when gets to the end of the queue
794          * <p>
795          * @exception IOException
796          */
797         protected void doRun()
798             throws IOException
799         {
800             listener.handleDispose( cacheName );
801         }
802 
803         /***
804          * For debugging.
805          * <p>
806          * @return The name of the event.
807          */
808         public String toString()
809         {
810             return "DisposeEvent";
811         }
812     }
813 
814     /***
815      * @return whether the queue is functional.
816      */
817     public boolean isWorking()
818     {
819         return working;
820     }
821 
822     /***
823      * This means that the queue is functional. If we reached the max number of failures, the queue
824      * is marked as non functional and will never work again.
825      * <p>
826      * @param b
827      */
828     public void setWorking( boolean b )
829     {
830         working = b;
831     }
832 
833     /***
834      * @return whether there are any items in the queue.
835      */
836     public boolean isEmpty()
837     {
838         return tail == head;
839     }
840 
841     /***
842      * Returns the number of elements in the queue.
843      * <p>
844      * @return number of items in the queue.
845      */
846     public int size()
847     {
848         return size;
849     }
850 }