View Javadoc

1   package org.apache.jcs.auxiliary.disk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.auxiliary.AuxiliaryCache;
34  import org.apache.jcs.auxiliary.disk.behavior.IDiskCacheAttributes;
35  import org.apache.jcs.engine.CacheConstants;
36  import org.apache.jcs.engine.CacheEventQueueFactory;
37  import org.apache.jcs.engine.CacheInfo;
38  import org.apache.jcs.engine.behavior.ICache;
39  import org.apache.jcs.engine.behavior.ICacheElement;
40  import org.apache.jcs.engine.behavior.ICacheEventQueue;
41  import org.apache.jcs.engine.behavior.ICacheListener;
42  import org.apache.jcs.engine.stats.StatElement;
43  import org.apache.jcs.engine.stats.Stats;
44  import org.apache.jcs.engine.stats.behavior.IStatElement;
45  import org.apache.jcs.engine.stats.behavior.IStats;
46  
47  import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
48  
/**
 * Abstract class providing a base implementation of a disk cache, which can be easily extended to
 * implement a disk cache for a specific persistence mechanism.
 * <p>
 * When implementing the abstract methods note that while this base class handles most things, it
 * does not acquire or release any locks. Implementations should do so as necessary. This is mainly
 * done to minimize the time spent in critical sections.
 * <p>
 * Error handling in this class needs to be addressed. Currently if an exception is thrown by the
 * persistence mechanism, this class destroys the event queue. Should it also destroy purgatory?
 * Should it dispose itself?
 */
61  public abstract class AbstractDiskCache
62      implements AuxiliaryCache, Serializable
63  {
    /** The logger. */
    private static final Log log = LogFactory.getLog( AbstractDiskCache.class );

    /** Generic disk cache attributes, as passed to the constructor. */
    private IDiskCacheAttributes dcattr = null;

    /**
     * Map where elements are stored between being added to this cache and actually spooled to disk.
     * This allows puts to the disk cache to return quickly, and the more expensive operation of
     * serializing the elements to persistent storage to be queued for later.
     * <p>
     * If the elements are pulled into the memory cache while they are still in purgatory, writing
     * to disk can be cancelled.
     * <p>
     * The instance is swapped for a fresh map by initPurgatory(). Most accesses in this class are
     * made while synchronized on the map itself.
     */
    protected Map purgatory = new HashMap();

    /**
     * The CacheEventQueue where changes will be queued for asynchronous updating of the persistent
     * storage.
     */
    protected ICacheEventQueue cacheEventQueue;

    /**
     * Indicates whether the cache is 'alive', defined as having been initialized, but not yet
     * disposed.
     * <p>
     * This class only ever sets the flag to false (in dispose()); presumably subclasses set it to
     * true once their storage is ready.
     */
    protected boolean alive = false;

    /**
     * Every cache has a name. The constructor initializes it from the cache attributes.
     */
    protected String cacheName;

    /**
     * DEBUG: Keeps a count of the number of purgatory hits for debug messages. It is incremented
     * in get() without synchronization, so treat the value as approximate.
     */
    protected int purgHits = 0;

    /**
     * We lock here, so that we cannot get an update after a remove all. An individual removal
     * locks the item. Resetting purgatory takes the write lock; spooling an element to disk takes
     * the read lock.
     */
    private WriterPreferenceReadWriteLock removeAllLock = new WriterPreferenceReadWriteLock();
107 
108     // ----------------------------------------------------------- constructors
109 
110     /***
111      * Construc the abstract disk cache, create event queues and purgatory.
112      * <p>
113      * @param attr
114      */
115     public AbstractDiskCache( IDiskCacheAttributes attr )
116     {
117         this.dcattr = attr;
118 
119         this.cacheName = attr.getCacheName();
120 
121         // create queue
122         CacheEventQueueFactory fact = new CacheEventQueueFactory();
123         this.cacheEventQueue = fact.createCacheEventQueue( new MyCacheListener(), CacheInfo.listenerId, cacheName,
124                                                            dcattr.getEventQueuePoolName(), dcattr
125                                                                .getEventQueueTypeFactoryCode() );
126 
127         // create purgatory
128         initPurgatory();
129     }
130 
131     /***
132      * Purgatory size of -1 means to use a HashMap with no size limit. Anything greater will use an
133      * LRU map of some sort.
134      * <p>
135      * @TODO Currently setting this to 0 will cause nothing to be put to disk, since it will assume
136      *       that if an item is not in purgatory, then it must have been plucked. We should make 0
137      *       work, a way to not use purgatory.
138      */
139     private void initPurgatory()
140     {
141         try
142         {
143             // we need this so we can stop the updates from happening after a
144             // removeall
145             removeAllLock.writeLock().acquire();
146 
147             if ( purgatory != null )
148             {
149                 synchronized ( purgatory )
150                 {
151                     if ( dcattr.getMaxPurgatorySize() >= 0 )
152                     {
153                         purgatory = new LRUMapJCS( dcattr.getMaxPurgatorySize() );
154                     }
155                     else
156                     {
157                         purgatory = new HashMap();
158                     }
159                 }
160             }
161             else
162             {
163                 if ( dcattr.getMaxPurgatorySize() >= 0 )
164                 {
165                     purgatory = new LRUMapJCS( dcattr.getMaxPurgatorySize() );
166                 }
167                 else
168                 {
169                     purgatory = new HashMap();
170                 }
171             }
172         }
173         catch ( InterruptedException e )
174         {
175             log.error( "problem encountered resseting purgatory.", e );
176         }
177         finally
178         {
179             removeAllLock.writeLock().release();
180         }
181     }
182 
183     // ------------------------------------------------------- interface ICache
184 
185     /***
186      * Adds the provided element to the cache. Element will be added to purgatory, and then queued
187      * for later writing to the serialized storage mechanism.
188      * <p>
189      * An update results in a put event being created. The put event will call the handlePut method
190      * defined here. The handlePut method calls the implemented doPut on the child.
191      * <p>
192      * @param cacheElement
193      * @throws IOException
194      * @see org.apache.jcs.engine.behavior.ICache#update
195      */
196     public final void update( ICacheElement cacheElement )
197         throws IOException
198     {
199         if ( log.isDebugEnabled() )
200         {
201             log.debug( "Putting element in purgatory, cacheName: " + cacheName + ", key: " + cacheElement.getKey() );
202         }
203 
204         try
205         {
206             // Wrap the CacheElement in a PurgatoryElement
207             PurgatoryElement pe = new PurgatoryElement( cacheElement );
208 
209             // Indicates the the element is eligable to be spooled to disk,
210             // this will remain true unless the item is pulled back into
211             // memory.
212             pe.setSpoolable( true );
213 
214             // Add the element to purgatory
215             synchronized ( purgatory )
216             {
217                 purgatory.put( pe.getKey(), pe );
218             }
219 
220             // Queue element for serialization
221             cacheEventQueue.addPutEvent( pe );
222         }
223         catch ( IOException ex )
224         {
225             log.error( "Problem adding put event to queue.", ex );
226 
227             cacheEventQueue.destroy();
228         }
229     }
230 
231     /***
232      * Check to see if the item is in purgatory. If so, return it. If not, check to see if we have
233      * it on disk.
234      * <p>
235      * @param key
236      * @return ICacheElement or null
237      * @see AuxiliaryCache#get
238      */
239     public final ICacheElement get( Serializable key )
240     {
241         // If not alive, always return null.
242 
243         if ( !alive )
244         {
245             return null;
246         }
247 
248         PurgatoryElement pe = null;
249         synchronized ( purgatory )
250         {
251             pe = (PurgatoryElement) purgatory.get( key );
252         }
253 
254         // If the element was found in purgatory
255         if ( pe != null )
256         {
257             purgHits++;
258 
259             if ( log.isDebugEnabled() )
260             {
261                 if ( purgHits % 100 == 0 )
262                 {
263                     log.debug( "Purgatory hits = " + purgHits );
264                 }
265             }
266 
267             // Since the element will go back to the memory cache, we could set
268             // spoolable to false, which will prevent the queue listener from
269             // serializing the element. This would not match the disk cache
270             // behavior and the behavior of other auxiliaries. Gets never remove
271             // items from auxiliaries.
272             // Beyond consistency, the items should stay in purgatory and get
273             // spooled since the mem cache may be set to 0. If an item is
274             // active, it will keep getting put into purgatory and removed. The
275             // CompositeCache now does not put an item to memory from disk if
276             // the size is 0.
277             // Do not set spoolable to false. Just let it go to disk. This
278             // will allow the memory size = 0 setting to work well.
279 
280             if ( log.isDebugEnabled() )
281             {
282                 log.debug( "Found element in purgatory, cacheName: " + cacheName + ", key: " + key );
283             }
284 
285             return pe.cacheElement;
286         }
287 
288         // If we reach this point, element was not found in purgatory, so get
289         // it from the cache.
290         try
291         {
292             return doGet( key );
293         }
294         catch ( Exception e )
295         {
296             log.error( e );
297 
298             cacheEventQueue.destroy();
299         }
300 
301         return null;
302     }
303 
    /**
     * Returns the keys in a group. There is no default implementation; each concrete disk cache
     * must provide one.
     * <p>
     * @param groupName the name of the group
     * @return presumably the set of keys stored under the given group (contract is defined by
     *         AuxiliaryCache; confirm there)
     * @see org.apache.jcs.auxiliary.AuxiliaryCache#getGroupKeys(java.lang.String)
     */
    public abstract Set getGroupKeys( String groupName );
311 
312     /***
313      * Removes are not queued. A call to remove is immediate.
314      * <p>
315      * @param key
316      * @return whether the item was present to be removed.
317      * @see org.apache.jcs.engine.behavior.ICache#remove
318      */
319     public final boolean remove( Serializable key )
320     {
321         PurgatoryElement pe = null;
322 
323         synchronized ( purgatory )
324         {
325             // I'm getting the object, so I can lock on the element
326             // Remove element from purgatory if it is there
327             pe = (PurgatoryElement) purgatory.get( key );
328         }
329 
330         if ( pe != null )
331         {
332             synchronized ( pe.getCacheElement() )
333             {
334                 synchronized ( purgatory )
335                 {
336                     purgatory.remove( key );
337                 }
338 
339                 // no way to remove from queue, just make sure it doesn't get on
340                 // disk and then removed right afterwards
341                 pe.setSpoolable( false );
342 
343                 // Remove from persistent store immediately
344                 doRemove( key );
345             }
346         }
347         else
348         {
349             // Remove from persistent store immediately
350             doRemove( key );
351         }
352 
353         return false;
354     }
355 
356     /***
357      * @see org.apache.jcs.engine.behavior.ICache#removeAll
358      */
359     public final void removeAll()
360     {
361         if ( this.dcattr.isAllowRemoveAll() )
362         {
363             // Replace purgatory with a new empty hashtable
364             initPurgatory();
365 
366             // Remove all from persistent store immediately
367             doRemoveAll();
368         }
369         else
370         {
371             if ( log.isInfoEnabled() )
372             {
373                 log.info( "RemoveAll was requested but the request was not fulfilled: allowRemoveAll is set to false." );
374             }
375         }
376     }
377 
378     /***
379      * Adds a dispose request to the disk cache.
380      * <p>
381      * Disposal proceeds in several steps.
382      * <ol>
383      * <li> Prior to this call the Composite cache dumped the memory into the disk cache. If it is
384      * large then we need to wait for the event queue to finish.
385      * <li> Wait until the event queue is empty of until the configured ShutdownSpoolTimeLimit is
386      * reached.
387      * <li> Call doDispose on the concrete impl.
388      * </ol>
389      */
390     public final void dispose()
391     {
392         Runnable disR = new Runnable()
393         {
394             public void run()
395             {
396                 boolean keepGoing = true;
397                 long total = 0;
398                 long interval = 100;
399                 while ( keepGoing )
400                 {
401                     keepGoing = !cacheEventQueue.isEmpty();
402                     try
403                     {
404                         Thread.sleep( interval );
405                         total += interval;
406                         // log.info( "total = " + total );
407                     }
408                     catch ( InterruptedException e )
409                     {
410                         break;
411                     }
412                 }
413                 log.info( "No longer waiting for event queue to finish: " + cacheEventQueue.getStatistics() );
414             }
415         };
416         Thread t = new Thread( disR );
417         t.start();
418         // wait up to 60 seconds for dispose and then quit if not done.
419         try
420         {
421             t.join( this.dcattr.getShutdownSpoolTimeLimit() * 1000 );
422         }
423         catch ( InterruptedException ex )
424         {
425             log.error( ex );
426         }
427 
428         log.info( "In dispose, destroying event queue." );
429         // This stops the processor thread.
430         cacheEventQueue.destroy();
431 
432         // Invoke any implementation specific disposal code
433         // need to handle the disposal first.
434         doDispose();
435 
436         alive = false;
437     }
438 
439     /***
440      * @return the region name.
441      * @see ICache#getCacheName
442      */
443     public String getCacheName()
444     {
445         return cacheName;
446     }
447 
448     /***
449      * Gets basic stats for the abstract disk cache.
450      * <p>
451      * @return String
452      */
453     public String getStats()
454     {
455         return getStatistics().toString();
456     }
457 
458     /***
459      * Returns semi-structured data.
460      * <p>
461      * @see org.apache.jcs.auxiliary.AuxiliaryCache#getStatistics()
462      */
463     public IStats getStatistics()
464     {
465         IStats stats = new Stats();
466         stats.setTypeName( "Abstract Disk Cache" );
467 
468         ArrayList elems = new ArrayList();
469 
470         IStatElement se = null;
471 
472         se = new StatElement();
473         se.setName( "Purgatory Hits" );
474         se.setData( "" + purgHits );
475         elems.add( se );
476 
477         se = new StatElement();
478         se.setName( "Purgatory Size" );
479         se.setData( "" + purgatory.size() );
480         elems.add( se );
481 
482         // get the stats from the event queue too
483         // get as array, convert to list, add list to our outer list
484         IStats eqStats = this.cacheEventQueue.getStatistics();
485         IStatElement[] eqSEs = eqStats.getStatElements();
486         List eqL = Arrays.asList( eqSEs );
487         elems.addAll( eqL );
488 
489         // get an array and put them in the Stats object
490         IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] );
491         stats.setStatElements( ses );
492 
493         return stats;
494     }
495 
496     /***
497      * @return the status -- alive or disposed from CacheConstants
498      * @see ICache#getStatus
499      */
500     public int getStatus()
501     {
502         return ( alive ? CacheConstants.STATUS_ALIVE : CacheConstants.STATUS_DISPOSED );
503     }
504 
    /**
     * Size cannot be determined without knowledge of the cache implementation, so subclasses will
     * need to implement this method.
     * <p>
     * NOTE(review): whether elements still waiting in purgatory should be counted is not defined
     * here; confirm against the concrete implementations.
     * <p>
     * @return the number of items.
     * @see ICache#getSize
     */
    public abstract int getSize();
513 
514     /***
515      * @see org.apache.jcs.engine.behavior.ICacheType#getCacheType
516      * @return Always returns DISK_CACHE since subclasses should all be of that type.
517      */
518     public int getCacheType()
519     {
520         return DISK_CACHE;
521     }
522 
523     /***
524      * Cache that implements the CacheListener interface, and calls appropriate methods in its
525      * parent class.
526      */
527     private class MyCacheListener
528         implements ICacheListener
529     {
530         /*** Id of the listener */
531         private long listenerId = 0;
532 
533         /***
534          * @return cacheElement.getElementAttributes();
535          * @throws IOException
536          * @see ICacheListener#getListenerId
537          */
538         public long getListenerId()
539             throws IOException
540         {
541             return this.listenerId;
542         }
543 
544         /***
545          * @param id
546          * @throws IOException
547          * @see ICacheListener#setListenerId
548          */
549         public void setListenerId( long id )
550             throws IOException
551         {
552             this.listenerId = id;
553         }
554 
555         /***
556          * @param element
557          * @throws IOException
558          * @see ICacheListener#handlePut NOTE: This checks if the element is a puratory element and
559          *      behaves differently depending. However since we have control over how elements are
560          *      added to the cache event queue, that may not be needed ( they are always
561          *      PurgatoryElements ).
562          */
563         public void handlePut( ICacheElement element )
564             throws IOException
565         {
566             if ( alive )
567             {
568                 // If the element is a PurgatoryElement we must check to see
569                 // if it is still spoolable, and remove it from purgatory.
570                 if ( element instanceof PurgatoryElement )
571                 {
572                     PurgatoryElement pe = (PurgatoryElement) element;
573 
574                     synchronized ( pe.getCacheElement() )
575                     {
576                         try
577                         {
578                             // TODO consider a timeout.
579                             // we need this so that we can have multiple update
580                             // threads
581                             // and still have removeAll request come in that
582                             // always win
583                             removeAllLock.readLock().acquire();
584 
585                             // TODO consider changing purgatory sync
586                             // String keyAsString = element.getKey().toString();
587                             synchronized ( purgatory )
588                             {
589                                 // If the element has already been removed from
590                                 // purgatory do nothing
591                                 if ( !purgatory.containsKey( pe.getKey() ) )
592                                 {
593                                     return;
594                                 }
595 
596                                 element = pe.getCacheElement();
597                             }
598 
599                             // I took this out of the purgatory sync block.
600                             // If the element is still eligable, spool it.
601                             if ( pe.isSpoolable() )
602                             {
603                                 doUpdate( element );
604                             }
605                         }
606                         catch ( InterruptedException e )
607                         {
608                             log.error( e );
609                         }
610                         finally
611                         {
612                             removeAllLock.readLock().release();
613                         }
614 
615                         synchronized ( purgatory )
616                         {
617                             // After the update has completed, it is safe to
618                             // remove
619                             // the element from purgatory.
620                             purgatory.remove( element.getKey() );
621                         }
622                     }
623                 }
624                 else
625                 {
626                     // call the child's implementation
627                     doUpdate( element );
628                 }
629             }
630             else
631             {
632                 /*
633                  * The cache is not alive, hence the element should be removed from purgatory. All
634                  * elements should be removed eventually. Perhaps, the alive check should have been
635                  * done before it went in the queue. This block handles the case where the disk
636                  * cache fails during normal opertations.
637                  */
638                 synchronized ( purgatory )
639                 {
640                     purgatory.remove( element.getKey() );
641                 }
642             }
643         }
644 
645         /***
646          * @param cacheName
647          * @param key
648          * @throws IOException
649          * @see ICacheListener#handleRemove
650          */
651         public void handleRemove( String cacheName, Serializable key )
652             throws IOException
653         {
654             if ( alive )
655             {
656                 if ( doRemove( key ) )
657                 {
658                     log.debug( "Element removed, key: " + key );
659                 }
660             }
661         }
662 
663         /***
664          * @param cacheName
665          * @throws IOException
666          * @see ICacheListener#handleRemoveAll
667          */
668         public void handleRemoveAll( String cacheName )
669             throws IOException
670         {
671             if ( alive )
672             {
673                 doRemoveAll();
674             }
675         }
676 
677         /***
678          * @param cacheName
679          * @throws IOException
680          * @see ICacheListener#handleDispose
681          */
682         public void handleDispose( String cacheName )
683             throws IOException
684         {
685             if ( alive )
686             {
687                 doDispose();
688             }
689         }
690     }
691 
692     // ---------------------- subclasses should implement the following methods
693 
    /**
     * Get a value from the persistent store. This base class calls it from get() only after the
     * key was not found in purgatory, and without holding any lock; an exception thrown here is
     * caught by get().
     * @param key Key to locate value for.
     * @return An object matching key, or null.
     */
    protected abstract ICacheElement doGet( Serializable key );
700 
    /**
     * Add a cache element to the persistent store. This base class calls it from the event queue
     * listener when a queued put event is processed, not from update() directly.
     * @param element the element to write
     */
    protected abstract void doUpdate( ICacheElement element );
706 
    /**
     * Remove an object from the persistent store if found. This base class calls it from remove()
     * and from the event queue listener's handleRemove.
     * @param key Key of object to remove.
     * @return whether or not the item was present when removed
     */
    protected abstract boolean doRemove( Serializable key );
713 
    /**
     * Remove all objects from the persistent store. This base class calls it from removeAll()
     * (after purgatory has been reset) and from the event queue listener's handleRemoveAll.
     */
    protected abstract void doRemoveAll();
718 
    /**
     * Dispose of the persistent store. Note that disposal of purgatory and setting alive to false
     * does NOT need to be done by this method. This base class calls it from dispose(), after the
     * event queue has been destroyed, and from the event queue listener's handleDispose.
     */
    protected abstract void doDispose();
724 
725 }