1 package org.apache.jcs.auxiliary.remote;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.rmi.Naming;
24 import java.rmi.registry.Registry;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.Map;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.jcs.auxiliary.AuxiliaryCache;
32 import org.apache.jcs.auxiliary.AuxiliaryCacheManager;
33 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
34 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
35 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
36 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
37 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
38 import org.apache.jcs.engine.behavior.ICache;
39 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
40 import org.apache.jcs.engine.behavior.IShutdownObserver;
41 import org.apache.jcs.engine.control.CompositeCacheManager;
42
43 /***
44 * An instance of RemoteCacheManager corresponds to one remote connection of a specific host and
45 * port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor
46 * monitoring daemon for error detection and recovery.
47 * <p>
48 * Getting an instance of the remote cache has the effect of getting a handle on the remote server.
49 * Listeners are not registered with the server until a cache is requested from the manager.
50 */
51 public class RemoteCacheManager
52 implements AuxiliaryCacheManager, IShutdownObserver
53 {
54 private static final long serialVersionUID = 798077557166389498L;
55
56 private final static Log log = LogFactory.getLog( RemoteCacheManager.class );
57
58
59 final static Map instances = new HashMap();
60
61 private static RemoteCacheMonitor monitor;
62
63 private int clients;
64
65
66
67 final Map caches = new HashMap();
68
69 final String host;
70
71 final int port;
72
73 final String service;
74
75 private IRemoteCacheAttributes irca;
76
77 /***
78 * Handle to the remote cache service; or a zombie handle if failed to connect.
79 */
80 private IRemoteCacheService remoteService;
81
82 /***
83 * Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to
84 * connect.
85 */
86 private RemoteCacheWatchRepairable remoteWatch;
87
88 /***
89 * The cache manager listeners will need to use to get a cache.
90 */
91 private ICompositeCacheManager cacheMgr;
92
93 private String registry;
94
95 /***
96 * Constructs an instance to with the given remote connection parameters. If the connection
97 * cannot be made, "zombie" services will be temporarily used until a successful re-connection
98 * is made by the monitoring daemon.
99 * <p>
100 * @param host
101 * @param port
102 * @param service
103 * @param cacheMgr
104 */
105 private RemoteCacheManager( String host, int port, String service, ICompositeCacheManager cacheMgr )
106 {
107 this.host = host;
108 this.port = port;
109 this.service = service;
110 this.cacheMgr = cacheMgr;
111
112
113
114 if ( this.cacheMgr instanceof CompositeCacheManager )
115 {
116 ( (CompositeCacheManager) this.cacheMgr ).registerShutdownObserver( this );
117 }
118
119 this.registry = "//" + host + ":" + port + "/" + service;
120 if ( log.isDebugEnabled() )
121 {
122 log.debug( "looking up server " + registry );
123 }
124 try
125 {
126 Object obj = Naming.lookup( registry );
127 if ( log.isDebugEnabled() )
128 {
129 log.debug( "server found" );
130 }
131
132 remoteService = (IRemoteCacheService) obj;
133 if ( log.isDebugEnabled() )
134 {
135 log.debug( "remoteService = " + remoteService );
136 }
137
138 remoteWatch = new RemoteCacheWatchRepairable();
139 remoteWatch.setCacheWatch( (IRemoteCacheObserver) obj );
140 }
141 catch ( Exception ex )
142 {
143
144
145
146 log.error( "Problem finding server at [" + registry + "]", ex );
147 remoteService = new ZombieRemoteCacheService();
148 remoteWatch = new RemoteCacheWatchRepairable();
149 remoteWatch.setCacheWatch( new ZombieRemoteCacheWatch() );
150
151
152 RemoteCacheMonitor.getInstance().notifyError();
153 }
154 }
155
156 /***
157 * Gets the defaultCattr attribute of the RemoteCacheManager object.
158 * <p>
159 * @return The defaultCattr value
160 */
161 public IRemoteCacheAttributes getDefaultCattr()
162 {
163 return this.irca;
164 }
165
166 /***
167 * Adds the remote cache listener to the underlying cache-watch service.
168 * <p>
169 * @param cattr The feature to be added to the RemoteCacheListener attribute
170 * @param listener The feature to be added to the RemoteCacheListener attribute
171 * @throws IOException
172 */
173 public void addRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener listener )
174 throws IOException
175 {
176 if ( cattr.isReceive() )
177 {
178 if ( log.isInfoEnabled() )
179 {
180 log.info( "The remote cache is configured to receive events from the remote server. "
181 + "We will register a listener." );
182 }
183
184 synchronized ( caches )
185 {
186 remoteWatch.addCacheListener( cattr.getCacheName(), listener );
187 }
188 }
189 else
190 {
191 if ( log.isInfoEnabled() )
192 {
193 log.info( "The remote cache is configured to NOT receive events from the remote server. "
194 + "We will NOT register a listener." );
195 }
196 }
197 return;
198 }
199
200 /***
201 * Removes a listener. When the primary recovers the failover must deregister itself for a
202 * region. The failover runner will call this method to de-register. We do not want to dergister
203 * all listeners to a remote server, in case a failover is a primary of another region. Having
204 * one regions failover act as another servers primary is not currently supported.
205 * <p>
206 * @param cattr
207 * @param listener
208 * @throws IOException
209 */
210 public void removeRemoteCacheListener( IRemoteCacheAttributes cattr, IRemoteCacheListener listener )
211 throws IOException
212 {
213 synchronized ( caches )
214 {
215 remoteWatch.removeCacheListener( cattr.getCacheName(), listener );
216 }
217 return;
218 }
219
220 /***
221 * Stops a listener. This is used to deregister a failover after primary reconnection.
222 * <p>
223 * @param cattr
224 * @throws IOException
225 */
226 public void removeRemoteCacheListener( IRemoteCacheAttributes cattr )
227 throws IOException
228 {
229 synchronized ( caches )
230 {
231 RemoteCacheNoWait cache = (RemoteCacheNoWait) caches.get( cattr.getCacheName() );
232 if ( cache != null )
233 {
234 IRemoteCacheClient rc = cache.getRemoteCache();
235 if ( log.isDebugEnabled() )
236 {
237 log.debug( "Found cache for[ " + cattr.getCacheName() + "], deregistering listener." );
238 }
239
240 IRemoteCacheListener listener = rc.getListener();
241 remoteWatch.removeCacheListener( cattr.getCacheName(), listener );
242 }
243 else
244 {
245 if ( cattr.isReceive() )
246 {
247 log.warn( "Trying to deregister Cache Listener that was never registered." );
248 }
249 else
250 {
251 if ( log.isDebugEnabled() )
252 {
253 log.debug( "Since the remote cache is configured to not receive, "
254 + "there is no listener to deregister." );
255 }
256 }
257 }
258 }
259 return;
260 }
261
262 /***
263 * Stops a listener. This is used to deregister a failover after primary reconnection.
264 * <p>
265 * @param cacheName
266 * @throws IOException
267 */
268 public void removeRemoteCacheListener( String cacheName )
269 throws IOException
270 {
271 synchronized ( caches )
272 {
273 RemoteCacheNoWait cache = (RemoteCacheNoWait) caches.get( cacheName );
274 if ( cache != null )
275 {
276 IRemoteCacheClient rc = cache.getRemoteCache();
277 if ( log.isDebugEnabled() )
278 {
279 log.debug( "Found cache for [" + cacheName + "], deregistering listener." );
280 }
281
282 IRemoteCacheListener listener = rc.getListener();
283 remoteWatch.removeCacheListener( cacheName, listener );
284 }
285 }
286 return;
287 }
288
289 /***
290 * Returns an instance of RemoteCacheManager for the given connection parameters.
291 * <p>
292 * Host and Port uniquely identify a manager instance.
293 * <p>
294 * Also starts up the monitoring daemon, if not already started.
295 * <p>
296 * If the connection cannot be established, zombie objects will be used for future recovery
297 * purposes.
298 * <p>
299 * @param cattr
300 * @param cacheMgr
301 * @return The instance value
302 * @parma port port of the registry.
303 */
304 public static RemoteCacheManager getInstance( IRemoteCacheAttributes cattr, ICompositeCacheManager cacheMgr )
305 {
306 String host = cattr.getRemoteHost();
307 int port = cattr.getRemotePort();
308 String service = cattr.getRemoteServiceName();
309 if ( host == null )
310 {
311 host = "";
312 }
313 if ( port < 1024 )
314 {
315 port = Registry.REGISTRY_PORT;
316 }
317 Location loc = new Location( host, port );
318
319 RemoteCacheManager ins = (RemoteCacheManager) instances.get( loc );
320 synchronized ( instances )
321 {
322 if ( ins == null )
323 {
324 ins = (RemoteCacheManager) instances.get( loc );
325 if ( ins == null )
326 {
327
328 ins = new RemoteCacheManager( host, port, service, cacheMgr );
329 ins.irca = cattr;
330 instances.put( loc, ins );
331 }
332 }
333 }
334
335 ins.clients++;
336
337 if ( monitor == null )
338 {
339 monitor = RemoteCacheMonitor.getInstance();
340
341
342 if ( monitor != null )
343 {
344 Thread t = new Thread( monitor );
345 t.setDaemon( true );
346 t.start();
347 }
348 }
349 return ins;
350 }
351
352 /***
353 * Returns a remote cache for the given cache name.
354 * <p>
355 * @param cacheName
356 * @return The cache value
357 */
358 public AuxiliaryCache getCache( String cacheName )
359 {
360 IRemoteCacheAttributes ca = (IRemoteCacheAttributes) irca.copy();
361 ca.setCacheName( cacheName );
362 return getCache( ca );
363 }
364
365 /***
366 * Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are
367 * identified by the cache name value of the RemoteCacheAttributes object.
368 * <p>
369 * If the client is configured to register a listener, this call results on a listener being
370 * created if one isn't already registered with the remote cache for this region.
371 * <p>
372 * @param cattr
373 * @return The cache value
374 */
375 public AuxiliaryCache getCache( IRemoteCacheAttributes cattr )
376 {
377 RemoteCacheNoWait c = null;
378
379 synchronized ( caches )
380 {
381 c = (RemoteCacheNoWait) caches.get( cattr.getCacheName() );
382 if ( c == null )
383 {
384
385
386 RemoteCacheListener listener = null;
387 try
388 {
389 listener = new RemoteCacheListener( cattr, cacheMgr );
390 addRemoteCacheListener( cattr, listener );
391 }
392 catch ( IOException ioe )
393 {
394 log.error( ioe.getMessage() );
395 }
396 catch ( Exception e )
397 {
398 log.error( e.getMessage() );
399 }
400
401 c = new RemoteCacheNoWait( new RemoteCache( cattr, remoteService, listener ) );
402 caches.put( cattr.getCacheName(), c );
403 }
404
405
406 }
407
408 return c;
409 }
410
411 /***
412 * Releases.
413 * <p>
414 * @param name
415 * @throws IOException
416 */
417 public void freeCache( String name )
418 throws IOException
419 {
420 if ( log.isInfoEnabled() )
421 {
422 log.info( "freeCache [" + name + "]" );
423 }
424 ICache c = null;
425 synchronized ( caches )
426 {
427 c = (ICache) caches.get( name );
428 }
429 if ( c != null )
430 {
431 this.removeRemoteCacheListener( name );
432 c.dispose();
433 }
434 }
435
436 /***
437 * Gets the stats attribute of the RemoteCacheManager object
438 * <p>
439 * @return The stats value
440 */
441 public String getStats()
442 {
443 StringBuffer stats = new StringBuffer();
444 Iterator allCaches = caches.values().iterator();
445 while ( allCaches.hasNext() )
446 {
447 ICache c = (ICache) allCaches.next();
448 if ( c != null )
449 {
450 stats.append( c.getCacheName() );
451 }
452 }
453 return stats.toString();
454 }
455
456 /*** Shutdown all. */
457 public void release()
458 {
459
460 if ( --clients != 0 )
461 {
462 return;
463 }
464 synchronized ( caches )
465 {
466 Iterator allCaches = caches.values().iterator();
467 while ( allCaches.hasNext() )
468 {
469 ICache c = (ICache) allCaches.next();
470 if ( c != null )
471 {
472 try
473 {
474
475 freeCache( c.getCacheName() );
476 }
477 catch ( IOException ex )
478 {
479 log.error( "Problem in release.", ex );
480 }
481 }
482 }
483 }
484 }
485
486 /***
487 * Fixes up all the caches managed by this cache manager.
488 * <p>
489 * @param remoteService
490 * @param remoteWatch
491 */
492 public void fixCaches( IRemoteCacheService remoteService, IRemoteCacheObserver remoteWatch )
493 {
494 synchronized ( caches )
495 {
496 this.remoteService = remoteService;
497 this.remoteWatch.setCacheWatch( remoteWatch );
498 for ( Iterator en = caches.values().iterator(); en.hasNext(); )
499 {
500 RemoteCacheNoWait cache = (RemoteCacheNoWait) en.next();
501 cache.fixCache( this.remoteService );
502 }
503 }
504 }
505
506 /***
507 * Gets the cacheType attribute of the RemoteCacheManager object
508 * @return The cacheType value
509 */
510 public int getCacheType()
511 {
512 return REMOTE_CACHE;
513 }
514
515 /***
516 * Location of the RMI registry.
517 */
518 private final static class Location
519 {
520 /*** Description of the Field */
521 public final String host;
522
523 /*** Description of the Field */
524 public final int port;
525
526 /***
527 * Constructor for the Location object
528 * <p>
529 * @param host
530 * @param port
531 */
532 public Location( String host, int port )
533 {
534 this.host = host;
535 this.port = port;
536 }
537
538
539
540
541
542 public boolean equals( Object obj )
543 {
544 if ( obj == this )
545 {
546 return true;
547 }
548 if ( obj == null || !( obj instanceof Location ) )
549 {
550 return false;
551 }
552 Location l = (Location) obj;
553 if ( this.host == null && l.host != null )
554 {
555 return false;
556 }
557 return host.equals( l.host ) && port == l.port;
558 }
559
560 /***
561 * @return int
562 */
563 public int hashCode()
564 {
565 return host == null ? port : host.hashCode() ^ port;
566 }
567 }
568
569 /***
570 * Shutdown callback from composite cache manager.
571 * <p>
572 * (non-Javadoc)
573 * @see org.apache.jcs.engine.behavior.IShutdownObserver#shutdown()
574 */
575 public void shutdown()
576 {
577 if ( log.isInfoEnabled() )
578 {
579 log.info( "Observed shutdown request." );
580 }
581 release();
582 }
583
584 }