View Javadoc

1   package org.apache.jcs.auxiliary.remote.server;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.rmi.RemoteException;
25  import java.rmi.registry.Registry;
26  import java.rmi.server.UnicastRemoteObject;
27  import java.rmi.server.Unreferenced;
28  import java.util.Collections;
29  import java.util.Enumeration;
30  import java.util.Hashtable;
31  import java.util.Iterator;
32  import java.util.Map;
33  import java.util.Set;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
38  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
39  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
40  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
41  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheServiceAdmin;
42  import org.apache.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
43  import org.apache.jcs.engine.CacheEventQueueFactory;
44  import org.apache.jcs.engine.CacheListeners;
45  import org.apache.jcs.engine.behavior.ICacheElement;
46  import org.apache.jcs.engine.behavior.ICacheEventQueue;
47  import org.apache.jcs.engine.behavior.ICacheListener;
48  import org.apache.jcs.engine.control.CompositeCache;
49  import org.apache.jcs.engine.control.CompositeCacheManager;
50  
/**
 * This class provides remote cache services. The remote cache server propagates
 * events from local caches to other local caches. It can also store cached
 * data, making it available to new clients.
 * <p>
 * Remote cache servers can be clustered. If the cache used by this remote cache
 * is configured to use a remote cache of type cluster, the two remote caches
 * will communicate with each other. Remove and put requests can be sent from
 * one remote to another. If they are configured to broadcast such events to
 * their clients, then removes and puts can be sent to all locals in the cluster.
 * <p>
 * Get requests are made between clustered servers if AllowClusterGet is true. You can set up
 * several clients to use one remote server and several to use another. The get
 * load will be distributed between the two servers. Since caches are usually
 * high get and low put, this should allow you to scale.
 */
67  class RemoteCacheServer
68      extends UnicastRemoteObject
69      implements IRemoteCacheService, IRemoteCacheObserver, IRemoteCacheServiceAdmin, Unreferenced
70  {
    /** Serialization version identifier for this class. */
    private static final long serialVersionUID = -8072345435941473116L;

    /** Logger used by every method of this class. */
    private final static Log log = LogFactory.getLog( RemoteCacheServer.class );

    /** timing -- if we should record operation times. */
    protected final static boolean timing = true;

    /** Counter of puts, reported by getPutCount(); it is updated without synchronization. */
    private int puts = 0;

    // Maps cache name to CacheListeners object.
    // association of listeners (regions).
    private final Hashtable cacheListenersMap = new Hashtable();

    // Maps cache name to the CacheListeners object handed out by getClusterListeners.
    private final Hashtable clusterListenersMap = new Hashtable();

    // Cache manager created in init(); supplies the cache for each region name.
    private CompositeCacheManager cacheManager;

    // relates listener id with a type
    private final Hashtable idTypeMap = new Hashtable();

    // private transient int listenerId = 0;
    // NOTE(review): a one-element array rather than a plain int -- presumably so the
    // counter object can also serve as a lock; confirm against addCacheListener.
    private int[] listenerId = new int[1];

    /** Configuration settings. */
    protected IRemoteCacheServerAttributes rcsa;

    /** The interval at which we will log updates. */
    private int logInterval = 100;
99  
100     /***
101      * Constructor for the RemoteCacheServer object. Thiks initializes the
102      * server with the values from the config file.
103      * <p>
104      * @param rcsa
105      * @throws RemoteException
106      * @exception IOException
107      */
108     RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
109         throws RemoteException
110     {
111         super( rcsa.getServicePort() );
112         this.rcsa = rcsa;
113         init( rcsa.getConfigFileName() );
114     }
115 
116     /***
117      * Initialize the RMI Cache Server from a properties file.
118      * <p>
119      * @param prop
120      */
121     private void init( String prop )
122     {
123         cacheManager = createCacheManager( prop );
124 
125         // cacheManager would have created a number of ICache objects.
126         // Use these objects to set up the cacheListenersMap.
127         String[] list = cacheManager.getCacheNames();
128         for ( int i = 0; i < list.length; i++ )
129         {
130             String name = list[i];
131             cacheListenersMap.put( name, new CacheListeners( cacheManager.getCache( name ) ) );
132         }
133     }
134 
135     /***
136      * Subclass can override this method to create the specific cache manager.
137      * <p>
138      * @param prop
139      *            The anem of the configuration file.
140      * @return The cache hub configured with this configuration file.
141      */
142     private CompositeCacheManager createCacheManager( String prop )
143     {
144         CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
145 
146         if ( prop == null )
147         {
148             hub.configure( "/remote.cache.ccf" );
149         }
150         else
151         {
152             hub.configure( prop );
153         }
154         return hub;
155     }
156 
157     /***
158      * Returns the cache listener for the specified cache. Creates the cache and
159      * the cache descriptor if they do not already exist.
160      * <p>
161      * @param cacheName
162      * @return The cacheListeners value
163      */
164     protected CacheListeners getCacheListeners( String cacheName )
165     {
166         CacheListeners cacheListeners = (CacheListeners) cacheListenersMap.get( cacheName );
167         synchronized ( cacheListenersMap )
168         {
169             if ( cacheListeners == null )
170             {
171                 cacheListeners = (CacheListeners) cacheListenersMap.get( cacheName );
172                 if ( cacheListeners == null )
173                 {
174                     cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
175                     cacheListenersMap.put( cacheName, cacheListeners );
176                 }
177             }
178         }
179         return cacheListeners;
180     }
181 
182     /***
183      * Gets the clusterListeners attribute of the RemoteCacheServer object.
184      * <p>
185      * @todo may be able to remove this
186      *
187      * @param cacheName
188      * @return The clusterListeners value
189      */
190     protected CacheListeners getClusterListeners( String cacheName )
191     {
192         CacheListeners cacheListeners = (CacheListeners) clusterListenersMap.get( cacheName );
193         synchronized ( clusterListenersMap )
194         {
195             if ( cacheListeners == null )
196             {
197                 cacheListeners = (CacheListeners) clusterListenersMap.get( cacheName );
198                 if ( cacheListeners == null )
199                 {
200                     cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
201                     clusterListenersMap.put( cacheName, cacheListeners );
202                 }
203             }
204         }
205         return cacheListeners;
206     }
207 
208     /***
209      * Puts a cache bean to the remote cache and notifies all listeners which
210      * <br>
211      * <ol>
212      * <li>have a different listener id than the originating host;
213      * <li>are currently subscribed to the related cache.
214      * </ol>
215      * <p>
216      * @param item
217      * @throws IOException
218      *
219      */
220     public void put( ICacheElement item )
221         throws IOException
222     {
223         update( item );
224     }
225 
226     /*
227      * (non-Javadoc)
228      *
229      * @see org.apache.jcs.engine.behavior.ICacheService#update(org.apache.jcs.engine.behavior.ICacheElement)
230      */
231     public void update( ICacheElement item )
232         throws IOException
233     {
234         update( item, 0 );
235     }
236 
237     /***
238      * An update can come from either a local cache's remote auxiliary, or it
239      * can come from a remote server. A remote server is considered a a source
240      * of type cluster.
241      * <p>
242      * If the update came from a cluster, then we should tell the cache manager
243      * that this was a remote put. This way, any lateral and remote auxiliaries
244      * configured for the region will not be updated. This is basically how a
245      * remote listener works when plugged into a local cache.
246      * <p>
247      * If the cluster is configured to keep local cluster consistency, then all
248      * listeners will be updated. This allows cluster server A to update cluster
249      * server B and then B to update its clients if it is told to keep local
250      * cluster consistency. Otherwise, server A will update server B and B will
251      * not tell its clients. If you cluster using lateral caches for instance,
252      * this is how it will work. Updates to a cluster node, will never get to
253      * the leavess. The remote cluster, with local cluster consistency, allows
254      * you to update leaves. This basically allows you to have a failover remote
255      * server.
256      * <p>
257      * Since currently a cluster will not try to get from other cluster servers,
258      * you can scale a bit with a cluster configuration. Puts and removes will
259      * be broadcasted to all clients, but the get load on a remote server can be
260      * reduced.
261      * <p>
262      * @param item
263      * @param requesterId
264      * @throws IOException
265      */
266     public void update( ICacheElement item, long requesterId )
267         throws IOException
268     {
269 
270         long start = 0;
271         if ( timing )
272         {
273             start = System.currentTimeMillis();
274         }
275 
276         if ( log.isInfoEnabled() )
277         {
278             // not thread safe, but it doesn't have to be accurate
279             puts++;
280             if ( puts % logInterval == 0 )
281             {
282                 log.info( "puts = " + puts );
283             }
284         }
285 
286         if ( log.isDebugEnabled() )
287         {
288             log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
289         }
290 
291         try
292         {
293             CacheListeners cacheDesc = getCacheListeners( item.getCacheName() );
294             /* Object val = */item.getVal();
295 
296             Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
297             if ( log.isDebugEnabled() )
298             {
299                 log.debug( "In update, requesterId = [" + requesterId + "] remoteType = " + remoteTypeL );
300             }
301 
302             boolean fromCluster = false;
303             if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
304             {
305                 fromCluster = true;
306             }
307             // ordered cache item update and notification.
308             synchronized ( cacheDesc )
309             {
310                 try
311                 {
312                     CompositeCache c = (CompositeCache) cacheDesc.cache;
313 
314                     // If the source of this request was not from a cluster,
315                     // then consider it a local update. The cache manager will
316                     // try to
317                     // update all auxiliaries.
318                     //
319                     // This requires that two local caches not be connected to
320                     // two clustered remote caches. The failover runner will
321                     // have to make sure of this. ALos, the local cache needs
322                     // avoid updating this source. Will need to pass the source
323                     // id somehow. The remote cache should udate all local
324                     // caches
325                     // but not update the cluster source. Cluster remote caches
326                     // should only be updated by the server and not the
327                     // RemoteCache.
328                     if ( fromCluster )
329                     {
330                         if ( log.isDebugEnabled() )
331                         {
332                             log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
333                                 + " requesterId [" + requesterId + "]" );
334                         }
335                         c.localUpdate( item );
336                     }
337                     else
338                     {
339                         if ( log.isDebugEnabled() )
340                         {
341                             log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
342                                 + " requesterId [" + requesterId + "]" );
343                         }
344                         c.update( item );
345                     }
346                 }
347                 catch ( Exception ce )
348                 {
349                     // swallow
350                     if ( log.isInfoEnabled() )
351                     {
352                         log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
353                             + ce.getMessage() );
354                     }
355                 }
356 
357                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
358                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
359                 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
360                 {
361                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
362 
363                     if ( qlist != null )
364                     {
365                         if ( log.isDebugEnabled() )
366                         {
367                             log.debug( "qlist.length = " + qlist.length );
368                         }
369                         for ( int i = 0; i < qlist.length; i++ )
370                         {
371                             qlist[i].addPutEvent( item );
372                         }
373                     }
374                     else
375                     {
376                         if ( log.isDebugEnabled() )
377                         {
378                             log.debug( "q list is null" );
379                         }
380                     }
381                 }
382             }
383         }
384         catch ( Exception e )
385         {
386             log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
387         }
388 
389         // TODO use JAMON for timing
390         if ( timing )
391         {
392             long end = System.currentTimeMillis();
393             if ( log.isDebugEnabled() )
394             {
395                 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
396             }
397         }
398 
399         return;
400     }
401 
402     /***
403      * Gets the eventQList attribute of the RemoteCacheServer object. This
404      * returns the event queues stored in the cacheListeners object for a
405      * particuylar region, if the queue is not for this requester.
406      * <p>
407      * Basically, this makes sure that a request from a particular local cache,
408      * identified by its listener id, does not result in a call to that same
409      * listener.
410      * <p>
411      * @param cacheListeners
412      * @param requesterId
413      * @return The eventQList value
414      */
415     private ICacheEventQueue[] getEventQList( CacheListeners cacheListeners, long requesterId )
416     {
417         ICacheEventQueue[] list = null;
418         synchronized ( cacheListeners.eventQMap )
419         {
420             list = (ICacheEventQueue[]) cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
421         }
422         int count = 0;
423         // Set those not qualified to null; Count those qualified.
424         for ( int i = 0; i < list.length; i++ )
425         {
426             ICacheEventQueue q = list[i];
427             if ( q.isWorking() && q.getListenerId() != requesterId )
428             {
429                 count++;
430             }
431             else
432             {
433                 list[i] = null;
434             }
435         }
436         if ( count == list.length )
437         {
438             // All qualified.
439             return list;
440         }
441 
442         // Returns only the qualified.
443         ICacheEventQueue[] qq = new ICacheEventQueue[count];
444         count = 0;
445         for ( int i = 0; i < list.length; i++ )
446         {
447             if ( list[i] != null )
448             {
449                 qq[count++] = list[i];
450             }
451         }
452         return qq;
453     }
454 
455     /***
456      * Returns a cache value from the specified remote cache; or null if the
457      * cache or key does not exist.
458      * <p>
459      * @param cacheName
460      * @param key
461      * @return ICacheElement
462      * @throws IOException
463      */
464     public ICacheElement get( String cacheName, Serializable key )
465         throws IOException
466     {
467         return this.get( cacheName, key, 0 );
468     }
469 
470     /***
471      * Returns a cache bean from the specified cache; or null if the key does
472      * not exist.
473      * <p>
474      * Adding the requestor id, allows the cache to determine the sournce of the
475      * get.
476      * <p>
477      * @param cacheName
478      * @param key
479      * @param requesterId
480      * @return ICacheElement
481      * @throws IOException
482      */
483     public ICacheElement get( String cacheName, Serializable key, long requesterId )
484         throws IOException
485     {
486         Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
487 
488         if ( log.isDebugEnabled() )
489         {
490             log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId + "] remoteType = "
491                 + remoteTypeL );
492         }
493 
494         // Since a non-receiving remote cache client will not register a
495         // listener, it will not have a listener id assigned from the server. As
496         // such the remote server cannot determine if it is a cluster or a
497         // normal client. It will assume that it is a normal client.
498 
499         boolean fromCluster = false;
500         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
501         {
502             fromCluster = true;
503         }
504 
505         CacheListeners cacheDesc = null;
506         try
507         {
508             cacheDesc = getCacheListeners( cacheName );
509         }
510         catch ( Exception e )
511         {
512             log.error( "Problem getting listeners.", e );
513         }
514 
515         if ( cacheDesc == null )
516         {
517             return null;
518         }
519         CompositeCache c = (CompositeCache) cacheDesc.cache;
520 
521         ICacheElement element = null;
522 
523         // If we have a get come in from a client and we don't have the item
524         // locally, we will allow the cache to look in other non local sources,
525         // such as a remote cache or a lateral.
526         //
527         // Since remote servers never get from clients and clients never go
528         // remote from a remote call, this
529         // will not result in any loops.
530         //
531         // This is the only instance I can think of where we allow a remote get
532         // from a remote call. The purpose is to allow remote cache servers to
533         // talk to each other. If one goes down, you want it to be able to get
534         // data from those that were up when the failed server comes back o
535         // line.
536 
537         if ( !fromCluster && this.rcsa.getAllowClusterGet() )
538         {
539             if ( log.isDebugEnabled() )
540             {
541                 log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet [" + this.rcsa.getAllowClusterGet() + "]"  );
542             }
543             element = c.get( key );
544         }
545         else
546         {
547             // Gets from cluster type remote will end up here.
548             // Gets from all clients will end up here if allow cluster get is
549             // false.
550 
551             if ( log.isDebugEnabled() )
552             {
553                 log.debug( "LocalGet.  fromCluster [" + fromCluster + "] AllowClusterGet [" + this.rcsa.getAllowClusterGet() + "]" );
554             }
555             element = c.localGet( key );
556         }
557 
558         return element;
559     }
560 
561     /***
562      * Gets the set of keys of objects currently in the group.
563      * <p>
564      * @param cacheName
565      * @param group
566      * @return A Set of group keys
567      */
568     public Set getGroupKeys( String cacheName, String group )
569     {
570         CacheListeners cacheDesc = null;
571         try
572         {
573             cacheDesc = getCacheListeners( cacheName );
574         }
575         catch ( Exception e )
576         {
577             log.error( "Problem getting listeners.", e );
578         }
579 
580         if ( cacheDesc == null )
581         {
582             return Collections.EMPTY_SET;
583         }
584         CompositeCache c = (CompositeCache) cacheDesc.cache;
585         return c.getGroupKeys( group );
586     }
587 
588     /***
589      * Removes the given key from the specified remote cache. Defaults the
590      * listener id to 0.
591      * <p>
592      * @param cacheName
593      * @param key
594      * @throws IOException
595      */
596     public void remove( String cacheName, Serializable key )
597         throws IOException
598     {
599         remove( cacheName, key, 0 );
600     }
601 
602     /***
603      * Remove the key from the cache region and don't tell the source listener
604      * about it.
605      * <p>
606      * @param cacheName
607      * @param key
608      * @param requesterId
609      * @throws IOException
610      */
611     public void remove( String cacheName, Serializable key, long requesterId )
612         throws IOException
613     {
614         if ( log.isDebugEnabled() )
615         {
616             log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
617         }
618         CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
619 
620         Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
621         boolean fromCluster = false;
622         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
623         {
624             fromCluster = true;
625         }
626 
627         if ( cacheDesc != null )
628         {
629             // best attempt to achieve ordered cache item removal and
630             // notification.
631             synchronized ( cacheDesc )
632             {
633                 boolean removeSuccess = false;
634 
635                 // No need to notify if it was not cached.
636                 CompositeCache c = (CompositeCache) cacheDesc.cache;
637 
638                 if ( fromCluster )
639                 {
640                     if ( log.isDebugEnabled() )
641                     {
642                         log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
643                     }
644                     removeSuccess = c.localRemove( key );
645                 }
646                 else
647                 {
648                     if ( log.isDebugEnabled() )
649                     {
650                         log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
651                     }
652                     removeSuccess = c.remove( key );
653                 }
654 
655                 if ( log.isDebugEnabled() )
656                 {
657                     log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
658                         + removeSuccess );
659                 }
660 
661                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
662                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
663                 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
664                 {
665                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
666 
667                     for ( int i = 0; i < qlist.length; i++ )
668                     {
669                         qlist[i].addRemoveEvent( key );
670                     }
671                 }
672             }
673         }
674         return;
675     }
676 
677     /***
678      * Remove all keys from the sepcified remote cache.
679      * <p>
680      * @param cacheName
681      * @throws IOException
682      */
683     public void removeAll( String cacheName )
684         throws IOException
685     {
686         removeAll( cacheName, 0 );
687     }
688 
689     /***
690      * Remove all keys from the specified remote cache.
691      * <p>
692      * @param cacheName
693      * @param requesterId
694      * @throws IOException
695      */
696     public void removeAll( String cacheName, long requesterId )
697         throws IOException
698     {
699         CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
700 
701         Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
702         boolean fromCluster = false;
703         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
704         {
705             fromCluster = true;
706         }
707 
708         if ( cacheDesc != null )
709         {
710             // best attempt to achieve ordered cache item removal and
711             // notification.
712             synchronized ( cacheDesc )
713             {
714                 // No need to broadcast, or notify if it was not cached.
715                 CompositeCache c = (CompositeCache) cacheDesc.cache;
716 
717                 if ( fromCluster )
718                 {
719                     if ( log.isDebugEnabled() )
720                     {
721                         log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
722                     }
723                     c.localRemoveAll();
724                 }
725                 else
726                 {
727                     if ( log.isDebugEnabled() )
728                     {
729                         log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
730                     }
731                     c.removeAll();
732                 }
733 
734                 // update registered listeners
735                 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
736                 {
737                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
738 
739                     for ( int i = 0; i < qlist.length; i++ )
740                     {
741                         qlist[i].addRemoveAllEvent();
742                     }
743                 }
744             }
745         }
746         return;
747     }
748 
749     /***
750      * How many put events have we received.
751      * <p>
752      * @return puts
753      */
754     protected int getPutCount()
755     {
756         return puts;
757     }
758 
759     /***
760      * Frees the specified remote cache.
761      * <p>
762      * @param cacheName
763      * @throws IOException
764      */
765     public void dispose( String cacheName )
766         throws IOException
767     {
768         dispose( cacheName, 0 );
769     }
770 
771     /***
772      * Frees the specified remote cache.
773      * <p>
774      * @param cacheName
775      * @param requesterId
776      * @throws IOException
777      */
778     public void dispose( String cacheName, long requesterId )
779         throws IOException
780     {
781         if ( log.isInfoEnabled() )
782         {
783             log.info( "Dispose request received from listener [" + requesterId + "]" );
784         }
785 
786         CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
787 
788         // this is dangerous
789         if ( cacheDesc != null )
790         {
791             // best attempt to achieve ordered free-cache-op and notification.
792             synchronized ( cacheDesc )
793             {
794                 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
795 
796                 for ( int i = 0; i < qlist.length; i++ )
797                 {
798                     qlist[i].addDisposeEvent();
799                 }
800                 cacheManager.freeCache( cacheName );
801             }
802         }
803         return;
804     }
805 
806     /***
807      * Frees all remote caches.
808      * <p>
809      * @throws IOException
810      */
811     public void release()
812         throws IOException
813     {
814         synchronized ( cacheListenersMap )
815         {
816             for ( Enumeration en = cacheListenersMap.elements(); en.hasMoreElements(); )
817             {
818                 CacheListeners cacheDesc = (CacheListeners) en.nextElement();
819                 ICacheEventQueue[] qlist = getEventQList( cacheDesc, 0 );
820 
821                 for ( int i = 0; i < qlist.length; i++ )
822                 {
823                     qlist[i].addDisposeEvent();
824                 }
825             }
826             cacheManager.release();
827         }
828         return;
829     }
830 
831     /***
832      * Removes dead event queues. Should clean out deregistered listeners.
833      * <p>
834      * @param eventQMap
835      */
836     private static void cleanupEventQMap( Map eventQMap )
837     {
838         synchronized ( eventQMap )
839         {
840             for ( Iterator itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
841             {
842                 Map.Entry e = (Map.Entry) itr.next();
843                 ICacheEventQueue q = (ICacheEventQueue) e.getValue();
844 
845                 // this does not care if the q is alive (i.e. if
846                 // there are active threads; it cares if the queue
847                 // is working -- if it has not encoutnered errors
848                 // above the failure threshhold
849                 if ( !q.isWorking() )
850                 {
851                     itr.remove();
852                     log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
853                 }
854             }
855         }
856     }
857 
858     /***
859      * Subscribes to the specified remote cache.
860      * <p>
861      * If the client id is 0, then the remote cache server will increment it's
862      * local count and assign an id to the client.
863      * <p>
864      * @param cacheName
865      *            the specified remote cache.
866      * @param listener
867      *            object to notify for cache changes. must be synchronized since
868      *            there are remote calls involved.
869      * @throws IOException
870      */
871     public void addCacheListener( String cacheName, ICacheListener listener )
872         throws IOException
873     {
874         if ( cacheName == null || listener == null )
875         {
876             throw new IllegalArgumentException( "cacheName and listener must not be null" );
877         }
878         CacheListeners cacheDesc;
879 
880         IRemoteCacheListener ircl = (IRemoteCacheListener) listener;
881 
882         String listenerAddress = ircl.getLocalHostAddress();
883 
884         int remoteType = ircl.getRemoteType();
885         if ( remoteType == IRemoteCacheAttributes.CLUSTER )
886         {
887             log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
888             cacheDesc = getClusterListeners( cacheName );
889         }
890         else
891         {
892             log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
893             cacheDesc = getCacheListeners( cacheName );
894         }
895         Map eventQMap = cacheDesc.eventQMap;
896         cleanupEventQMap( eventQMap );
897 
898         // synchronized ( listenerId )
899         synchronized ( ICacheListener.class )
900         {
901             long id = 0;
902             try
903             {
904                 id = listener.getListenerId();
905                 // clients problably shouldn't do this.
906                 if ( id == 0 )
907                 {
908                     // must start at one so the next gets recognized
909                     long listenerIdB = nextListenerId();
910                     if ( log.isDebugEnabled() )
911                     {
912                         log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
913                             + "], listenerAddress [" + listenerAddress + "]" );
914                     }
915                     listener.setListenerId( listenerIdB );
916                     id = listenerIdB;
917 
918                     // in case it needs synchronization
919                     if ( log.isInfoEnabled() )
920                     {
921                         log.info( "adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
922                             + listenerAddress + "]" );
923                     }
924                 }
925                 else
926                 {
927                     if ( log.isInfoEnabled() )
928                     {
929                         log.info( "adding listener under existing id = [" + id + "], listenerAddress ["
930                             + listenerAddress + "]" );
931                     }
932                     // should confirm the the host is the same as we have on
933                     // record, just in case a client has made a mistake.
934                 }
935 
936                 // relate the type to an id
937                 this.idTypeMap.put( new Long( id ), new Integer( remoteType ) );
938             }
939             catch ( IOException ioe )
940             {
941                 log.error( "Problem setting listener id, listenerAddress [" + listenerAddress + "]", ioe );
942             }
943 
944             CacheEventQueueFactory fact = new CacheEventQueueFactory();
945             ICacheEventQueue q = fact.createCacheEventQueue( listener, id, cacheName, rcsa.getEventQueuePoolName(),
946                                                              rcsa.getEventQueueTypeFactoryCode() );
947 
948             eventQMap.put( new Long( listener.getListenerId() ), q );
949 
950             if ( log.isInfoEnabled() )
951             {
952                 log.info( "Region " + cacheName + "'s listener size = " + cacheDesc.eventQMap.size() );
953             }
954         }
955     }
956 
957     /***
958      * Subscribes to all remote caches.
959      * <p>
960      * @param listener
961      *            The feature to be added to the CacheListener attribute
962      * @throws IOException
963      */
964     public void addCacheListener( ICacheListener listener )
965         throws IOException
966     {
967         for ( Enumeration en = cacheListenersMap.keys(); en.hasMoreElements(); )
968         {
969             String cacheName = (String) en.nextElement();
970             addCacheListener( cacheName, listener );
971 
972             if ( log.isDebugEnabled() )
973             {
974                 log.debug( "Adding listener for cache [" + cacheName + "]" );
975             }
976         }
977     }
978 
979     /***
980      * Unsubscribe this listener from this region. If the listener is
981      * registered, it will be removed from the event queue map list.
982      * <p>
983      * @param cacheName
984      * @param listenerId
985      */
986     public void removeCacheListener( String cacheName, ICacheListener listener )
987         throws IOException
988     {
989         removeCacheListener( cacheName, listener.getListenerId() );
990     }
991 
992     /***
993      * Unsubscribe this listener from this region. If the listener is
994      * registered, it will be removed from the event queue map list.
995      * <p>
996      * @param cacheName
997      * @param listenerId
998      */
999     public void removeCacheListener( String cacheName, long listenerId )
1000     {
1001         if ( log.isInfoEnabled() )
1002         {
1003             log.info( "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]" );
1004         }
1005 
1006         Integer remoteTypeL = (Integer) idTypeMap.get( new Long( listenerId ) );
1007         boolean isClusterListener = false;
1008         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
1009         {
1010             isClusterListener = true;
1011         }
1012 
1013         CacheListeners cacheDesc = null;
1014 
1015         if ( isClusterListener )
1016         {
1017             cacheDesc = getClusterListeners( cacheName );
1018         }
1019         else
1020         {
1021             cacheDesc = getCacheListeners( cacheName );
1022         }
1023         Map eventQMap = cacheDesc.eventQMap;
1024         cleanupEventQMap( eventQMap );
1025         ICacheEventQueue q = (ICacheEventQueue) eventQMap.remove( new Long( listenerId ) );
1026 
1027         if ( q != null )
1028         {
1029             if ( log.isDebugEnabled() )
1030             {
1031                 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId  [" + listenerId + "]" );
1032             }
1033             q.destroy();
1034             cleanupEventQMap( eventQMap );
1035         }
1036         else
1037         {
1038             if ( log.isDebugEnabled() )
1039             {
1040                 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1041                     + "]" );
1042             }
1043         }
1044 
1045         if ( log.isInfoEnabled() )
1046         {
1047             log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1048                 + cacheDesc.eventQMap.size() + "]" );
1049         }
1050     }
1051 
1052     /***
1053      * Unsubscribes from all remote caches.
1054      * <p>
1055      * @param listener
1056      * @throws IOException
1057      */
1058     public void removeCacheListener( ICacheListener listener )
1059         throws IOException
1060     {
1061         for ( Enumeration en = cacheListenersMap.keys(); en.hasMoreElements(); )
1062         {
1063             String cacheName = (String) en.nextElement();
1064             removeCacheListener( cacheName, listener );
1065 
1066             if ( log.isInfoEnabled() )
1067             {
1068                 log.info( "Removing listener for cache [" + cacheName + "]" );
1069             }
1070         }
1071         return;
1072     }
1073 
1074     /***
1075      * Shuts down the remote server.
1076      * <p>
1077      * @throws IOException
1078      */
1079     public void shutdown()
1080         throws IOException
1081     {
1082         RemoteCacheServerFactory.shutdownImpl( "", Registry.REGISTRY_PORT );
1083     }
1084 
1085     /***
1086      * Shuts down a server at a particular host and port.  Then it calls shutdown on the cache itself.
1087      * <p>
1088      * @param host
1089      * @param port
1090      * @throws IOException
1091      */
1092     public void shutdown( String host, int port )
1093         throws IOException
1094     {
1095         if ( log.isInfoEnabled() )
1096         {
1097             log.info( "Received shutdown request.  Shutting down server." );
1098         }
1099         RemoteCacheServerFactory.shutdownImpl( host, port );
1100         this.cacheManager.shutDown();
1101     }
1102 
1103     /***
1104      * Called by the RMI runtime sometime after the runtime determines that the
1105      * reference list, the list of clients referencing the remote object,
1106      * becomes empty.
1107      */
1108     // TODO: test out the DGC.
1109     public void unreferenced()
1110     {
1111         if ( log.isInfoEnabled() )
1112         {
1113             log.info( "*** Server now unreferenced and subject to GC. ***" );
1114         }
1115     }
1116 
1117     /***
1118      * Returns the next generated listener id [0,255].
1119      * <p>
1120      * @return the listener id of a client. This should be unique for this
1121      *         server.
1122      */
1123     private long nextListenerId()
1124     {
1125         long id = 0;
1126         if ( listenerId[0] == Long.MAX_VALUE )
1127         {
1128             synchronized ( listenerId )
1129             {
1130                 id = listenerId[0];
1131                 listenerId[0] = 0;
1132                 // TODO: record & check if the generated id is currently being
1133                 // used by a valid listener. Currently if the id wraps after
1134                 // Long.MAX_VALUE,
1135                 // we just assume it won't collide with an existing listener who
1136                 // is live.
1137             }
1138         }
1139         else
1140         {
1141             synchronized ( listenerId )
1142             {
1143                 id = ++listenerId[0];
1144             }
1145         }
1146         return id;
1147     }
1148 
1149     /***
1150      * Gets the stats attribute of the RemoteCacheServer object.
1151      * <p>
1152      * @return The stats value
1153      * @throws IOException
1154      */
1155     public String getStats()
1156         throws IOException
1157     {
1158         return cacheManager.getStats();
1159     }
1160 }