View Javadoc

1   package org.apache.jcs.auxiliary.remote;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.lang.reflect.InvocationTargetException;
25  import java.net.ServerSocket;
26  import java.net.Socket;
27  import java.rmi.server.RMISocketFactory;
28  import java.util.ArrayList;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
34  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
35  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
36  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
37  import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
38  import org.apache.jcs.engine.CacheConstants;
39  import org.apache.jcs.engine.behavior.ICacheElement;
40  import org.apache.jcs.engine.behavior.ICacheElementSerialized;
41  import org.apache.jcs.engine.behavior.IElementAttributes;
42  import org.apache.jcs.engine.behavior.IElementSerializer;
43  import org.apache.jcs.engine.behavior.IZombie;
44  import org.apache.jcs.engine.stats.StatElement;
45  import org.apache.jcs.engine.stats.Stats;
46  import org.apache.jcs.engine.stats.behavior.IStatElement;
47  import org.apache.jcs.engine.stats.behavior.IStats;
48  import org.apache.jcs.utils.serialization.SerializationConversionUtil;
49  import org.apache.jcs.utils.serialization.StandardSerializer;
50  import org.apache.jcs.utils.threadpool.ThreadPool;
51  import org.apache.jcs.utils.threadpool.ThreadPoolManager;
52  
53  import EDU.oswego.cs.dl.util.concurrent.Callable;
54  import EDU.oswego.cs.dl.util.concurrent.FutureResult;
55  import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
56  
57  /***
58   * Client proxy for an RMI remote cache. This handles gets, updates, and removes. It also initiates
59   * failover recovery when an error is encountered.
60   */
61  public class RemoteCache
62      implements IRemoteCacheClient
63  {
64      private static final long serialVersionUID = -5329231850422826460L;
65  
66      private final static Log log = LogFactory.getLog( RemoteCache.class );
67  
68      final String cacheName;
69  
70      private IRemoteCacheAttributes irca;
71  
72      private IRemoteCacheService remote;
73  
74      private IRemoteCacheListener listener;
75  
76      private IElementAttributes attr = null;
77  
78      private ThreadPool pool = null;
79  
80      private boolean usePoolForGet = false;
81  
82      private IElementSerializer elementSerializer = new StandardSerializer();
83  
84      /***
85       * Constructor for the RemoteCache object. This object communicates with a remote cache server.
86       * One of these exists for each region. This also holds a reference to a listener. The same
87       * listener is used for all regions for one remote server. Holding a reference to the listener
88       * allows this object to know the listener id assigned by the remote cache.
89       * <p>
90       * @param cattr
91       * @param remote
92       * @param listener
93       */
94      public RemoteCache( IRemoteCacheAttributes cattr, IRemoteCacheService remote, IRemoteCacheListener listener )
95      {
96          this.irca = cattr;
97          this.cacheName = cattr.getCacheName();
98          this.remote = remote;
99          this.listener = listener;
100 
101         if ( log.isDebugEnabled() )
102         {
103             log.debug( "Construct> cacheName=" + cattr.getCacheName() );
104             log.debug( "irca = " + irca );
105             log.debug( "remote = " + remote );
106             log.debug( "listener = " + listener );
107         }
108 
109         // use a pool if it is greater than 0
110         if ( log.isDebugEnabled() )
111         {
112             log.debug( "GetTimeoutMillis() = " + irca.getGetTimeoutMillis() );
113         }
114 
115         if ( irca.getGetTimeoutMillis() > 0 )
116         {
117             pool = ThreadPoolManager.getInstance().getPool( irca.getThreadPoolName() );
118             if ( log.isDebugEnabled() )
119             {
120                 log.debug( "Thread Pool = " + pool );
121             }
122             if ( pool != null )
123             {
124                 usePoolForGet = true;
125             }
126         }
127 
128         try
129         {
130             // Don't set a socket factory if the setting is -1
131             if ( irca.getRmiSocketFactoryTimeoutMillis() > 0 )
132             {
133                 // TODO make configurable.
134                 // use this socket factory to add a timeout.
135                 RMISocketFactory.setSocketFactory( new RMISocketFactory()
136                 {
137                     public Socket createSocket( String host, int port )
138                         throws IOException
139                     {
140                         Socket socket = new Socket( host, port );
141                         socket.setSoTimeout( irca.getRmiSocketFactoryTimeoutMillis() );
142                         socket.setSoLinger( false, 0 );
143                         return socket;
144                     }
145 
146                     public ServerSocket createServerSocket( int port )
147                         throws IOException
148                     {
149                         return new ServerSocket( port );
150                     }
151                 } );
152             }
153         }
154         catch ( Exception e )
155         {
156             // TODO change this so that we only try to do it once. Otherwise we
157             // genreate errors for each region on construction.
158             log.info( e.getMessage() );
159         }
160     }
161 
162     /***
163      * Sets the attributes attribute of the RemoteCache object.
164      * <p>
165      * @param attr The new attributes value
166      */
167     public void setElementAttributes( IElementAttributes attr )
168     {
169         this.attr = attr;
170     }
171 
172     /***
173      * Gets the attributes attribute of the RemoteCache object.
174      * <p>
175      * @return The attributes value
176      */
177     public IElementAttributes getElementAttributes()
178     {
179         return this.attr;
180     }
181 
182     /***
183      * Serializes the object and then calls update on the remote server with the byte array. The
184      * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
185      * without any knowledge of caches classes.
186      * <p>
187      * (non-Javadoc)
188      * @see org.apache.jcs.engine.behavior.ICache#update(org.apache.jcs.engine.behavior.ICacheElement)
189      */
190     public void update( ICacheElement ce )
191         throws IOException
192     {
193         if ( true )
194         {
195             if ( !this.irca.getGetOnly() )
196             {
197                 ICacheElementSerialized serialized = null;
198                 try
199                 {
200                     if ( log.isDebugEnabled() )
201                     {
202                         log.debug( "sending item to remote server" );
203                     }
204 
205                     // convert so we don't have to know about the object on the
206                     // other end.
207                     serialized = SerializationConversionUtil
208                         .getSerializedCacheElement( ce, this.elementSerializer );
209 
210                     remote.update( serialized, getListenerId() );
211                 }
212                 catch ( NullPointerException npe )
213                 {
214                     log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
215                     return;
216                 }
217                 catch ( Exception ex )
218                 {
219                     // event queue will wait and retry
220                     handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() );
221                 }
222             }
223             else
224             {
225                 if ( log.isDebugEnabled() )
226                 {
227                     log.debug( "get only mode, not sending to remote server" );
228                 }
229             }
230         }
231     }
232 
233     /***
234      * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
235      * <p>
236      * Use threadpool to timeout if a value is set for GetTimeoutMillis
237      * <p>
238      * If we are a cluster client, we need to leave the Element in its serilaized form. Cluster
239      * cients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
240      * other remote servers.
241      * <p>
242      * @param key
243      * @return ICacheElement, a wrapper around the key, value, and attributes
244      * @throws IOException
245      */
246     public ICacheElement get( Serializable key )
247         throws IOException
248     {
249         ICacheElement retVal = null;
250 
251         try
252         {
253             if ( usePoolForGet )
254             {
255                 retVal = getUsingPool( key );
256             }
257             else
258             {
259                 retVal = remote.get( cacheName, key, getListenerId() );
260             }
261 
262             // Eventually the instance of will not be necessary.
263             if ( retVal != null && retVal instanceof ICacheElementSerialized )
264             {
265                 // Never try to deserialize if you are a cluster client. Cluster
266                 // clients are merely intra-remote cache communicators. Remote caches are assumed
267                 // to have no ability to deserialze the objects.
268                 if ( this.irca.getRemoteType() != IRemoteCacheAttributes.CLUSTER )
269                 {
270                     retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized) retVal,
271                                                                                       this.elementSerializer );
272                 }
273             }
274         }
275         catch ( Exception ex )
276         {
277             handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]" );
278         }
279 
280         return retVal;
281     }
282 
283     /***
284      * This allows gets to timeout in case of remote server machine shutdown.
285      * <p>
286      * @param key
287      * @return ICacheElement
288      * @throws IOException
289      */
290     public ICacheElement getUsingPool( final Serializable key )
291         throws IOException
292     {
293         int timeout = irca.getGetTimeoutMillis();
294 
295         try
296         {
297             FutureResult future = new FutureResult();
298             Runnable command = future.setter( new Callable()
299             {
300                 public Object call()
301                     throws IOException
302                 {
303                     return remote.get( cacheName, key, getListenerId() );
304                 }
305             } );
306 
307             // execute using the pool
308             pool.execute( command );
309 
310             // used timed get in order to timeout
311             ICacheElement ice = (ICacheElement) future.timedGet( timeout );
312             if ( log.isDebugEnabled() )
313             {
314                 if ( ice == null )
315                 {
316                     log.debug( "nothing found in remote cache" );
317                 }
318                 else
319                 {
320                     log.debug( "found item in remote cache" );
321                 }
322             }
323             return ice;
324         }
325         catch ( TimeoutException te )
326         {
327             log.warn( "TimeoutException, Get Request timed out after " + timeout );
328             throw new IOException( "Get Request timed out after " + timeout );
329         }
330         catch ( InterruptedException ex )
331         {
332             log.warn( "InterruptedException, Get Request timed out after " + timeout );
333             throw new IOException( "Get Request timed out after " + timeout );
334         }
335         catch ( InvocationTargetException ex )
336         {
337             // assume that this is an IOException thrown by the callable.
338             log.error( "InvocationTargetException, Assuming an IO exception thrown in the background.", ex );
339             throw new IOException( "Get Request timed out after " + timeout );
340         }
341     }
342 
343     /***
344      * Returns all the keys for a group.
345      * <p>
346      * @param groupName
347      * @return Set
348      * @throws java.rmi.RemoteException
349      */
350     public Set getGroupKeys( String groupName )
351         throws java.rmi.RemoteException
352     {
353         return remote.getGroupKeys( cacheName, groupName );
354     }
355 
356     /***
357      * Synchronously remove from the remote cache; if failed, replace the remote handle with a
358      * zombie.
359      * <p>
360      * @param key
361      * @return boolean, whether or not the item was removed
362      * @throws IOException
363      */
364     public boolean remove( Serializable key )
365         throws IOException
366     {
367         if ( true )
368         {
369             if ( !this.irca.getGetOnly() )
370             {
371                 if ( log.isDebugEnabled() )
372                 {
373                     log.debug( "remove> key=" + key );
374                 }
375                 try
376                 {
377                     remote.remove( cacheName, key, getListenerId() );
378                 }
379                 catch ( Exception ex )
380                 {
381                     handleException( ex, "Failed to remove " + key + " from " + cacheName );
382                 }
383             }
384         }
385         return false;
386     }
387 
388     /***
389      * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
390      * zombie.
391      * <p>
392      * @throws IOException
393      */
394     public void removeAll()
395         throws IOException
396     {
397         if ( true )
398         {
399             if ( !this.irca.getGetOnly() )
400             {
401                 try
402                 {
403                     remote.removeAll( cacheName, getListenerId() );
404                 }
405                 catch ( Exception ex )
406                 {
407                     handleException( ex, "Failed to remove all from " + cacheName );
408                 }
409             }
410         }
411     }
412 
413     /***
414      * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
415      * <p>
416      * @throws IOException
417      */
418     public void dispose()
419         throws IOException
420     {
421         if ( log.isInfoEnabled() )
422         {
423             log.info( "Disposing of remote cache" );
424         }
425         try
426         {
427             listener.dispose();
428         }
429         catch ( Exception ex )
430         {
431             log.error( "Couldn't dispose", ex );
432             handleException( ex, "Failed to dispose [" + cacheName + "]" );
433         }
434     }
435 
436     /***
437      * Returns the cache status. An error status indicates the remote connection is not available.
438      * <p>
439      * @return The status value
440      */
441     public int getStatus()
442     {
443         return remote instanceof IZombie ? CacheConstants.STATUS_ERROR : CacheConstants.STATUS_ALIVE;
444     }
445 
446     /***
447      * Gets the stats attribute of the RemoteCache object.
448      * <p>
449      * @return The stats value
450      */
451     public String getStats()
452     {
453         return getStatistics().toString();
454     }
455 
456     /***
457      * @return IStats object
458      */
459     public IStats getStatistics()
460     {
461         IStats stats = new Stats();
462         stats.setTypeName( "Remote Cache No Wait" );
463 
464         ArrayList elems = new ArrayList();
465 
466         IStatElement se = null;
467 
468         se = new StatElement();
469         se.setName( "Remote Host:Port" );
470         se.setData( this.irca.getRemoteHost() + ":" + this.irca.getRemotePort() );
471         elems.add( se );
472 
473         se = new StatElement();
474         se.setName( "Remote Type" );
475         se.setData( this.irca.getRemoteTypeName() + "" );
476         elems.add( se );
477 
478         if ( this.irca.getRemoteType() == IRemoteCacheAttributes.CLUSTER )
479         {
480             // something cluster specific
481         }
482 
483         // no data gathered here
484 
485         se = new StatElement();
486         se.setName( "UsePoolForGet" );
487         se.setData( "" + usePoolForGet );
488         elems.add( se );
489 
490         if ( pool != null )
491         {
492             se = new StatElement();
493             se.setName( "Pool Size" );
494             se.setData( "" + pool.getPool().getPoolSize() );
495             elems.add( se );
496 
497             se = new StatElement();
498             se.setName( "Maximum Pool Size" );
499             se.setData( "" + pool.getPool().getMaximumPoolSize() );
500             elems.add( se );
501         }
502 
503         if ( remote instanceof ZombieRemoteCacheService )
504         {
505             se = new StatElement();
506             se.setName( "Zombie Queue Size" );
507             se.setData( "" + ((ZombieRemoteCacheService)remote).getQueueSize() );
508             elems.add( se );
509         }
510 
511         // get an array and put them in the Stats object
512         IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] );
513         stats.setStatElements( ses );
514 
515         return stats;
516     }
517 
518     /***
519      * Returns the current cache size.
520      * @return The size value
521      */
522     public int getSize()
523     {
524         return 0;
525     }
526 
527     /***
528      * Gets the cacheType attribute of the RemoteCache object
529      * @return The cacheType value
530      */
531     public int getCacheType()
532     {
533         return REMOTE_CACHE;
534     }
535 
536     /***
537      * Gets the cacheName attribute of the RemoteCache object.
538      * <p>
539      * @return The cacheName value
540      */
541     public String getCacheName()
542     {
543         return cacheName;
544     }
545 
546     /***
547      * Replaces the current remote cache service handle with the given handle.
548      * If the current remote is a Zombie, the propagate teh events that may be
549      * queued to the restored service.
550      * <p>
551      * @param remote IRemoteCacheService -- the remote server or proxy to the remote server
552      */
553     public void fixCache( IRemoteCacheService remote )
554     {
555         if ( this.remote != null && this.remote instanceof ZombieRemoteCacheService )
556         {
557             ZombieRemoteCacheService zombie = (ZombieRemoteCacheService)this.remote;
558             this.remote = remote;
559             try
560             {
561                 zombie.propagateEvents(  remote );
562             }
563             catch ( Exception e )
564             {
565                 try
566                 {
567                     handleException( e, "Problem propagating events from Zombie Queue to new Remote Service." );
568                 }
569                 catch ( IOException e1 )
570                 {
571                     // swallow, since this is just expected kick back.  Handle always throws
572                 }
573             }
574         }
575         else
576         {
577             this.remote = remote;
578         }
579         return;
580     }
581 
582     /***
583      * Handles exception by disabling the remote cache service before re-throwing the exception in
584      * the form of an IOException.
585      * <p>
586      * @param ex
587      * @param msg
588      * @throws IOException
589      */
590     private void handleException( Exception ex, String msg )
591         throws IOException
592     {
593         log.error( "Disabling remote cache due to error: " + msg , ex );
594 
595         // we should not switch if the existing is a zombie.
596         if ( remote == null || !(remote instanceof ZombieRemoteCacheService) )
597         {
598             // TODO make configurable
599             remote = new ZombieRemoteCacheService( irca.getZombieQueueMaxSize() );
600         }
601         // may want to flush if region specifies
602         // Notify the cache monitor about the error, and kick off the recovery
603         // process.
604         RemoteCacheMonitor.getInstance().notifyError();
605 
606         // initiate failover if local
607         RemoteCacheNoWaitFacade rcnwf = (RemoteCacheNoWaitFacade) RemoteCacheFactory.getFacades()
608             .get( irca.getCacheName() );
609 
610         if ( log.isDebugEnabled() )
611         {
612             log.debug( "Initiating failover, rcnf = " + rcnwf );
613         }
614 
615         if ( rcnwf != null && rcnwf.remoteCacheAttributes.getRemoteType() == RemoteCacheAttributes.LOCAL )
616         {
617             if ( log.isDebugEnabled() )
618             {
619                 log.debug( "Found facade, calling failover" );
620             }
621             // may need to remove the noWait index here. It will be 0 if it is
622             // local since there is only 1 possible listener.
623             rcnwf.failover( 0 );
624         }
625 
626         if ( ex instanceof IOException )
627         {
628             throw (IOException) ex;
629         }
630         throw new IOException( ex.getMessage() );
631     }
632 
633     /***
634      * @return Returns the AuxiliaryCacheAttributes.
635      */
636     public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
637     {
638         return irca;
639     }
640 
641     /***
642      * let the remote cache set a listener_id. Since there is only one listerenr for all the regions
643      * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
644      * assume that it is a reconnect.
645      * @param id The new listenerId value
646      */
647     public void setListenerId( long id )
648     {
649         try
650         {
651             listener.setListenerId( id );
652 
653             if ( log.isDebugEnabled() )
654             {
655                 log.debug( "set listenerId = " + id );
656             }
657         }
658         catch ( Exception e )
659         {
660             log.error( "Problem setting listenerId", e );
661         }
662     }
663 
664     /***
665      * Gets the listenerId attribute of the RemoteCacheListener object
666      * @return The listenerId value
667      */
668     public long getListenerId()
669     {
670         try
671         {
672             if ( log.isDebugEnabled() )
673             {
674                 log.debug( "get listenerId = " + listener.getListenerId() );
675             }
676             return listener.getListenerId();
677         }
678         catch ( Exception e )
679         {
680             log.error( "Problem setting listenerId", e );
681         }
682         return -1;
683     }
684 
685     /***
686      * Allows other member of this package to access the listerner. This is mainly needed for
687      * deregistering alistener.
688      * <p>
689      * @return IRemoteCacheListener, the listener for this remote server
690      */
691     public IRemoteCacheListener getListener()
692     {
693         return listener;
694     }
695 
696     /***
697      * @param elementSerializer The elementSerializer to set.
698      */
699     public void setElementSerializer( IElementSerializer elementSerializer )
700     {
701         this.elementSerializer = elementSerializer;
702     }
703 
704     /***
705      * @return Returns the elementSerializer.
706      */
707     public IElementSerializer getElementSerializer()
708     {
709         return elementSerializer;
710     }
711 
712     /***
713      * Debugging info.
714      * @return basic info about the RemoteCache
715      */
716     public String toString()
717     {
718         return "RemoteCache: " + cacheName + " attributes = " + irca;
719     }
720 }