View Javadoc

1   package org.apache.jcs.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.rmi.Naming;
24  import java.rmi.registry.Registry;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.Map;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.jcs.auxiliary.AuxiliaryCache;
32  import org.apache.jcs.auxiliary.AuxiliaryCacheManager;
33  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
34  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
35  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
36  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
37  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
38  import org.apache.jcs.engine.behavior.ICache;
39  import org.apache.jcs.engine.behavior.ICompositeCacheManager;
40  import org.apache.jcs.engine.behavior.IShutdownObserver;
41  import org.apache.jcs.engine.control.CompositeCacheManager;
42  
43  /***
44   * An instance of RemoteCacheManager corresponds to one remote connection of a specific host and
45   * port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor
46   * monitoring daemon for error detection and recovery.
47   * <p>
48   * Getting an instance of the remote cache has the effect of getting a handle on the remote server.
49   * Listeners are not registered with the server until a cache is requested from the manager.
50   */
51  public class RemoteCacheManager
52      implements AuxiliaryCacheManager, IShutdownObserver
53  {
54      private static final long serialVersionUID = 798077557166389498L;
55  
56      private final static Log log = LogFactory.getLog( RemoteCacheManager.class );
57  
58      // Contains mappings of Location instance to RemoteCacheManager instance.
59      final static Map instances = new HashMap();
60  
61      private static RemoteCacheMonitor monitor;
62  
63      private int clients;
64  
65      // Contains instances of RemoteCacheNoWait managed by a RemoteCacheManager
66      // instance.
67      final Map caches = new HashMap();
68  
69      final String host;
70  
71      final int port;
72  
73      final String service;
74  
75      private IRemoteCacheAttributes irca;
76  
77      /***
78       * Handle to the remote cache service; or a zombie handle if failed to connect.
79       */
80      private IRemoteCacheService remoteService;
81  
82      /***
83       * Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to
84       * connect.
85       */
86      private RemoteCacheWatchRepairable remoteWatch;
87  
88      /***
89       * The cache manager listeners will need to use to get a cache.
90       */
91      private ICompositeCacheManager cacheMgr;
92  
93      private String registry;
94  
95      /***
96       * Constructs an instance to with the given remote connection parameters. If the connection
97       * cannot be made, "zombie" services will be temporarily used until a successful re-connection
98       * is made by the monitoring daemon.
99       * <p>
100      * @param host
101      * @param port
102      * @param service
103      * @param cacheMgr
104      */
105     private RemoteCacheManager( String host, int port, String service, ICompositeCacheManager cacheMgr )
106     {
107         this.host = host;
108         this.port = port;
109         this.service = service;
110         this.cacheMgr = cacheMgr;
111 
112         // register shutdown observer
113         // TODO add the shutdown observable methods to the interface
114         if ( this.cacheMgr instanceof CompositeCacheManager )
115         {
116             ( (CompositeCacheManager) this.cacheMgr ).registerShutdownObserver( this );
117         }
118 
119         this.registry = "//" + host + ":" + port + "/" + service;
120         if ( log.isDebugEnabled() )
121         {
122             log.debug( "looking up server " + registry );
123         }
124         try
125         {
126             Object obj = Naming.lookup( registry );
127             if ( log.isDebugEnabled() )
128             {
129                 log.debug( "server found" );
130             }
131             // Successful connection to the remote server.
132             remoteService = (IRemoteCacheService) obj;
133             if ( log.isDebugEnabled() )
134             {
135                 log.debug( "remoteService = " + remoteService );
136             }
137 
138             remoteWatch = new RemoteCacheWatchRepairable();
139             remoteWatch.setCacheWatch( (IRemoteCacheObserver) obj );
140         }
141         catch ( Exception ex )
142         {
143             // Failed to connect to the remote server.
144             // Configure this RemoteCacheManager instance to use the "zombie"
145             // services.
146             log.error( "Problem finding server at [" + registry + "]", ex );
147             remoteService = new ZombieRemoteCacheService();
148             remoteWatch = new RemoteCacheWatchRepairable();
149             remoteWatch.setCacheWatch( new ZombieRemoteCacheWatch() );
150             // Notify the cache monitor about the error, and kick off the
151             // recovery process.
152             RemoteCacheMonitor.getInstance().notifyError();
153         }
154     }
155 
156     /***
157      * Gets the defaultCattr attribute of the RemoteCacheManager object.
158      * <p>
159      * @return The defaultCattr value
160      */
161     public IRemoteCacheAttributes getDefaultCattr()
162     {
163         return this.irca;
164     }
165 
166     /***
167      * Adds the remote cache listener to the underlying cache-watch service.
168      * <p>
169      * @param cattr The feature to be added to the RemoteCacheListener attribute
170      * @param listener The feature to be added to the RemoteCacheListener attribute
171      * @throws IOException
172      */
173     public void addRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener listener )
174         throws IOException
175     {
176         if ( cattr.isReceive() )
177         {
178             if ( log.isInfoEnabled() )
179             {
180                 log.info( "The remote cache is configured to receive events from the remote server.  "
181                     + "We will register a listener." );
182             }
183 
184             synchronized ( caches )
185             {
186                 remoteWatch.addCacheListener( cattr.getCacheName(), listener );
187             }
188         }
189         else
190         {
191             if ( log.isInfoEnabled() )
192             {
193                 log.info( "The remote cache is configured to NOT receive events from the remote server.  "
194                     + "We will NOT register a listener." );
195             }
196         }
197         return;
198     }
199 
200     /***
201      * Removes a listener. When the primary recovers the failover must deregister itself for a
202      * region. The failover runner will call this method to de-register. We do not want to dergister
203      * all listeners to a remote server, in case a failover is a primary of another region. Having
204      * one regions failover act as another servers primary is not currently supported.
205      * <p>
206      * @param cattr
207      * @param listener
208      * @throws IOException
209      */
210     public void removeRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener listener )
211         throws IOException
212     {
213         synchronized ( caches )
214         {
215             remoteWatch.removeCacheListener( cattr.getCacheName(), listener );
216         }
217         return;
218     }
219 
220     /***
221      * Stops a listener. This is used to deregister a failover after primary reconnection.
222      * <p>
223      * @param cattr
224      * @throws IOException
225      */
226     public void removeRemoteCacheListener( IRemoteCacheAttributes cattr )
227         throws IOException
228     {
229         synchronized ( caches )
230         {
231             RemoteCacheNoWait cache = (RemoteCacheNoWait) caches.get( cattr.getCacheName() );
232             if ( cache != null )
233             {
234                 IRemoteCacheClient rc = cache.getRemoteCache();
235                 if ( log.isDebugEnabled() )
236                 {
237                     log.debug( "Found cache for[ " + cattr.getCacheName() + "], deregistering listener." );
238                 }
239                 // could also store the listener for a server in the manager.
240                 IRemoteCacheListener listener = rc.getListener();
241                 remoteWatch.removeCacheListener( cattr.getCacheName(), listener );
242             }
243             else
244             {
245                 if ( cattr.isReceive() )
246                 {
247                     log.warn( "Trying to deregister Cache Listener that was never registered." );
248                 }
249                 else
250                 {
251                     if ( log.isDebugEnabled() )
252                     {
253                         log.debug( "Since the remote cache is configured to not receive, "
254                             + "there is no listener to deregister." );
255                     }
256                 }
257             }
258         }
259         return;
260     }
261 
262     /***
263      * Stops a listener. This is used to deregister a failover after primary reconnection.
264      * <p>
265      * @param cacheName
266      * @throws IOException
267      */
268     public void removeRemoteCacheListener( String cacheName )
269         throws IOException
270     {
271         synchronized ( caches )
272         {
273             RemoteCacheNoWait cache = (RemoteCacheNoWait) caches.get( cacheName );
274             if ( cache != null )
275             {
276                 IRemoteCacheClient rc = cache.getRemoteCache();
277                 if ( log.isDebugEnabled() )
278                 {
279                     log.debug( "Found cache for [" + cacheName + "], deregistering listener." );
280                 }
281                 // could also store the listener for a server in the manager.
282                 IRemoteCacheListener listener = rc.getListener();
283                 remoteWatch.removeCacheListener( cacheName, listener );
284             }
285         }
286         return;
287     }
288 
289     /***
290      * Returns an instance of RemoteCacheManager for the given connection parameters.
291      * <p>
292      * Host and Port uniquely identify a manager instance.
293      * <p>
294      * Also starts up the monitoring daemon, if not already started.
295      * <p>
296      * If the connection cannot be established, zombie objects will be used for future recovery
297      * purposes.
298      * <p>
299      * @param cattr
300      * @param cacheMgr
301      * @return The instance value
302      * @parma port port of the registry.
303      */
304     public static RemoteCacheManager getInstance( IRemoteCacheAttributes cattr, ICompositeCacheManager cacheMgr )
305     {
306         String host = cattr.getRemoteHost();
307         int port = cattr.getRemotePort();
308         String service = cattr.getRemoteServiceName();
309         if ( host == null )
310         {
311             host = "";
312         }
313         if ( port < 1024 )
314         {
315             port = Registry.REGISTRY_PORT;
316         }
317         Location loc = new Location( host, port );
318 
319         RemoteCacheManager ins = (RemoteCacheManager) instances.get( loc );
320         synchronized ( instances )
321         {
322             if ( ins == null )
323             {
324                 ins = (RemoteCacheManager) instances.get( loc );
325                 if ( ins == null )
326                 {
327                     // cahnge to use cattr and to set defaults
328                     ins = new RemoteCacheManager( host, port, service, cacheMgr );
329                     ins.irca = cattr;
330                     instances.put( loc, ins );
331                 }
332             }
333         }
334 
335         ins.clients++;
336         // Fires up the monitoring daemon.
337         if ( monitor == null )
338         {
339             monitor = RemoteCacheMonitor.getInstance();
340             // If the returned monitor is null, it means it's already started
341             // elsewhere.
342             if ( monitor != null )
343             {
344                 Thread t = new Thread( monitor );
345                 t.setDaemon( true );
346                 t.start();
347             }
348         }
349         return ins;
350     }
351 
352     /***
353      * Returns a remote cache for the given cache name.
354      * <p>
355      * @param cacheName
356      * @return The cache value
357      */
358     public AuxiliaryCache getCache( String cacheName )
359     {
360         IRemoteCacheAttributes ca = (IRemoteCacheAttributes) irca.copy();
361         ca.setCacheName( cacheName );
362         return getCache( ca );
363     }
364 
365     /***
366      * Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are
367      * identified by the cache name value of the RemoteCacheAttributes object.
368      * <p>
369      * If the client is configured to register a listener, this call results on a listener being
370      * created if one isn't already registered with the remote cache for this region.
371      * <p>
372      * @param cattr
373      * @return The cache value
374      */
375     public AuxiliaryCache getCache( IRemoteCacheAttributes cattr )
376     {
377         RemoteCacheNoWait c = null;
378 
379         synchronized ( caches )
380         {
381             c = (RemoteCacheNoWait) caches.get( cattr.getCacheName() );
382             if ( c == null )
383             {
384                 // create a listener first and pass it to the remotecache
385                 // sender.
386                 RemoteCacheListener listener = null;
387                 try
388                 {
389                     listener = new RemoteCacheListener( cattr, cacheMgr );
390                     addRemoteCacheListener( cattr, listener );
391                 }
392                 catch ( IOException ioe )
393                 {
394                     log.error( ioe.getMessage() );
395                 }
396                 catch ( Exception e )
397                 {
398                     log.error( e.getMessage() );
399                 }
400 
401                 c = new RemoteCacheNoWait( new RemoteCache( cattr, remoteService, listener ) );
402                 caches.put( cattr.getCacheName(), c );
403             }
404 
405             // might want to do some listener sanity checking here.
406         }
407 
408         return c;
409     }
410 
411     /***
412      * Releases.
413      * <p>
414      * @param name
415      * @throws IOException
416      */
417     public void freeCache( String name )
418         throws IOException
419     {
420         if ( log.isInfoEnabled() )
421         {
422             log.info( "freeCache [" + name + "]" );
423         }
424         ICache c = null;
425         synchronized ( caches )
426         {
427             c = (ICache) caches.get( name );
428         }
429         if ( c != null )
430         {
431             this.removeRemoteCacheListener( name );
432             c.dispose();
433         }
434     }
435 
436     /***
437      * Gets the stats attribute of the RemoteCacheManager object
438      * <p>
439      * @return The stats value
440      */
441     public String getStats()
442     {
443         StringBuffer stats = new StringBuffer();
444         Iterator allCaches = caches.values().iterator();
445         while ( allCaches.hasNext() )
446         {
447             ICache c = (ICache) allCaches.next();
448             if ( c != null )
449             {
450                 stats.append( c.getCacheName() );
451             }
452         }
453         return stats.toString();
454     }
455 
456     /*** Shutdown all. */
457     public void release()
458     {
459         // Wait until called by the last client
460         if ( --clients != 0 )
461         {
462             return;
463         }
464         synchronized ( caches )
465         {
466             Iterator allCaches = caches.values().iterator();
467             while ( allCaches.hasNext() )
468             {
469                 ICache c = (ICache) allCaches.next();
470                 if ( c != null )
471                 {
472                     try
473                     {
474                         // c.dispose();
475                         freeCache( c.getCacheName() );
476                     }
477                     catch ( IOException ex )
478                     {
479                         log.error( "Problem in release.", ex );
480                     }
481                 }
482             }
483         }
484     }
485 
486     /***
487      * Fixes up all the caches managed by this cache manager.
488      * <p>
489      * @param remoteService
490      * @param remoteWatch
491      */
492     public void fixCaches( IRemoteCacheService remoteService, IRemoteCacheObserver remoteWatch )
493     {
494         synchronized ( caches )
495         {
496             this.remoteService = remoteService;
497             this.remoteWatch.setCacheWatch( remoteWatch );
498             for ( Iterator en = caches.values().iterator(); en.hasNext(); )
499             {
500                 RemoteCacheNoWait cache = (RemoteCacheNoWait) en.next();
501                 cache.fixCache( this.remoteService );
502             }
503         }
504     }
505 
506     /***
507      * Gets the cacheType attribute of the RemoteCacheManager object
508      * @return The cacheType value
509      */
510     public int getCacheType()
511     {
512         return REMOTE_CACHE;
513     }
514 
515     /***
516      * Location of the RMI registry.
517      */
518     private final static class Location
519     {
520         /*** Description of the Field */
521         public final String host;
522 
523         /*** Description of the Field */
524         public final int port;
525 
526         /***
527          * Constructor for the Location object
528          * <p>
529          * @param host
530          * @param port
531          */
532         public Location( String host, int port )
533         {
534             this.host = host;
535             this.port = port;
536         }
537 
538         /*
539          * (non-Javadoc)
540          * @see java.lang.Object#equals(java.lang.Object)
541          */
542         public boolean equals( Object obj )
543         {
544             if ( obj == this )
545             {
546                 return true;
547             }
548             if ( obj == null || !( obj instanceof Location ) )
549             {
550                 return false;
551             }
552             Location l = (Location) obj;
553             if ( this.host == null && l.host != null )
554             {
555                 return false;
556             }
557             return host.equals( l.host ) && port == l.port;
558         }
559 
560         /***
561          * @return int
562          */
563         public int hashCode()
564         {
565             return host == null ? port : host.hashCode() ^ port;
566         }
567     }
568 
569     /***
570      * Shutdown callback from composite cache manager.
571      * <p>
572      * (non-Javadoc)
573      * @see org.apache.jcs.engine.behavior.IShutdownObserver#shutdown()
574      */
575     public void shutdown()
576     {
577         if ( log.isInfoEnabled() )
578         {
579             log.info( "Observed shutdown request." );
580         }
581         release();
582     }
583 
584 }