Coverage report

  %line %branch
org.apache.jcs.auxiliary.remote.server.RemoteCacheServer
64% 
78% 

 1  
 package org.apache.jcs.auxiliary.remote.server;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.io.IOException;
 23  
 import java.io.Serializable;
 24  
 import java.rmi.RemoteException;
 25  
 import java.rmi.registry.Registry;
 26  
 import java.rmi.server.UnicastRemoteObject;
 27  
 import java.rmi.server.Unreferenced;
 28  
 import java.util.Collections;
 29  
 import java.util.Enumeration;
 30  
 import java.util.Hashtable;
 31  
 import java.util.Iterator;
 32  
 import java.util.Map;
 33  
 import java.util.Set;
 34  
 
 35  
 import org.apache.commons.logging.Log;
 36  
 import org.apache.commons.logging.LogFactory;
 37  
 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
 38  
 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
 39  
 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
 40  
 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
 41  
 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheServiceAdmin;
 42  
 import org.apache.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
 43  
 import org.apache.jcs.engine.CacheEventQueueFactory;
 44  
 import org.apache.jcs.engine.CacheListeners;
 45  
 import org.apache.jcs.engine.behavior.ICacheElement;
 46  
 import org.apache.jcs.engine.behavior.ICacheEventQueue;
 47  
 import org.apache.jcs.engine.behavior.ICacheListener;
 48  
 import org.apache.jcs.engine.control.CompositeCache;
 49  
 import org.apache.jcs.engine.control.CompositeCacheManager;
 50  
 
 51  
 /**
 52  
  * This class provides remote cache services. The remote cache server propagates
 53  
  * events from local caches to other local caches. It can also store cached
 54  
  * data, making it available to new clients.
 55  
  * <p>
 56  
  * Remote cache servers can be clustered. If the cache used by this remote cache
 57  
  * is configured to use a remote cache of type cluster, the two remote caches
 58  
  * will communicate with each other. Remove and put requests can be sent from
 59  
  * one remote to another. If they are configured to broadcast such events to
 60  
  * their clients, then removes and puts can be sent to all locals in the cluster.
 61  
  * <p>
 62  
  * Get requests are made between clustered servers if AllowClusterGet is true. You can setup
 63  
  * several clients to use one remote server and several to use another. The get
 64  
  * load will be distributed between the two servers. Since caches are usually
 65  
  * high get and low put, this should allow you to scale.
 66  
  */
 67  
 class RemoteCacheServer
 68  
     extends UnicastRemoteObject
 69  
     implements IRemoteCacheService, IRemoteCacheObserver, IRemoteCacheServiceAdmin, Unreferenced
 70  
 {
 71  
     private static final long serialVersionUID = -8072345435941473116L;
 72  
 
 73  42
     private final static Log log = LogFactory.getLog( RemoteCacheServer.class );
 74  
 
 75  
     /** timing -- if we should record operation times. */
 76  
     protected final static boolean timing = true;
 77  
 
 78  56
     private int puts = 0;
 79  
 
 80  
     // Maps cache name to CacheListeners object.
 81  
     // association of listeners (regions).
 82  56
     private final Hashtable cacheListenersMap = new Hashtable();
 83  
 
 84  56
     private final Hashtable clusterListenersMap = new Hashtable();
 85  
 
 86  
     private CompositeCacheManager cacheManager;
 87  
 
 88  
     // relates listener id with a type
 89  56
     private final Hashtable idTypeMap = new Hashtable();
 90  
 
 91  
     // private transient int listenerId = 0;
 92  56
     private int[] listenerId = new class="keyword">int[1];
 93  
 
 94  
     /** Configuration settings. */
 95  
     protected IRemoteCacheServerAttributes rcsa;
 96  
 
 97  
     /** The interval at which we will log updates. */
 98  56
     private int logInterval = 100;
 99  
 
 100  
     /**
 101  
      * Constructor for the RemoteCacheServer object. Thiks initializes the
 102  
      * server with the values from the config file.
 103  
      * <p>
 104  
      * @param rcsa
 105  
      * @throws RemoteException
 106  
      * @exception IOException
 107  
      */
 108  
     RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
 109  
         throws RemoteException
 110  
     {
 111  56
         super( rcsa.getServicePort() );
 112  56
         this.rcsa = rcsa;
 113  56
         init( rcsa.getConfigFileName() );
 114  56
     }
 115  
 
 116  
     /**
 117  
      * Initialize the RMI Cache Server from a properties file.
 118  
      * <p>
 119  
      * @param prop
 120  
      */
 121  
     private void init( String prop )
 122  
     {
 123  56
         cacheManager = createCacheManager( prop );
 124  
 
 125  
         // cacheManager would have created a number of ICache objects.
 126  
         // Use these objects to set up the cacheListenersMap.
 127  56
         String[] list = cacheManager.getCacheNames();
 128  147
         for ( int i = 0; i < list.length; i++ )
 129  
         {
 130  91
             String name = list[i];
 131  91
             cacheListenersMap.put( name, new CacheListeners( cacheManager.getCache( name ) ) );
 132  
         }
 133  56
     }
 134  
 
 135  
     /**
 136  
      * Subclass can override this method to create the specific cache manager.
 137  
      * <p>
 138  
      * @param prop
 139  
      *            The anem of the configuration file.
 140  
      * @return The cache hub configured with this configuration file.
 141  
      */
 142  
     private CompositeCacheManager createCacheManager( String prop )
 143  
     {
 144  56
         CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
 145  
 
 146  56
         if ( prop == null )
 147  
         {
 148  0
             hub.configure( "/remote.cache.ccf" );
 149  0
         }
 150  
         else
 151  
         {
 152  56
             hub.configure( prop );
 153  
         }
 154  56
         return hub;
 155  
     }
 156  
 
 157  
     /**
 158  
      * Returns the cache listener for the specified cache. Creates the cache and
 159  
      * the cache descriptor if they do not already exist.
 160  
      * <p>
 161  
      * @param cacheName
 162  
      * @return The cacheListeners value
 163  
      */
 164  
     protected CacheListeners getCacheListeners( String cacheName )
 165  
     {
 166  1029
         CacheListeners cacheListeners = (CacheListeners) cacheListenersMap.get( cacheName );
 167  1029
         synchronized ( cacheListenersMap )
 168  
         {
 169  1029
             if ( cacheListeners == null )
 170  
             {
 171  49
                 cacheListeners = (CacheListeners) cacheListenersMap.get( cacheName );
 172  49
                 if ( cacheListeners == null )
 173  
                 {
 174  49
                     cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
 175  49
                     cacheListenersMap.put( cacheName, cacheListeners );
 176  
                 }
 177  
             }
 178  1029
         }
 179  1029
         return cacheListeners;
 180  
     }
 181  
 
 182  
     /**
 183  
      * Gets the clusterListeners attribute of the RemoteCacheServer object.
 184  
      * <p>
 185  
      * @todo may be able to remove this
 186  
      *
 187  
      * @param cacheName
 188  
      * @return The clusterListeners value
 189  
      */
 190  
     protected CacheListeners getClusterListeners( String cacheName )
 191  
     {
 192  42
         CacheListeners cacheListeners = (CacheListeners) clusterListenersMap.get( cacheName );
 193  42
         synchronized ( clusterListenersMap )
 194  
         {
 195  42
             if ( cacheListeners == null )
 196  
             {
 197  14
                 cacheListeners = (CacheListeners) clusterListenersMap.get( cacheName );
 198  14
                 if ( cacheListeners == null )
 199  
                 {
 200  14
                     cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
 201  14
                     clusterListenersMap.put( cacheName, cacheListeners );
 202  
                 }
 203  
             }
 204  42
         }
 205  42
         return cacheListeners;
 206  
     }
 207  
 
 208  
     /**
 209  
      * Puts a cache bean to the remote cache and notifies all listeners which
 210  
      * <br>
 211  
      * <ol>
 212  
      * <li>have a different listener id than the originating host;
 213  
      * <li>are currently subscribed to the related cache.
 214  
      * </ol>
 215  
      * <p>
 216  
      * @param item
 217  
      * @throws IOException
 218  
      *
 219  
      */
 220  
     public void put( ICacheElement item )
 221  
         throws IOException
 222  
     {
 223  0
         update( item );
 224  0
     }
 225  
 
 226  
     /*
 227  
      * (non-Javadoc)
 228  
      *
 229  
      * @see org.apache.jcs.engine.behavior.ICacheService#update(org.apache.jcs.engine.behavior.ICacheElement)
 230  
      */
 231  
     public void update( ICacheElement item )
 232  
         throws IOException
 233  
     {
 234  0
         update( item, 0 );
 235  0
     }
 236  
 
 237  
     /**
 238  
      * An update can come from either a local cache's remote auxiliary, or it
 239  
      * can come from a remote server. A remote server is considered a a source
 240  
      * of type cluster.
 241  
      * <p>
 242  
      * If the update came from a cluster, then we should tell the cache manager
 243  
      * that this was a remote put. This way, any lateral and remote auxiliaries
 244  
      * configured for the region will not be updated. This is basically how a
 245  
      * remote listener works when plugged into a local cache.
 246  
      * <p>
 247  
      * If the cluster is configured to keep local cluster consistency, then all
 248  
      * listeners will be updated. This allows cluster server A to update cluster
 249  
      * server B and then B to update its clients if it is told to keep local
 250  
      * cluster consistency. Otherwise, server A will update server B and B will
 251  
      * not tell its clients. If you cluster using lateral caches for instance,
 252  
      * this is how it will work. Updates to a cluster node, will never get to
 253  
      * the leavess. The remote cluster, with local cluster consistency, allows
 254  
      * you to update leaves. This basically allows you to have a failover remote
 255  
      * server.
 256  
      * <p>
 257  
      * Since currently a cluster will not try to get from other cluster servers,
 258  
      * you can scale a bit with a cluster configuration. Puts and removes will
 259  
      * be broadcasted to all clients, but the get load on a remote server can be
 260  
      * reduced.
 261  
      * <p>
 262  
      * @param item
 263  
      * @param requesterId
 264  
      * @throws IOException
 265  
      */
 266  
     public void update( ICacheElement item, long requesterId )
 267  
         throws IOException
 268  
     {
 269  
 
 270  861
         long start = 0;
 271  
         if ( timing )
 272  
         {
 273  861
             start = System.currentTimeMillis();
 274  
         }
 275  
 
 276  861
         if ( log.isInfoEnabled() )
 277  
         {
 278  
             // not thread safe, but it doesn't have to be accurate
 279  861
             puts++;
 280  861
             if ( puts % logInterval == 0 )
 281  
             {
 282  7
                 log.info( "puts = " + puts );
 283  
             }
 284  
         }
 285  
 
 286  861
         if ( log.isDebugEnabled() )
 287  
         {
 288  0
             log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
 289  
         }
 290  
 
 291  
         try
 292  
         {
 293  861
             CacheListeners cacheDesc = getCacheListeners( item.getCacheName() );
 294  861
             /* Object val = */item.getVal();
 295  
 
 296  861
             Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
 297  861
             if ( log.isDebugEnabled() )
 298  
             {
 299  0
                 log.debug( "In update, requesterId = [" + requesterId + "] remoteType = " + remoteTypeL );
 300  
             }
 301  
 
 302  861
             boolean fromCluster = false;
 303  861
             if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
 304  
             {
 305  70
                 fromCluster = true;
 306  
             }
 307  
             // ordered cache item update and notification.
 308  861
             synchronized ( cacheDesc )
 309  
             {
 310  
                 try
 311  
                 {
 312  861
                     CompositeCache c = (CompositeCache) cacheDesc.cache;
 313  
 
 314  
                     // If the source of this request was not from a cluster,
 315  
                     // then consider it a local update. The cache manager will
 316  
                     // try to
 317  
                     // update all auxiliaries.
 318  
                     //
 319  
                     // This requires that two local caches not be connected to
 320  
                     // two clustered remote caches. The failover runner will
 321  
                     // have to make sure of this. ALos, the local cache needs
 322  
                     // avoid updating this source. Will need to pass the source
 323  
                     // id somehow. The remote cache should udate all local
 324  
                     // caches
 325  
                     // but not update the cluster source. Cluster remote caches
 326  
                     // should only be updated by the server and not the
 327  
                     // RemoteCache.
 328  861
                     if ( fromCluster )
 329  
                     {
 330  70
                         if ( log.isDebugEnabled() )
 331  
                         {
 332  0
                             log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
 333  
                                 + " requesterId [" + requesterId + "]" );
 334  
                         }
 335  70
                         c.localUpdate( item );
 336  70
                     }
 337  
                     else
 338  
                     {
 339  791
                         if ( log.isDebugEnabled() )
 340  
                         {
 341  0
                             log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
 342  
                                 + " requesterId [" + requesterId + "]" );
 343  
                         }
 344  791
                         c.update( item );
 345  
                     }
 346  
                 }
 347  0
                 catch ( Exception ce )
 348  
                 {
 349  
                     // swallow
 350  0
                     if ( log.isInfoEnabled() )
 351  
                     {
 352  0
                         log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
 353  
                             + ce.getMessage() );
 354  
                     }
 355  861
                 }
 356  
 
 357  
                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
 358  
                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
 359  861
                 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
 360  
                 {
 361  861
                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
 362  
 
 363  861
                     if ( qlist != null )
 364  
                     {
 365  861
                         if ( log.isDebugEnabled() )
 366  
                         {
 367  0
                             log.debug( "qlist.length = " + qlist.length );
 368  
                         }
 369  1708
                         for ( int i = 0; i < qlist.length; i++ )
 370  
                         {
 371  847
                             qlist[i].addPutEvent( item );
 372  
                         }
 373  861
                     }
 374  
                     else
 375  
                     {
 376  0
                         if ( log.isDebugEnabled() )
 377  
                         {
 378  0
                             log.debug( "q list is null" );
 379  
                         }
 380  
                     }
 381  
                 }
 382  861
             }
 383  
         }
 384  0
         catch ( Exception e )
 385  
         {
 386  0
             log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
 387  861
         }
 388  
 
 389  
         // TODO use JAMON for timing
 390  
         if ( timing )
 391  
         {
 392  861
             long end = System.currentTimeMillis();
 393  861
             if ( log.isDebugEnabled() )
 394  
             {
 395  0
                 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
 396  
             }
 397  
         }
 398  
 
 399  861
         return;
 400  
     }
 401  
 
 402  
     /**
 403  
      * Gets the eventQList attribute of the RemoteCacheServer object. This
 404  
      * returns the event queues stored in the cacheListeners object for a
 405  
      * particuylar region, if the queue is not for this requester.
 406  
      * <p>
 407  
      * Basically, this makes sure that a request from a particular local cache,
 408  
      * identified by its listener id, does not result in a call to that same
 409  
      * listener.
 410  
      * <p>
 411  
      * @param cacheListeners
 412  
      * @param requesterId
 413  
      * @return The eventQList value
 414  
      */
 415  
     private ICacheEventQueue[] getEventQList( CacheListeners cacheListeners, long requesterId )
 416  
     {
 417  938
         ICacheEventQueue[] list = null;
 418  938
         synchronized ( cacheListeners.eventQMap )
 419  
         {
 420  938
             list = (ICacheEventQueue[]) cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
 421  938
         }
 422  938
         int count = 0;
 423  
         // Set those not qualified to null; Count those qualified.
 424  2583
         for ( int i = 0; i < list.length; i++ )
 425  
         {
 426  1645
             ICacheEventQueue q = list[i];
 427  1645
             if ( q.isWorking() && q.getListenerId() != requesterId )
 428  
             {
 429  917
                 count++;
 430  917
             }
 431  
             else
 432  
             {
 433  728
                 list[i] = null;
 434  
             }
 435  
         }
 436  938
         if ( count == list.length )
 437  
         {
 438  
             // All qualified.
 439  210
             return list;
 440  
         }
 441  
 
 442  
         // Returns only the qualified.
 443  728
         ICacheEventQueue[] qq = new ICacheEventQueue[count];
 444  728
         count = 0;
 445  2163
         for ( int i = 0; i < list.length; i++ )
 446  
         {
 447  1435
             if ( list[i] != null )
 448  
             {
 449  707
                 qq[count++] = list[i];
 450  
             }
 451  
         }
 452  728
         return qq;
 453  
     }
 454  
 
 455  
     /**
 456  
      * Returns a cache value from the specified remote cache; or null if the
 457  
      * cache or key does not exist.
 458  
      * <p>
 459  
      * @param cacheName
 460  
      * @param key
 461  
      * @return ICacheElement
 462  
      * @throws IOException
 463  
      */
 464  
     public ICacheElement get( String cacheName, Serializable key )
 465  
         throws IOException
 466  
     {
 467  0
         return this.get( cacheName, key, 0 );
 468  
     }
 469  
 
 470  
     /**
 471  
      * Returns a cache bean from the specified cache; or null if the key does
 472  
      * not exist.
 473  
      * <p>
 474  
      * Adding the requestor id, allows the cache to determine the sournce of the
 475  
      * get.
 476  
      * <p>
 477  
      * @param cacheName
 478  
      * @param key
 479  
      * @param requesterId
 480  
      * @return ICacheElement
 481  
      * @throws IOException
 482  
      */
 483  
     public ICacheElement get( String cacheName, Serializable key, long requesterId )
 484  
         throws IOException
 485  
     {
 486  21
         Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
 487  
 
 488  21
         if ( log.isDebugEnabled() )
 489  
         {
 490  0
             log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId + "] remoteType = "
 491  
                 + remoteTypeL );
 492  
         }
 493  
 
 494  
         // Since a non-receiving remote cache client will not register a
 495  
         // listener, it will not have a listener id assigned from the server. As
 496  
         // such the remote server cannot determine if it is a cluster or a
 497  
         // normal client. It will assume that it is a normal client.
 498  
 
 499  21
         boolean fromCluster = false;
 500  21
         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
 501  
         {
 502  0
             fromCluster = true;
 503  
         }
 504  
 
 505  21
         CacheListeners cacheDesc = null;
 506  
         try
 507  
         {
 508  21
             cacheDesc = getCacheListeners( cacheName );
 509  
         }
 510  0
         catch ( Exception e )
 511  
         {
 512  0
             log.error( "Problem getting listeners.", e );
 513  21
         }
 514  
 
 515  21
         if ( cacheDesc == null )
 516  
         {
 517  0
             return null;
 518  
         }
 519  21
         CompositeCache c = (CompositeCache) cacheDesc.cache;
 520  
 
 521  21
         ICacheElement element = null;
 522  
 
 523  
         // If we have a get come in from a client and we don't have the item
 524  
         // locally, we will allow the cache to look in other non local sources,
 525  
         // such as a remote cache or a lateral.
 526  
         //
 527  
         // Since remote servers never get from clients and clients never go
 528  
         // remote from a remote call, this
 529  
         // will not result in any loops.
 530  
         //
 531  
         // This is the only instance I can think of where we allow a remote get
 532  
         // from a remote call. The purpose is to allow remote cache servers to
 533  
         // talk to each other. If one goes down, you want it to be able to get
 534  
         // data from those that were up when the failed server comes back o
 535  
         // line.
 536  
 
 537  21
         if ( !fromCluster && this.rcsa.getAllowClusterGet() )
 538  
         {
 539  21
             if ( log.isDebugEnabled() )
 540  
             {
 541  0
                 log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet [" + this.rcsa.getAllowClusterGet() + "]"  );
 542  
             }
 543  21
             element = c.get( key );
 544  21
         }
 545  
         else
 546  
         {
 547  
             // Gets from cluster type remote will end up here.
 548  
             // Gets from all clients will end up here if allow cluster get is
 549  
             // false.
 550  
 
 551  0
             if ( log.isDebugEnabled() )
 552  
             {
 553  0
                 log.debug( "LocalGet.  fromCluster [" + fromCluster + "] AllowClusterGet [" + this.rcsa.getAllowClusterGet() + "]" );
 554  
             }
 555  0
             element = c.localGet( key );
 556  
         }
 557  
 
 558  21
         return element;
 559  
     }
 560  
 
 561  
     /**
 562  
      * Gets the set of keys of objects currently in the group.
 563  
      * <p>
 564  
      * @param cacheName
 565  
      * @param group
 566  
      * @return A Set of group keys
 567  
      */
 568  
     public Set getGroupKeys( String cacheName, String group )
 569  
     {
 570  0
         CacheListeners cacheDesc = null;
 571  
         try
 572  
         {
 573  0
             cacheDesc = getCacheListeners( cacheName );
 574  
         }
 575  0
         catch ( Exception e )
 576  
         {
 577  0
             log.error( "Problem getting listeners.", e );
 578  0
         }
 579  
 
 580  0
         if ( cacheDesc == null )
 581  
         {
 582  0
             return Collections.EMPTY_SET;
 583  
         }
 584  0
         CompositeCache c = (CompositeCache) cacheDesc.cache;
 585  0
         return c.getGroupKeys( group );
 586  
     }
 587  
 
 588  
     /**
 589  
      * Removes the given key from the specified remote cache. Defaults the
 590  
      * listener id to 0.
 591  
      * <p>
 592  
      * @param cacheName
 593  
      * @param key
 594  
      * @throws IOException
 595  
      */
 596  
     public void remove( String cacheName, Serializable key )
 597  
         throws IOException
 598  
     {
 599  0
         remove( cacheName, key, 0 );
 600  0
     }
 601  
 
 602  
     /**
 603  
      * Remove the key from the cache region and don't tell the source listener
 604  
      * about it.
 605  
      * <p>
 606  
      * @param cacheName
 607  
      * @param key
 608  
      * @param requesterId
 609  
      * @throws IOException
 610  
      */
 611  
     public void remove( String cacheName, Serializable key, long requesterId )
 612  
         throws IOException
 613  
     {
 614  77
         if ( log.isDebugEnabled() )
 615  
         {
 616  0
             log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
 617  
         }
 618  77
         CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
 619  
 
 620  77
         Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
 621  77
         boolean fromCluster = false;
 622  77
         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
 623  
         {
 624  0
             fromCluster = true;
 625  
         }
 626  
 
 627  77
         if ( cacheDesc != null )
 628  
         {
 629  
             // best attempt to achieve ordered cache item removal and
 630  
             // notification.
 631  77
             synchronized ( cacheDesc )
 632  
             {
 633  77
                 boolean removeSuccess = false;
 634  
 
 635  
                 // No need to notify if it was not cached.
 636  77
                 CompositeCache c = (CompositeCache) cacheDesc.cache;
 637  
 
 638  77
                 if ( fromCluster )
 639  
                 {
 640  0
                     if ( log.isDebugEnabled() )
 641  
                     {
 642  0
                         log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
 643  
                     }
 644  0
                     removeSuccess = c.localRemove( key );
 645  0
                 }
 646  
                 else
 647  
                 {
 648  77
                     if ( log.isDebugEnabled() )
 649  
                     {
 650  0
                         log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
 651  
                     }
 652  77
                     removeSuccess = c.remove( key );
 653  
                 }
 654  
 
 655  77
                 if ( log.isDebugEnabled() )
 656  
                 {
 657  0
                     log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
 658  
                         + removeSuccess );
 659  
                 }
 660  
 
 661  
                 // UPDATE LOCALS IF A REQUEST COMES FROM A CLUSTER
 662  
                 // IF LOCAL CLUSTER CONSISTENCY IS CONFIGURED
 663  77
                 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
 664  
                 {
 665  77
                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
 666  
 
 667  147
                     for ( int i = 0; i < qlist.length; i++ )
 668  
                     {
 669  70
                         qlist[i].addRemoveEvent( key );
 670  
                     }
 671  
                 }
 672  77
             }
 673  
         }
 674  77
         return;
 675  
     }
 676  
 
 677  
     /**
 678  
      * Remove all keys from the sepcified remote cache.
 679  
      * <p>
 680  
      * @param cacheName
 681  
      * @throws IOException
 682  
      */
 683  
     public void removeAll( String cacheName )
 684  
         throws IOException
 685  
     {
 686  0
         removeAll( cacheName, 0 );
 687  0
     }
 688  
 
 689  
     /**
 690  
      * Remove all keys from the specified remote cache.
 691  
      * <p>
 692  
      * @param cacheName
 693  
      * @param requesterId
 694  
      * @throws IOException
 695  
      */
 696  
     public void removeAll( String cacheName, long requesterId )
 697  
         throws IOException
 698  
     {
 699  0
         CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
 700  
 
 701  0
         Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
 702  0
         boolean fromCluster = false;
 703  0
         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
 704  
         {
 705  0
             fromCluster = true;
 706  
         }
 707  
 
 708  0
         if ( cacheDesc != null )
 709  
         {
 710  
             // best attempt to achieve ordered cache item removal and
 711  
             // notification.
 712  0
             synchronized ( cacheDesc )
 713  
             {
 714  
                 // No need to broadcast, or notify if it was not cached.
 715  0
                 CompositeCache c = (CompositeCache) cacheDesc.cache;
 716  
 
 717  0
                 if ( fromCluster )
 718  
                 {
 719  0
                     if ( log.isDebugEnabled() )
 720  
                     {
 721  0
                         log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
 722  
                     }
 723  0
                     c.localRemoveAll();
 724  0
                 }
 725  
                 else
 726  
                 {
 727  0
                     if ( log.isDebugEnabled() )
 728  
                     {
 729  0
                         log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
 730  
                     }
 731  0
                     c.removeAll();
 732  
                 }
 733  
 
 734  
                 // update registered listeners
 735  0
                 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
 736  
                 {
 737  0
                     ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
 738  
 
 739  0
                     for ( int i = 0; i < qlist.length; i++ )
 740  
                     {
 741  0
                         qlist[i].addRemoveAllEvent();
 742  
                     }
 743  
                 }
 744  0
             }
 745  
         }
 746  0
         return;
 747  
     }
 748  
 
 749  
     /**
 750  
      * How many put events have we received.
 751  
      * <p>
 752  
      * @return puts
 753  
      */
 754  
     protected int getPutCount()
 755  
     {
 756  56
         return puts;
 757  
     }
 758  
 
 759  
     /**
 760  
      * Frees the specified remote cache.
 761  
      * <p>
 762  
      * @param cacheName
 763  
      * @throws IOException
 764  
      */
 765  
     public void dispose( String cacheName )
 766  
         throws IOException
 767  
     {
 768  0
         dispose( cacheName, 0 );
 769  0
     }
 770  
 
 771  
     /**
 772  
      * Frees the specified remote cache.
 773  
      * <p>
 774  
      * @param cacheName
 775  
      * @param requesterId
 776  
      * @throws IOException
 777  
      */
 778  
     public void dispose( String cacheName, long requesterId )
 779  
         throws IOException
 780  
     {
 781  0
         if ( log.isInfoEnabled() )
 782  
         {
 783  0
             log.info( "Dispose request received from listener [" + requesterId + "]" );
 784  
         }
 785  
 
 786  0
         CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
 787  
 
 788  
         // this is dangerous
 789  0
         if ( cacheDesc != null )
 790  
         {
 791  
             // best attempt to achieve ordered free-cache-op and notification.
 792  0
             synchronized ( cacheDesc )
 793  
             {
 794  0
                 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
 795  
 
 796  0
                 for ( int i = 0; i < qlist.length; i++ )
 797  
                 {
 798  0
                     qlist[i].addDisposeEvent();
 799  
                 }
 800  0
                 cacheManager.freeCache( cacheName );
 801  0
             }
 802  
         }
 803  0
         return;
 804  
     }
 805  
 
 806  
     /**
 807  
      * Frees all remote caches.
 808  
      * <p>
 809  
      * @throws IOException
 810  
      */
 811  
     public void release()
 812  
         throws IOException
 813  
     {
 814  0
         synchronized ( cacheListenersMap )
 815  
         {
 816  0
             for ( Enumeration en = cacheListenersMap.elements(); en.hasMoreElements(); )
 817  
             {
 818  0
                 CacheListeners cacheDesc = (CacheListeners) en.nextElement();
 819  0
                 ICacheEventQueue[] qlist = getEventQList( cacheDesc, 0 );
 820  
 
 821  0
                 for ( int i = 0; i < qlist.length; i++ )
 822  
                 {
 823  0
                     qlist[i].addDisposeEvent();
 824  
                 }
 825  0
             }
 826  0
             cacheManager.release();
 827  0
         }
 828  0
         return;
 829  
     }
 830  
 
 831  
     /**
 832  
      * Removes dead event queues. Should clean out deregistered listeners.
 833  
      * <p>
 834  
      * @param eventQMap
 835  
      */
 836  
     private static void cleanupEventQMap( Map eventQMap )
 837  
     {
 838  175
         synchronized ( eventQMap )
 839  
         {
 840  175
             for ( Iterator itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
 841  
             {
 842  119
                 Map.Entry e = (Map.Entry) itr.next();
 843  119
                 ICacheEventQueue q = (ICacheEventQueue) e.getValue();
 844  
 
 845  
                 // this does not care if the q is alive (i.e. if
 846  
                 // there are active threads; it cares if the queue
 847  
                 // is working -- if it has not encoutnered errors
 848  
                 // above the failure threshhold
 849  119
                 if ( !q.isWorking() )
 850  
                 {
 851  0
                     itr.remove();
 852  0
                     log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
 853  
                 }
 854  119
             }
 855  175
         }
 856  175
     }
 857  
 
 858  
     /**
 859  
      * Subscribes to the specified remote cache.
 860  
      * <p>
 861  
      * If the client id is 0, then the remote cache server will increment it's
 862  
      * local count and assign an id to the client.
 863  
      * <p>
 864  
      * @param cacheName
 865  
      *            the specified remote cache.
 866  
      * @param listener
 867  
      *            object to notify for cache changes. must be synchronized since
 868  
      *            there are remote calls involved.
 869  
      * @throws IOException
 870  
      */
 871  
     public void addCacheListener( String cacheName, ICacheListener listener )
 872  
         throws IOException
 873  
     {
 874  119
         if ( cacheName == null || listener == class="keyword">null )
 875  
         {
 876  0
             throw new IllegalArgumentException( "cacheName and listener must not be null" );
 877  
         }
 878  
         CacheListeners cacheDesc;
 879  
 
 880  119
         IRemoteCacheListener ircl = (IRemoteCacheListener) listener;
 881  
 
 882  119
         String listenerAddress = ircl.getLocalHostAddress();
 883  
 
 884  119
         int remoteType = ircl.getRemoteType();
 885  119
         if ( remoteType == IRemoteCacheAttributes.CLUSTER )
 886  
         {
 887  21
             log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
 888  21
             cacheDesc = getClusterListeners( cacheName );
 889  21
         }
 890  
         else
 891  
         {
 892  98
             log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
 893  98
             cacheDesc = getCacheListeners( cacheName );
 894  
         }
 895  119
         Map eventQMap = cacheDesc.eventQMap;
 896  119
         cleanupEventQMap( eventQMap );
 897  
 
 898  
         // synchronized ( listenerId )
 899  119
         synchronized ( ICacheListener.class )
 900  
         {
 901  119
             long id = 0;
 902  
             try
 903  
             {
 904  119
                 id = listener.getListenerId();
 905  
                 // clients problably shouldn't do this.
 906  119
                 if ( id == 0 )
 907  
                 {
 908  
                     // must start at one so the next gets recognized
 909  119
                     long listenerIdB = nextListenerId();
 910  119
                     if ( log.isDebugEnabled() )
 911  
                     {
 912  0
                         log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
 913  
                             + "], listenerAddress [" + listenerAddress + "]" );
 914  
                     }
 915  119
                     listener.setListenerId( listenerIdB );
 916  119
                     id = listenerIdB;
 917  
 
 918  
                     // in case it needs synchronization
 919  119
                     if ( log.isInfoEnabled() )
 920  
                     {
 921  119
                         log.info( "adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
 922  
                             + listenerAddress + "]" );
 923  
                     }
 924  119
                 }
 925  
                 else
 926  
                 {
 927  0
                     if ( log.isInfoEnabled() )
 928  
                     {
 929  0
                         log.info( "adding listener under existing id = [" + id + "], listenerAddress ["
 930  
                             + listenerAddress + "]" );
 931  
                     }
 932  
                     // should confirm the the host is the same as we have on
 933  
                     // record, just in case a client has made a mistake.
 934  
                 }
 935  
 
 936  
                 // relate the type to an id
 937  119
                 this.idTypeMap.put( new Long( id ), class="keyword">new Integer( remoteType ) );
 938  
             }
 939  0
             catch ( IOException ioe )
 940  
             {
 941  0
                 log.error( "Problem setting listener id, listenerAddress [" + listenerAddress + "]", ioe );
 942  119
             }
 943  
 
 944  119
             CacheEventQueueFactory fact = new CacheEventQueueFactory();
 945  119
             ICacheEventQueue q = fact.createCacheEventQueue( listener, id, cacheName, rcsa.getEventQueuePoolName(),
 946  
                                                              rcsa.getEventQueueTypeFactoryCode() );
 947  
 
 948  119
             eventQMap.put( new Long( listener.getListenerId() ), q );
 949  
 
 950  119
             if ( log.isInfoEnabled() )
 951  
             {
 952  119
                 log.info( "Region " + cacheName + "'s listener size = " + cacheDesc.eventQMap.size() );
 953  
             }
 954  119
         }
 955  119
     }
 956  
 
 957  
     /**
 958  
      * Subscribes to all remote caches.
 959  
      * <p>
 960  
      * @param listener
 961  
      *            The feature to be added to the CacheListener attribute
 962  
      * @throws IOException
 963  
      */
 964  
     public void addCacheListener( ICacheListener listener )
 965  
         throws IOException
 966  
     {
 967  14
         for ( Enumeration en = cacheListenersMap.keys(); en.hasMoreElements(); )
 968  
         {
 969  14
             String cacheName = (String) en.nextElement();
 970  14
             addCacheListener( cacheName, listener );
 971  
 
 972  14
             if ( log.isDebugEnabled() )
 973  
             {
 974  0
                 log.debug( "Adding listener for cache [" + cacheName + "]" );
 975  
             }
 976  14
         }
 977  14
     }
 978  
 
 979  
     /**
 980  
      * Unsubscribe this listener from this region. If the listener is
 981  
      * registered, it will be removed from the event queue map list.
 982  
      * <p>
 983  
      * @param cacheName
 984  
      * @param listenerId
 985  
      */
 986  
     public void removeCacheListener( String cacheName, ICacheListener listener )
 987  
         throws IOException
 988  
     {
 989  21
         removeCacheListener( cacheName, listener.getListenerId() );
 990  21
     }
 991  
 
 992  
     /**
 993  
      * Unsubscribe this listener from this region. If the listener is
 994  
      * registered, it will be removed from the event queue map list.
 995  
      * <p>
 996  
      * @param cacheName
 997  
      * @param listenerId
 998  
      */
 999  
     public void removeCacheListener( String cacheName, long listenerId )
 1000  
     {
 1001  35
         if ( log.isInfoEnabled() )
 1002  
         {
 1003  35
             log.info( "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]" );
 1004  
         }
 1005  
 
 1006  35
         Integer remoteTypeL = (Integer) idTypeMap.get( new Long( listenerId ) );
 1007  35
         boolean isClusterListener = false;
 1008  35
         if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
 1009  
         {
 1010  7
             isClusterListener = true;
 1011  
         }
 1012  
 
 1013  35
         CacheListeners cacheDesc = null;
 1014  
 
 1015  35
         if ( isClusterListener )
 1016  
         {
 1017  7
             cacheDesc = getClusterListeners( cacheName );
 1018  7
         }
 1019  
         else
 1020  
         {
 1021  28
             cacheDesc = getCacheListeners( cacheName );
 1022  
         }
 1023  35
         Map eventQMap = cacheDesc.eventQMap;
 1024  35
         cleanupEventQMap( eventQMap );
 1025  35
         ICacheEventQueue q = (ICacheEventQueue) eventQMap.remove( new Long( listenerId ) );
 1026  
 
 1027  35
         if ( q != null )
 1028  
         {
 1029  21
             if ( log.isDebugEnabled() )
 1030  
             {
 1031  0
                 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId  [" + listenerId + "]" );
 1032  
             }
 1033  21
             q.destroy();
 1034  21
             cleanupEventQMap( eventQMap );
 1035  21
         }
 1036  
         else
 1037  
         {
 1038  14
             if ( log.isDebugEnabled() )
 1039  
             {
 1040  0
                 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
 1041  
                     + "]" );
 1042  
             }
 1043  
         }
 1044  
 
 1045  35
         if ( log.isInfoEnabled() )
 1046  
         {
 1047  35
             log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
 1048  
                 + cacheDesc.eventQMap.size() + "]" );
 1049  
         }
 1050  35
     }
 1051  
 
 1052  
     /**
 1053  
      * Unsubscribes from all remote caches.
 1054  
      * <p>
 1055  
      * @param listener
 1056  
      * @throws IOException
 1057  
      */
 1058  
     public void removeCacheListener( ICacheListener listener )
 1059  
         throws IOException
 1060  
     {
 1061  7
         for ( Enumeration en = cacheListenersMap.keys(); en.hasMoreElements(); )
 1062  
         {
 1063  21
             String cacheName = (String) en.nextElement();
 1064  21
             removeCacheListener( cacheName, listener );
 1065  
 
 1066  21
             if ( log.isInfoEnabled() )
 1067  
             {
 1068  21
                 log.info( "Removing listener for cache [" + cacheName + "]" );
 1069  
             }
 1070  21
         }
 1071  7
         return;
 1072  
     }
 1073  
 
 1074  
     /**
 1075  
      * Shuts down the remote server.
 1076  
      * <p>
 1077  
      * @throws IOException
 1078  
      */
 1079  
     public void shutdown()
 1080  
         throws IOException
 1081  
     {
 1082  0
         RemoteCacheServerFactory.shutdownImpl( "", Registry.REGISTRY_PORT );
 1083  0
     }
 1084  
 
 1085  
     /**
 1086  
      * Shuts down a server at a particular host and port.  Then it calls shutdown on the cache itself.
 1087  
      * <p>
 1088  
      * @param host
 1089  
      * @param port
 1090  
      * @throws IOException
 1091  
      */
 1092  
     public void shutdown( String host, int port )
 1093  
         throws IOException
 1094  
     {
 1095  0
         if ( log.isInfoEnabled() )
 1096  
         {
 1097  0
             log.info( "Received shutdown request.  Shutting down server." );
 1098  
         }
 1099  0
         RemoteCacheServerFactory.shutdownImpl( host, port );
 1100  0
         this.cacheManager.shutDown();
 1101  0
     }
 1102  
 
 1103  
     /**
 1104  
      * Called by the RMI runtime sometime after the runtime determines that the
 1105  
      * reference list, the list of clients referencing the remote object,
 1106  
      * becomes empty.
 1107  
      */
 1108  
     // TODO: test out the DGC.
 1109  
     public void unreferenced()
 1110  
     {
 1111  0
         if ( log.isInfoEnabled() )
 1112  
         {
 1113  0
             log.info( "*** Server now unreferenced and subject to GC. ***" );
 1114  
         }
 1115  0
     }
 1116  
 
 1117  
     /**
 1118  
      * Returns the next generated listener id [0,255].
 1119  
      * <p>
 1120  
      * @return the listener id of a client. This should be unique for this
 1121  
      *         server.
 1122  
      */
 1123  
     private long nextListenerId()
 1124  
     {
 1125  119
         long id = 0;
 1126  119
         if ( listenerId[0] == Long.MAX_VALUE )
 1127  
         {
 1128  0
             synchronized ( listenerId )
 1129  
             {
 1130  0
                 id = listenerId[0];
 1131  0
                 listenerId[0] = 0;
 1132  
                 // TODO: record & check if the generated id is currently being
 1133  
                 // used by a valid listener. Currently if the id wraps after
 1134  
                 // Long.MAX_VALUE,
 1135  
                 // we just assume it won't collide with an existing listener who
 1136  
                 // is live.
 1137  0
             }
 1138  0
         }
 1139  
         else
 1140  
         {
 1141  119
             synchronized ( listenerId )
 1142  
             {
 1143  119
                 id = ++listenerId[0];
 1144  119
             }
 1145  
         }
 1146  119
         return id;
 1147  
     }
 1148  
 
 1149  
     /**
 1150  
      * Gets the stats attribute of the RemoteCacheServer object.
 1151  
      * <p>
 1152  
      * @return The stats value
 1153  
      * @throws IOException
 1154  
      */
 1155  
     public String getStats()
 1156  
         throws IOException
 1157  
     {
 1158  28
         return cacheManager.getStats();
 1159  
     }
 1160  
 }

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.