1 package org.apache.jcs.auxiliary.remote.server;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.rmi.RemoteException;
25 import java.rmi.registry.Registry;
26 import java.rmi.server.UnicastRemoteObject;
27 import java.rmi.server.Unreferenced;
28 import java.util.Collections;
29 import java.util.Enumeration;
30 import java.util.Hashtable;
31 import java.util.Iterator;
32 import java.util.Map;
33 import java.util.Set;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
38 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
39 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheObserver;
40 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
41 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheServiceAdmin;
42 import org.apache.jcs.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
43 import org.apache.jcs.engine.CacheEventQueueFactory;
44 import org.apache.jcs.engine.CacheListeners;
45 import org.apache.jcs.engine.behavior.ICacheElement;
46 import org.apache.jcs.engine.behavior.ICacheEventQueue;
47 import org.apache.jcs.engine.behavior.ICacheListener;
48 import org.apache.jcs.engine.control.CompositeCache;
49 import org.apache.jcs.engine.control.CompositeCacheManager;
50
51 /***
52 * This class provides remote cache services. The remote cache server propagates
53 * events from local caches to other local caches. It can also store cached
54 * data, making it available to new clients.
55 * <p>
56 * Remote cache servers can be clustered. If the cache used by this remote cache
57 * is configured to use a remote cache of type cluster, the two remote caches
58 * will communicate with each other. Remote and put requests can be sent from
59 * one remote to another. If they are configured to broadcast such event to
60 * their client, then remove an puts can be sent to all locals in the cluster.
61 * <p>
62 * Get requests are made between clustered servers if AllowClusterGet is true. You can setup
63 * several clients to use one remote server and several to use another. The get
64 * locad will be distributed between the two servers. Since caches are usually
65 * high get and low put, this should allow you to scale.
66 */
67 class RemoteCacheServer
68 extends UnicastRemoteObject
69 implements IRemoteCacheService, IRemoteCacheObserver, IRemoteCacheServiceAdmin, Unreferenced
70 {
71 private static final long serialVersionUID = -8072345435941473116L;
72
73 private final static Log log = LogFactory.getLog( RemoteCacheServer.class );
74
75 /*** timing -- if we should record operation times. */
76 protected final static boolean timing = true;
77
78 private int puts = 0;
79
80
81
82 private final Hashtable cacheListenersMap = new Hashtable();
83
84 private final Hashtable clusterListenersMap = new Hashtable();
85
86 private CompositeCacheManager cacheManager;
87
88
89 private final Hashtable idTypeMap = new Hashtable();
90
91
92 private int[] listenerId = new int[1];
93
94 /*** Configuration settings. */
95 protected IRemoteCacheServerAttributes rcsa;
96
97 /*** The interval at which we will log updates. */
98 private int logInterval = 100;
99
100 /***
101 * Constructor for the RemoteCacheServer object. Thiks initializes the
102 * server with the values from the config file.
103 * <p>
104 * @param rcsa
105 * @throws RemoteException
106 * @exception IOException
107 */
108 RemoteCacheServer( IRemoteCacheServerAttributes rcsa )
109 throws RemoteException
110 {
111 super( rcsa.getServicePort() );
112 this.rcsa = rcsa;
113 init( rcsa.getConfigFileName() );
114 }
115
116 /***
117 * Initialize the RMI Cache Server from a properties file.
118 * <p>
119 * @param prop
120 */
121 private void init( String prop )
122 {
123 cacheManager = createCacheManager( prop );
124
125
126
127 String[] list = cacheManager.getCacheNames();
128 for ( int i = 0; i < list.length; i++ )
129 {
130 String name = list[i];
131 cacheListenersMap.put( name, new CacheListeners( cacheManager.getCache( name ) ) );
132 }
133 }
134
135 /***
136 * Subclass can override this method to create the specific cache manager.
137 * <p>
138 * @param prop
139 * The anem of the configuration file.
140 * @return The cache hub configured with this configuration file.
141 */
142 private CompositeCacheManager createCacheManager( String prop )
143 {
144 CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
145
146 if ( prop == null )
147 {
148 hub.configure( "/remote.cache.ccf" );
149 }
150 else
151 {
152 hub.configure( prop );
153 }
154 return hub;
155 }
156
157 /***
158 * Returns the cache listener for the specified cache. Creates the cache and
159 * the cache descriptor if they do not already exist.
160 * <p>
161 * @param cacheName
162 * @return The cacheListeners value
163 */
164 protected CacheListeners getCacheListeners( String cacheName )
165 {
166 CacheListeners cacheListeners = (CacheListeners) cacheListenersMap.get( cacheName );
167 synchronized ( cacheListenersMap )
168 {
169 if ( cacheListeners == null )
170 {
171 cacheListeners = (CacheListeners) cacheListenersMap.get( cacheName );
172 if ( cacheListeners == null )
173 {
174 cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
175 cacheListenersMap.put( cacheName, cacheListeners );
176 }
177 }
178 }
179 return cacheListeners;
180 }
181
182 /***
183 * Gets the clusterListeners attribute of the RemoteCacheServer object.
184 * <p>
185 * @todo may be able to remove this
186 *
187 * @param cacheName
188 * @return The clusterListeners value
189 */
190 protected CacheListeners getClusterListeners( String cacheName )
191 {
192 CacheListeners cacheListeners = (CacheListeners) clusterListenersMap.get( cacheName );
193 synchronized ( clusterListenersMap )
194 {
195 if ( cacheListeners == null )
196 {
197 cacheListeners = (CacheListeners) clusterListenersMap.get( cacheName );
198 if ( cacheListeners == null )
199 {
200 cacheListeners = new CacheListeners( cacheManager.getCache( cacheName ) );
201 clusterListenersMap.put( cacheName, cacheListeners );
202 }
203 }
204 }
205 return cacheListeners;
206 }
207
208 /***
209 * Puts a cache bean to the remote cache and notifies all listeners which
210 * <br>
211 * <ol>
212 * <li>have a different listener id than the originating host;
213 * <li>are currently subscribed to the related cache.
214 * </ol>
215 * <p>
216 * @param item
217 * @throws IOException
218 *
219 */
220 public void put( ICacheElement item )
221 throws IOException
222 {
223 update( item );
224 }
225
226
227
228
229
230
231 public void update( ICacheElement item )
232 throws IOException
233 {
234 update( item, 0 );
235 }
236
237 /***
238 * An update can come from either a local cache's remote auxiliary, or it
239 * can come from a remote server. A remote server is considered a a source
240 * of type cluster.
241 * <p>
242 * If the update came from a cluster, then we should tell the cache manager
243 * that this was a remote put. This way, any lateral and remote auxiliaries
244 * configured for the region will not be updated. This is basically how a
245 * remote listener works when plugged into a local cache.
246 * <p>
247 * If the cluster is configured to keep local cluster consistency, then all
248 * listeners will be updated. This allows cluster server A to update cluster
249 * server B and then B to update its clients if it is told to keep local
250 * cluster consistency. Otherwise, server A will update server B and B will
251 * not tell its clients. If you cluster using lateral caches for instance,
252 * this is how it will work. Updates to a cluster node, will never get to
253 * the leavess. The remote cluster, with local cluster consistency, allows
254 * you to update leaves. This basically allows you to have a failover remote
255 * server.
256 * <p>
257 * Since currently a cluster will not try to get from other cluster servers,
258 * you can scale a bit with a cluster configuration. Puts and removes will
259 * be broadcasted to all clients, but the get load on a remote server can be
260 * reduced.
261 * <p>
262 * @param item
263 * @param requesterId
264 * @throws IOException
265 */
266 public void update( ICacheElement item, long requesterId )
267 throws IOException
268 {
269
270 long start = 0;
271 if ( timing )
272 {
273 start = System.currentTimeMillis();
274 }
275
276 if ( log.isInfoEnabled() )
277 {
278
279 puts++;
280 if ( puts % logInterval == 0 )
281 {
282 log.info( "puts = " + puts );
283 }
284 }
285
286 if ( log.isDebugEnabled() )
287 {
288 log.debug( "In update, put [" + item.getKey() + "] in [" + item.getCacheName() + "]" );
289 }
290
291 try
292 {
293 CacheListeners cacheDesc = getCacheListeners( item.getCacheName() );
294
295
296 Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
297 if ( log.isDebugEnabled() )
298 {
299 log.debug( "In update, requesterId = [" + requesterId + "] remoteType = " + remoteTypeL );
300 }
301
302 boolean fromCluster = false;
303 if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
304 {
305 fromCluster = true;
306 }
307
308 synchronized ( cacheDesc )
309 {
310 try
311 {
312 CompositeCache c = (CompositeCache) cacheDesc.cache;
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328 if ( fromCluster )
329 {
330 if ( log.isDebugEnabled() )
331 {
332 log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
333 + " requesterId [" + requesterId + "]" );
334 }
335 c.localUpdate( item );
336 }
337 else
338 {
339 if ( log.isDebugEnabled() )
340 {
341 log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
342 + " requesterId [" + requesterId + "]" );
343 }
344 c.update( item );
345 }
346 }
347 catch ( Exception ce )
348 {
349
350 if ( log.isInfoEnabled() )
351 {
352 log.info( "Exception caught updating item. requesterId [" + requesterId + "] "
353 + ce.getMessage() );
354 }
355 }
356
357
358
359 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
360 {
361 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
362
363 if ( qlist != null )
364 {
365 if ( log.isDebugEnabled() )
366 {
367 log.debug( "qlist.length = " + qlist.length );
368 }
369 for ( int i = 0; i < qlist.length; i++ )
370 {
371 qlist[i].addPutEvent( item );
372 }
373 }
374 else
375 {
376 if ( log.isDebugEnabled() )
377 {
378 log.debug( "q list is null" );
379 }
380 }
381 }
382 }
383 }
384 catch ( Exception e )
385 {
386 log.error( "Trouble in Update. requesterId [" + requesterId + "]", e );
387 }
388
389
390 if ( timing )
391 {
392 long end = System.currentTimeMillis();
393 if ( log.isDebugEnabled() )
394 {
395 log.debug( "put took " + String.valueOf( end - start ) + " ms." );
396 }
397 }
398
399 return;
400 }
401
402 /***
403 * Gets the eventQList attribute of the RemoteCacheServer object. This
404 * returns the event queues stored in the cacheListeners object for a
405 * particuylar region, if the queue is not for this requester.
406 * <p>
407 * Basically, this makes sure that a request from a particular local cache,
408 * identified by its listener id, does not result in a call to that same
409 * listener.
410 * <p>
411 * @param cacheListeners
412 * @param requesterId
413 * @return The eventQList value
414 */
415 private ICacheEventQueue[] getEventQList( CacheListeners cacheListeners, long requesterId )
416 {
417 ICacheEventQueue[] list = null;
418 synchronized ( cacheListeners.eventQMap )
419 {
420 list = (ICacheEventQueue[]) cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
421 }
422 int count = 0;
423
424 for ( int i = 0; i < list.length; i++ )
425 {
426 ICacheEventQueue q = list[i];
427 if ( q.isWorking() && q.getListenerId() != requesterId )
428 {
429 count++;
430 }
431 else
432 {
433 list[i] = null;
434 }
435 }
436 if ( count == list.length )
437 {
438
439 return list;
440 }
441
442
443 ICacheEventQueue[] qq = new ICacheEventQueue[count];
444 count = 0;
445 for ( int i = 0; i < list.length; i++ )
446 {
447 if ( list[i] != null )
448 {
449 qq[count++] = list[i];
450 }
451 }
452 return qq;
453 }
454
455 /***
456 * Returns a cache value from the specified remote cache; or null if the
457 * cache or key does not exist.
458 * <p>
459 * @param cacheName
460 * @param key
461 * @return ICacheElement
462 * @throws IOException
463 */
464 public ICacheElement get( String cacheName, Serializable key )
465 throws IOException
466 {
467 return this.get( cacheName, key, 0 );
468 }
469
470 /***
471 * Returns a cache bean from the specified cache; or null if the key does
472 * not exist.
473 * <p>
474 * Adding the requestor id, allows the cache to determine the sournce of the
475 * get.
476 * <p>
477 * @param cacheName
478 * @param key
479 * @param requesterId
480 * @return ICacheElement
481 * @throws IOException
482 */
483 public ICacheElement get( String cacheName, Serializable key, long requesterId )
484 throws IOException
485 {
486 Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
487
488 if ( log.isDebugEnabled() )
489 {
490 log.debug( "get [" + key + "] from cache [" + cacheName + "] requesterId = [" + requesterId + "] remoteType = "
491 + remoteTypeL );
492 }
493
494
495
496
497
498
499 boolean fromCluster = false;
500 if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
501 {
502 fromCluster = true;
503 }
504
505 CacheListeners cacheDesc = null;
506 try
507 {
508 cacheDesc = getCacheListeners( cacheName );
509 }
510 catch ( Exception e )
511 {
512 log.error( "Problem getting listeners.", e );
513 }
514
515 if ( cacheDesc == null )
516 {
517 return null;
518 }
519 CompositeCache c = (CompositeCache) cacheDesc.cache;
520
521 ICacheElement element = null;
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537 if ( !fromCluster && this.rcsa.getAllowClusterGet() )
538 {
539 if ( log.isDebugEnabled() )
540 {
541 log.debug( "NonLocalGet. fromCluster [" + fromCluster + "] AllowClusterGet [" + this.rcsa.getAllowClusterGet() + "]" );
542 }
543 element = c.get( key );
544 }
545 else
546 {
547
548
549
550
551 if ( log.isDebugEnabled() )
552 {
553 log.debug( "LocalGet. fromCluster [" + fromCluster + "] AllowClusterGet [" + this.rcsa.getAllowClusterGet() + "]" );
554 }
555 element = c.localGet( key );
556 }
557
558 return element;
559 }
560
561 /***
562 * Gets the set of keys of objects currently in the group.
563 * <p>
564 * @param cacheName
565 * @param group
566 * @return A Set of group keys
567 */
568 public Set getGroupKeys( String cacheName, String group )
569 {
570 CacheListeners cacheDesc = null;
571 try
572 {
573 cacheDesc = getCacheListeners( cacheName );
574 }
575 catch ( Exception e )
576 {
577 log.error( "Problem getting listeners.", e );
578 }
579
580 if ( cacheDesc == null )
581 {
582 return Collections.EMPTY_SET;
583 }
584 CompositeCache c = (CompositeCache) cacheDesc.cache;
585 return c.getGroupKeys( group );
586 }
587
588 /***
589 * Removes the given key from the specified remote cache. Defaults the
590 * listener id to 0.
591 * <p>
592 * @param cacheName
593 * @param key
594 * @throws IOException
595 */
596 public void remove( String cacheName, Serializable key )
597 throws IOException
598 {
599 remove( cacheName, key, 0 );
600 }
601
602 /***
603 * Remove the key from the cache region and don't tell the source listener
604 * about it.
605 * <p>
606 * @param cacheName
607 * @param key
608 * @param requesterId
609 * @throws IOException
610 */
611 public void remove( String cacheName, Serializable key, long requesterId )
612 throws IOException
613 {
614 if ( log.isDebugEnabled() )
615 {
616 log.debug( "remove [" + key + "] from cache [" + cacheName + "]" );
617 }
618 CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
619
620 Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
621 boolean fromCluster = false;
622 if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
623 {
624 fromCluster = true;
625 }
626
627 if ( cacheDesc != null )
628 {
629
630
631 synchronized ( cacheDesc )
632 {
633 boolean removeSuccess = false;
634
635
636 CompositeCache c = (CompositeCache) cacheDesc.cache;
637
638 if ( fromCluster )
639 {
640 if ( log.isDebugEnabled() )
641 {
642 log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
643 }
644 removeSuccess = c.localRemove( key );
645 }
646 else
647 {
648 if ( log.isDebugEnabled() )
649 {
650 log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
651 }
652 removeSuccess = c.remove( key );
653 }
654
655 if ( log.isDebugEnabled() )
656 {
657 log.debug( "remove [" + key + "] from cache [" + cacheName + "] success (was it found) = "
658 + removeSuccess );
659 }
660
661
662
663 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
664 {
665 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
666
667 for ( int i = 0; i < qlist.length; i++ )
668 {
669 qlist[i].addRemoveEvent( key );
670 }
671 }
672 }
673 }
674 return;
675 }
676
677 /***
678 * Remove all keys from the sepcified remote cache.
679 * <p>
680 * @param cacheName
681 * @throws IOException
682 */
683 public void removeAll( String cacheName )
684 throws IOException
685 {
686 removeAll( cacheName, 0 );
687 }
688
689 /***
690 * Remove all keys from the specified remote cache.
691 * <p>
692 * @param cacheName
693 * @param requesterId
694 * @throws IOException
695 */
696 public void removeAll( String cacheName, long requesterId )
697 throws IOException
698 {
699 CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
700
701 Integer remoteTypeL = (Integer) idTypeMap.get( new Long( requesterId ) );
702 boolean fromCluster = false;
703 if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
704 {
705 fromCluster = true;
706 }
707
708 if ( cacheDesc != null )
709 {
710
711
712 synchronized ( cacheDesc )
713 {
714
715 CompositeCache c = (CompositeCache) cacheDesc.cache;
716
717 if ( fromCluster )
718 {
719 if ( log.isDebugEnabled() )
720 {
721 log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
722 }
723 c.localRemoveAll();
724 }
725 else
726 {
727 if ( log.isDebugEnabled() )
728 {
729 log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
730 }
731 c.removeAll();
732 }
733
734
735 if ( !fromCluster || ( fromCluster && rcsa.getLocalClusterConsistency() ) )
736 {
737 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
738
739 for ( int i = 0; i < qlist.length; i++ )
740 {
741 qlist[i].addRemoveAllEvent();
742 }
743 }
744 }
745 }
746 return;
747 }
748
749 /***
750 * How many put events have we received.
751 * <p>
752 * @return puts
753 */
754 protected int getPutCount()
755 {
756 return puts;
757 }
758
759 /***
760 * Frees the specified remote cache.
761 * <p>
762 * @param cacheName
763 * @throws IOException
764 */
765 public void dispose( String cacheName )
766 throws IOException
767 {
768 dispose( cacheName, 0 );
769 }
770
771 /***
772 * Frees the specified remote cache.
773 * <p>
774 * @param cacheName
775 * @param requesterId
776 * @throws IOException
777 */
778 public void dispose( String cacheName, long requesterId )
779 throws IOException
780 {
781 if ( log.isInfoEnabled() )
782 {
783 log.info( "Dispose request received from listener [" + requesterId + "]" );
784 }
785
786 CacheListeners cacheDesc = (CacheListeners) cacheListenersMap.get( cacheName );
787
788
789 if ( cacheDesc != null )
790 {
791
792 synchronized ( cacheDesc )
793 {
794 ICacheEventQueue[] qlist = getEventQList( cacheDesc, requesterId );
795
796 for ( int i = 0; i < qlist.length; i++ )
797 {
798 qlist[i].addDisposeEvent();
799 }
800 cacheManager.freeCache( cacheName );
801 }
802 }
803 return;
804 }
805
806 /***
807 * Frees all remote caches.
808 * <p>
809 * @throws IOException
810 */
811 public void release()
812 throws IOException
813 {
814 synchronized ( cacheListenersMap )
815 {
816 for ( Enumeration en = cacheListenersMap.elements(); en.hasMoreElements(); )
817 {
818 CacheListeners cacheDesc = (CacheListeners) en.nextElement();
819 ICacheEventQueue[] qlist = getEventQList( cacheDesc, 0 );
820
821 for ( int i = 0; i < qlist.length; i++ )
822 {
823 qlist[i].addDisposeEvent();
824 }
825 }
826 cacheManager.release();
827 }
828 return;
829 }
830
831 /***
832 * Removes dead event queues. Should clean out deregistered listeners.
833 * <p>
834 * @param eventQMap
835 */
836 private static void cleanupEventQMap( Map eventQMap )
837 {
838 synchronized ( eventQMap )
839 {
840 for ( Iterator itr = eventQMap.entrySet().iterator(); itr.hasNext(); )
841 {
842 Map.Entry e = (Map.Entry) itr.next();
843 ICacheEventQueue q = (ICacheEventQueue) e.getValue();
844
845
846
847
848
849 if ( !q.isWorking() )
850 {
851 itr.remove();
852 log.warn( "Cache event queue " + q + " is not working and removed from cache server." );
853 }
854 }
855 }
856 }
857
858 /***
859 * Subscribes to the specified remote cache.
860 * <p>
861 * If the client id is 0, then the remote cache server will increment it's
862 * local count and assign an id to the client.
863 * <p>
864 * @param cacheName
865 * the specified remote cache.
866 * @param listener
867 * object to notify for cache changes. must be synchronized since
868 * there are remote calls involved.
869 * @throws IOException
870 */
871 public void addCacheListener( String cacheName, ICacheListener listener )
872 throws IOException
873 {
874 if ( cacheName == null || listener == null )
875 {
876 throw new IllegalArgumentException( "cacheName and listener must not be null" );
877 }
878 CacheListeners cacheDesc;
879
880 IRemoteCacheListener ircl = (IRemoteCacheListener) listener;
881
882 String listenerAddress = ircl.getLocalHostAddress();
883
884 int remoteType = ircl.getRemoteType();
885 if ( remoteType == IRemoteCacheAttributes.CLUSTER )
886 {
887 log.debug( "adding cluster listener, listenerAddress [" + listenerAddress + "]" );
888 cacheDesc = getClusterListeners( cacheName );
889 }
890 else
891 {
892 log.debug( "adding normal listener, listenerAddress [" + listenerAddress + "]" );
893 cacheDesc = getCacheListeners( cacheName );
894 }
895 Map eventQMap = cacheDesc.eventQMap;
896 cleanupEventQMap( eventQMap );
897
898
899 synchronized ( ICacheListener.class )
900 {
901 long id = 0;
902 try
903 {
904 id = listener.getListenerId();
905
906 if ( id == 0 )
907 {
908
909 long listenerIdB = nextListenerId();
910 if ( log.isDebugEnabled() )
911 {
912 log.debug( "listener id=" + ( listenerIdB & 0xff ) + " addded for cache [" + cacheName
913 + "], listenerAddress [" + listenerAddress + "]" );
914 }
915 listener.setListenerId( listenerIdB );
916 id = listenerIdB;
917
918
919 if ( log.isInfoEnabled() )
920 {
921 log.info( "adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
922 + listenerAddress + "]" );
923 }
924 }
925 else
926 {
927 if ( log.isInfoEnabled() )
928 {
929 log.info( "adding listener under existing id = [" + id + "], listenerAddress ["
930 + listenerAddress + "]" );
931 }
932
933
934 }
935
936
937 this.idTypeMap.put( new Long( id ), new Integer( remoteType ) );
938 }
939 catch ( IOException ioe )
940 {
941 log.error( "Problem setting listener id, listenerAddress [" + listenerAddress + "]", ioe );
942 }
943
944 CacheEventQueueFactory fact = new CacheEventQueueFactory();
945 ICacheEventQueue q = fact.createCacheEventQueue( listener, id, cacheName, rcsa.getEventQueuePoolName(),
946 rcsa.getEventQueueTypeFactoryCode() );
947
948 eventQMap.put( new Long( listener.getListenerId() ), q );
949
950 if ( log.isInfoEnabled() )
951 {
952 log.info( "Region " + cacheName + "'s listener size = " + cacheDesc.eventQMap.size() );
953 }
954 }
955 }
956
957 /***
958 * Subscribes to all remote caches.
959 * <p>
960 * @param listener
961 * The feature to be added to the CacheListener attribute
962 * @throws IOException
963 */
964 public void addCacheListener( ICacheListener listener )
965 throws IOException
966 {
967 for ( Enumeration en = cacheListenersMap.keys(); en.hasMoreElements(); )
968 {
969 String cacheName = (String) en.nextElement();
970 addCacheListener( cacheName, listener );
971
972 if ( log.isDebugEnabled() )
973 {
974 log.debug( "Adding listener for cache [" + cacheName + "]" );
975 }
976 }
977 }
978
979 /***
980 * Unsubscribe this listener from this region. If the listener is
981 * registered, it will be removed from the event queue map list.
982 * <p>
983 * @param cacheName
984 * @param listenerId
985 */
986 public void removeCacheListener( String cacheName, ICacheListener listener )
987 throws IOException
988 {
989 removeCacheListener( cacheName, listener.getListenerId() );
990 }
991
992 /***
993 * Unsubscribe this listener from this region. If the listener is
994 * registered, it will be removed from the event queue map list.
995 * <p>
996 * @param cacheName
997 * @param listenerId
998 */
999 public void removeCacheListener( String cacheName, long listenerId )
1000 {
1001 if ( log.isInfoEnabled() )
1002 {
1003 log.info( "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]" );
1004 }
1005
1006 Integer remoteTypeL = (Integer) idTypeMap.get( new Long( listenerId ) );
1007 boolean isClusterListener = false;
1008 if ( remoteTypeL != null && remoteTypeL.intValue() == IRemoteCacheAttributes.CLUSTER )
1009 {
1010 isClusterListener = true;
1011 }
1012
1013 CacheListeners cacheDesc = null;
1014
1015 if ( isClusterListener )
1016 {
1017 cacheDesc = getClusterListeners( cacheName );
1018 }
1019 else
1020 {
1021 cacheDesc = getCacheListeners( cacheName );
1022 }
1023 Map eventQMap = cacheDesc.eventQMap;
1024 cleanupEventQMap( eventQMap );
1025 ICacheEventQueue q = (ICacheEventQueue) eventQMap.remove( new Long( listenerId ) );
1026
1027 if ( q != null )
1028 {
1029 if ( log.isDebugEnabled() )
1030 {
1031 log.debug( "Found queue for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]" );
1032 }
1033 q.destroy();
1034 cleanupEventQMap( eventQMap );
1035 }
1036 else
1037 {
1038 if ( log.isDebugEnabled() )
1039 {
1040 log.debug( "Did not find queue for cache region = [" + cacheName + "] and listenerId [" + listenerId
1041 + "]" );
1042 }
1043 }
1044
1045 if ( log.isInfoEnabled() )
1046 {
1047 log.info( "After removing listener [" + listenerId + "] cache region " + cacheName + "'s listener size ["
1048 + cacheDesc.eventQMap.size() + "]" );
1049 }
1050 }
1051
1052 /***
1053 * Unsubscribes from all remote caches.
1054 * <p>
1055 * @param listener
1056 * @throws IOException
1057 */
1058 public void removeCacheListener( ICacheListener listener )
1059 throws IOException
1060 {
1061 for ( Enumeration en = cacheListenersMap.keys(); en.hasMoreElements(); )
1062 {
1063 String cacheName = (String) en.nextElement();
1064 removeCacheListener( cacheName, listener );
1065
1066 if ( log.isInfoEnabled() )
1067 {
1068 log.info( "Removing listener for cache [" + cacheName + "]" );
1069 }
1070 }
1071 return;
1072 }
1073
1074 /***
1075 * Shuts down the remote server.
1076 * <p>
1077 * @throws IOException
1078 */
1079 public void shutdown()
1080 throws IOException
1081 {
1082 RemoteCacheServerFactory.shutdownImpl( "", Registry.REGISTRY_PORT );
1083 }
1084
1085 /***
1086 * Shuts down a server at a particular host and port. Then it calls shutdown on the cache itself.
1087 * <p>
1088 * @param host
1089 * @param port
1090 * @throws IOException
1091 */
1092 public void shutdown( String host, int port )
1093 throws IOException
1094 {
1095 if ( log.isInfoEnabled() )
1096 {
1097 log.info( "Received shutdown request. Shutting down server." );
1098 }
1099 RemoteCacheServerFactory.shutdownImpl( host, port );
1100 this.cacheManager.shutDown();
1101 }
1102
1103 /***
1104 * Called by the RMI runtime sometime after the runtime determines that the
1105 * reference list, the list of clients referencing the remote object,
1106 * becomes empty.
1107 */
1108
1109 public void unreferenced()
1110 {
1111 if ( log.isInfoEnabled() )
1112 {
1113 log.info( "*** Server now unreferenced and subject to GC. ***" );
1114 }
1115 }
1116
1117 /***
1118 * Returns the next generated listener id [0,255].
1119 * <p>
1120 * @return the listener id of a client. This should be unique for this
1121 * server.
1122 */
1123 private long nextListenerId()
1124 {
1125 long id = 0;
1126 if ( listenerId[0] == Long.MAX_VALUE )
1127 {
1128 synchronized ( listenerId )
1129 {
1130 id = listenerId[0];
1131 listenerId[0] = 0;
1132
1133
1134
1135
1136
1137 }
1138 }
1139 else
1140 {
1141 synchronized ( listenerId )
1142 {
1143 id = ++listenerId[0];
1144 }
1145 }
1146 return id;
1147 }
1148
1149 /***
1150 * Gets the stats attribute of the RemoteCacheServer object.
1151 * <p>
1152 * @return The stats value
1153 * @throws IOException
1154 */
1155 public String getStats()
1156 throws IOException
1157 {
1158 return cacheManager.getStats();
1159 }
1160 }