1 package org.apache.jcs.auxiliary.remote;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.lang.reflect.InvocationTargetException;
25 import java.net.ServerSocket;
26 import java.net.Socket;
27 import java.rmi.server.RMISocketFactory;
28 import java.util.ArrayList;
29 import java.util.Set;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
34 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
35 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
36 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
37 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
38 import org.apache.jcs.engine.CacheConstants;
39 import org.apache.jcs.engine.behavior.ICacheElement;
40 import org.apache.jcs.engine.behavior.ICacheElementSerialized;
41 import org.apache.jcs.engine.behavior.IElementAttributes;
42 import org.apache.jcs.engine.behavior.IElementSerializer;
43 import org.apache.jcs.engine.behavior.IZombie;
44 import org.apache.jcs.engine.stats.StatElement;
45 import org.apache.jcs.engine.stats.Stats;
46 import org.apache.jcs.engine.stats.behavior.IStatElement;
47 import org.apache.jcs.engine.stats.behavior.IStats;
48 import org.apache.jcs.utils.serialization.SerializationConversionUtil;
49 import org.apache.jcs.utils.serialization.StandardSerializer;
50 import org.apache.jcs.utils.threadpool.ThreadPool;
51 import org.apache.jcs.utils.threadpool.ThreadPoolManager;
52
53 import EDU.oswego.cs.dl.util.concurrent.Callable;
54 import EDU.oswego.cs.dl.util.concurrent.FutureResult;
55 import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
56
57 /***
58 * Client proxy for an RMI remote cache. This handles gets, updates, and removes. It also initiates
59 * failover recovery when an error is encountered.
60 */
61 public class RemoteCache
62 implements IRemoteCacheClient
63 {
64 private static final long serialVersionUID = -5329231850422826460L;
65
66 private final static Log log = LogFactory.getLog( RemoteCache.class );
67
68 final String cacheName;
69
70 private IRemoteCacheAttributes irca;
71
72 private IRemoteCacheService remote;
73
74 private IRemoteCacheListener listener;
75
76 private IElementAttributes attr = null;
77
78 private ThreadPool pool = null;
79
80 private boolean usePoolForGet = false;
81
82 private IElementSerializer elementSerializer = new StandardSerializer();
83
/**
 * Creates a client for one region of a remote cache server.
 * <p>
 * The listener reference is kept so that this object can report the listener id with each
 * request it sends to the remote service.
 * <p>
 * When GetTimeoutMillis is positive, the configured thread pool is looked up and, if one is
 * found, gets are routed through it so that they can time out. When
 * RmiSocketFactoryTimeoutMillis is positive, an RMISocketFactory that applies that value as the
 * socket read timeout is installed. A failure to install the factory is logged and otherwise
 * ignored, so construction still succeeds.
 * <p>
 * @param cattr configuration for this client; also supplies the cache name
 * @param remote the remote service this client delegates to
 * @param listener the listener whose id is sent along with requests
 */
public RemoteCache( IRemoteCacheAttributes cattr, IRemoteCacheService remote, IRemoteCacheListener listener )
{
    this.irca = cattr;
    this.cacheName = cattr.getCacheName();
    this.remote = remote;
    this.listener = listener;

    if ( log.isDebugEnabled() )
    {
        log.debug( "Construct> cacheName=" + cattr.getCacheName() );
        log.debug( "irca = " + irca );
        log.debug( "remote = " + remote );
        log.debug( "listener = " + listener );
        log.debug( "GetTimeoutMillis() = " + irca.getGetTimeoutMillis() );
    }

    // A pool is only needed when gets are supposed to time out.
    if ( irca.getGetTimeoutMillis() > 0 )
    {
        pool = ThreadPoolManager.getInstance().getPool( irca.getThreadPoolName() );
        if ( log.isDebugEnabled() )
        {
            log.debug( "Thread Pool = " + pool );
        }
        if ( pool != null )
        {
            usePoolForGet = true;
        }
    }

    try
    {
        if ( irca.getRmiSocketFactoryTimeoutMillis() > 0 )
        {
            RMISocketFactory.setSocketFactory( new RMISocketFactory()
            {
                public Socket createSocket( String host, int port )
                    throws IOException
                {
                    Socket socket = new Socket( host, port );
                    boolean configured = false;
                    try
                    {
                        socket.setSoTimeout( irca.getRmiSocketFactoryTimeoutMillis() );
                        socket.setSoLinger( false, 0 );
                        configured = true;
                        return socket;
                    }
                    finally
                    {
                        // Do not leak the connected socket when it could not be configured.
                        if ( !configured )
                        {
                            try
                            {
                                socket.close();
                            }
                            catch ( IOException ignored )
                            {
                                // the configuration failure is the error worth reporting
                            }
                        }
                    }
                }

                public ServerSocket createServerSocket( int port )
                    throws IOException
                {
                    return new ServerSocket( port );
                }
            } );
        }
    }
    catch ( Exception e )
    {
        // Best effort: the JVM-wide RMI socket factory can be set only once, so this is not
        // treated as fatal. Logged without a stack trace to keep the info log compact.
        log.info( "Could not install the RMI socket factory with a timeout for cache [" + cacheName + "]: "
            + e.getMessage() );
    }
}
161
162 /***
163 * Sets the attributes attribute of the RemoteCache object.
164 * <p>
165 * @param attr The new attributes value
166 */
167 public void setElementAttributes( IElementAttributes attr )
168 {
169 this.attr = attr;
170 }
171
172 /***
173 * Gets the attributes attribute of the RemoteCache object.
174 * <p>
175 * @return The attributes value
176 */
177 public IElementAttributes getElementAttributes()
178 {
179 return this.attr;
180 }
181
182 /***
183 * Serializes the object and then calls update on the remote server with the byte array. The
184 * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
185 * without any knowledge of caches classes.
186 * <p>
187 * (non-Javadoc)
188 * @see org.apache.jcs.engine.behavior.ICache#update(org.apache.jcs.engine.behavior.ICacheElement)
189 */
190 public void update( ICacheElement ce )
191 throws IOException
192 {
193 if ( true )
194 {
195 if ( !this.irca.getGetOnly() )
196 {
197 ICacheElementSerialized serialized = null;
198 try
199 {
200 if ( log.isDebugEnabled() )
201 {
202 log.debug( "sending item to remote server" );
203 }
204
205
206
207 serialized = SerializationConversionUtil
208 .getSerializedCacheElement( ce, this.elementSerializer );
209
210 remote.update( serialized, getListenerId() );
211 }
212 catch ( NullPointerException npe )
213 {
214 log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
215 return;
216 }
217 catch ( Exception ex )
218 {
219
220 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() );
221 }
222 }
223 else
224 {
225 if ( log.isDebugEnabled() )
226 {
227 log.debug( "get only mode, not sending to remote server" );
228 }
229 }
230 }
231 }
232
233 /***
234 * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
235 * <p>
236 * Use threadpool to timeout if a value is set for GetTimeoutMillis
237 * <p>
238 * If we are a cluster client, we need to leave the Element in its serilaized form. Cluster
239 * cients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
240 * other remote servers.
241 * <p>
242 * @param key
243 * @return ICacheElement, a wrapper around the key, value, and attributes
244 * @throws IOException
245 */
246 public ICacheElement get( Serializable key )
247 throws IOException
248 {
249 ICacheElement retVal = null;
250
251 try
252 {
253 if ( usePoolForGet )
254 {
255 retVal = getUsingPool( key );
256 }
257 else
258 {
259 retVal = remote.get( cacheName, key, getListenerId() );
260 }
261
262
263 if ( retVal != null && retVal instanceof ICacheElementSerialized )
264 {
265
266
267
268 if ( this.irca.getRemoteType() != IRemoteCacheAttributes.CLUSTER )
269 {
270 retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized) retVal,
271 this.elementSerializer );
272 }
273 }
274 }
275 catch ( Exception ex )
276 {
277 handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]" );
278 }
279
280 return retVal;
281 }
282
283 /***
284 * This allows gets to timeout in case of remote server machine shutdown.
285 * <p>
286 * @param key
287 * @return ICacheElement
288 * @throws IOException
289 */
290 public ICacheElement getUsingPool( final Serializable key )
291 throws IOException
292 {
293 int timeout = irca.getGetTimeoutMillis();
294
295 try
296 {
297 FutureResult future = new FutureResult();
298 Runnable command = future.setter( new Callable()
299 {
300 public Object call()
301 throws IOException
302 {
303 return remote.get( cacheName, key, getListenerId() );
304 }
305 } );
306
307
308 pool.execute( command );
309
310
311 ICacheElement ice = (ICacheElement) future.timedGet( timeout );
312 if ( log.isDebugEnabled() )
313 {
314 if ( ice == null )
315 {
316 log.debug( "nothing found in remote cache" );
317 }
318 else
319 {
320 log.debug( "found item in remote cache" );
321 }
322 }
323 return ice;
324 }
325 catch ( TimeoutException te )
326 {
327 log.warn( "TimeoutException, Get Request timed out after " + timeout );
328 throw new IOException( "Get Request timed out after " + timeout );
329 }
330 catch ( InterruptedException ex )
331 {
332 log.warn( "InterruptedException, Get Request timed out after " + timeout );
333 throw new IOException( "Get Request timed out after " + timeout );
334 }
335 catch ( InvocationTargetException ex )
336 {
337
338 log.error( "InvocationTargetException, Assuming an IO exception thrown in the background.", ex );
339 throw new IOException( "Get Request timed out after " + timeout );
340 }
341 }
342
343 /***
344 * Returns all the keys for a group.
345 * <p>
346 * @param groupName
347 * @return Set
348 * @throws java.rmi.RemoteException
349 */
350 public Set getGroupKeys( String groupName )
351 throws java.rmi.RemoteException
352 {
353 return remote.getGroupKeys( cacheName, groupName );
354 }
355
356 /***
357 * Synchronously remove from the remote cache; if failed, replace the remote handle with a
358 * zombie.
359 * <p>
360 * @param key
361 * @return boolean, whether or not the item was removed
362 * @throws IOException
363 */
364 public boolean remove( Serializable key )
365 throws IOException
366 {
367 if ( true )
368 {
369 if ( !this.irca.getGetOnly() )
370 {
371 if ( log.isDebugEnabled() )
372 {
373 log.debug( "remove> key=" + key );
374 }
375 try
376 {
377 remote.remove( cacheName, key, getListenerId() );
378 }
379 catch ( Exception ex )
380 {
381 handleException( ex, "Failed to remove " + key + " from " + cacheName );
382 }
383 }
384 }
385 return false;
386 }
387
388 /***
389 * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
390 * zombie.
391 * <p>
392 * @throws IOException
393 */
394 public void removeAll()
395 throws IOException
396 {
397 if ( true )
398 {
399 if ( !this.irca.getGetOnly() )
400 {
401 try
402 {
403 remote.removeAll( cacheName, getListenerId() );
404 }
405 catch ( Exception ex )
406 {
407 handleException( ex, "Failed to remove all from " + cacheName );
408 }
409 }
410 }
411 }
412
413 /***
414 * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
415 * <p>
416 * @throws IOException
417 */
418 public void dispose()
419 throws IOException
420 {
421 if ( log.isInfoEnabled() )
422 {
423 log.info( "Disposing of remote cache" );
424 }
425 try
426 {
427 listener.dispose();
428 }
429 catch ( Exception ex )
430 {
431 log.error( "Couldn't dispose", ex );
432 handleException( ex, "Failed to dispose [" + cacheName + "]" );
433 }
434 }
435
436 /***
437 * Returns the cache status. An error status indicates the remote connection is not available.
438 * <p>
439 * @return The status value
440 */
441 public int getStatus()
442 {
443 return remote instanceof IZombie ? CacheConstants.STATUS_ERROR : CacheConstants.STATUS_ALIVE;
444 }
445
446 /***
447 * Gets the stats attribute of the RemoteCache object.
448 * <p>
449 * @return The stats value
450 */
451 public String getStats()
452 {
453 return getStatistics().toString();
454 }
455
456 /***
457 * @return IStats object
458 */
459 public IStats getStatistics()
460 {
461 IStats stats = new Stats();
462 stats.setTypeName( "Remote Cache No Wait" );
463
464 ArrayList elems = new ArrayList();
465
466 IStatElement se = null;
467
468 se = new StatElement();
469 se.setName( "Remote Host:Port" );
470 se.setData( this.irca.getRemoteHost() + ":" + this.irca.getRemotePort() );
471 elems.add( se );
472
473 se = new StatElement();
474 se.setName( "Remote Type" );
475 se.setData( this.irca.getRemoteTypeName() + "" );
476 elems.add( se );
477
478 if ( this.irca.getRemoteType() == IRemoteCacheAttributes.CLUSTER )
479 {
480
481 }
482
483
484
485 se = new StatElement();
486 se.setName( "UsePoolForGet" );
487 se.setData( "" + usePoolForGet );
488 elems.add( se );
489
490 if ( pool != null )
491 {
492 se = new StatElement();
493 se.setName( "Pool Size" );
494 se.setData( "" + pool.getPool().getPoolSize() );
495 elems.add( se );
496
497 se = new StatElement();
498 se.setName( "Maximum Pool Size" );
499 se.setData( "" + pool.getPool().getMaximumPoolSize() );
500 elems.add( se );
501 }
502
503 if ( remote instanceof ZombieRemoteCacheService )
504 {
505 se = new StatElement();
506 se.setName( "Zombie Queue Size" );
507 se.setData( "" + ((ZombieRemoteCacheService)remote).getQueueSize() );
508 elems.add( se );
509 }
510
511
512 IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] );
513 stats.setStatElements( ses );
514
515 return stats;
516 }
517
518 /***
519 * Returns the current cache size.
520 * @return The size value
521 */
522 public int getSize()
523 {
524 return 0;
525 }
526
527 /***
528 * Gets the cacheType attribute of the RemoteCache object
529 * @return The cacheType value
530 */
531 public int getCacheType()
532 {
533 return REMOTE_CACHE;
534 }
535
536 /***
537 * Gets the cacheName attribute of the RemoteCache object.
538 * <p>
539 * @return The cacheName value
540 */
541 public String getCacheName()
542 {
543 return cacheName;
544 }
545
546 /***
547 * Replaces the current remote cache service handle with the given handle.
548 * If the current remote is a Zombie, the propagate teh events that may be
549 * queued to the restored service.
550 * <p>
551 * @param remote IRemoteCacheService -- the remote server or proxy to the remote server
552 */
553 public void fixCache( IRemoteCacheService remote )
554 {
555 if ( this.remote != null && this.remote instanceof ZombieRemoteCacheService )
556 {
557 ZombieRemoteCacheService zombie = (ZombieRemoteCacheService)this.remote;
558 this.remote = remote;
559 try
560 {
561 zombie.propagateEvents( remote );
562 }
563 catch ( Exception e )
564 {
565 try
566 {
567 handleException( e, "Problem propagating events from Zombie Queue to new Remote Service." );
568 }
569 catch ( IOException e1 )
570 {
571
572 }
573 }
574 }
575 else
576 {
577 this.remote = remote;
578 }
579 return;
580 }
581
582 /***
583 * Handles exception by disabling the remote cache service before re-throwing the exception in
584 * the form of an IOException.
585 * <p>
586 * @param ex
587 * @param msg
588 * @throws IOException
589 */
590 private void handleException( Exception ex, String msg )
591 throws IOException
592 {
593 log.error( "Disabling remote cache due to error: " + msg , ex );
594
595
596 if ( remote == null || !(remote instanceof ZombieRemoteCacheService) )
597 {
598
599 remote = new ZombieRemoteCacheService( irca.getZombieQueueMaxSize() );
600 }
601
602
603
604 RemoteCacheMonitor.getInstance().notifyError();
605
606
607 RemoteCacheNoWaitFacade rcnwf = (RemoteCacheNoWaitFacade) RemoteCacheFactory.getFacades()
608 .get( irca.getCacheName() );
609
610 if ( log.isDebugEnabled() )
611 {
612 log.debug( "Initiating failover, rcnf = " + rcnwf );
613 }
614
615 if ( rcnwf != null && rcnwf.remoteCacheAttributes.getRemoteType() == RemoteCacheAttributes.LOCAL )
616 {
617 if ( log.isDebugEnabled() )
618 {
619 log.debug( "Found facade, calling failover" );
620 }
621
622
623 rcnwf.failover( 0 );
624 }
625
626 if ( ex instanceof IOException )
627 {
628 throw (IOException) ex;
629 }
630 throw new IOException( ex.getMessage() );
631 }
632
633 /***
634 * @return Returns the AuxiliaryCacheAttributes.
635 */
636 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
637 {
638 return irca;
639 }
640
641 /***
642 * let the remote cache set a listener_id. Since there is only one listerenr for all the regions
643 * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
644 * assume that it is a reconnect.
645 * @param id The new listenerId value
646 */
647 public void setListenerId( long id )
648 {
649 try
650 {
651 listener.setListenerId( id );
652
653 if ( log.isDebugEnabled() )
654 {
655 log.debug( "set listenerId = " + id );
656 }
657 }
658 catch ( Exception e )
659 {
660 log.error( "Problem setting listenerId", e );
661 }
662 }
663
664 /***
665 * Gets the listenerId attribute of the RemoteCacheListener object
666 * @return The listenerId value
667 */
668 public long getListenerId()
669 {
670 try
671 {
672 if ( log.isDebugEnabled() )
673 {
674 log.debug( "get listenerId = " + listener.getListenerId() );
675 }
676 return listener.getListenerId();
677 }
678 catch ( Exception e )
679 {
680 log.error( "Problem setting listenerId", e );
681 }
682 return -1;
683 }
684
685 /***
686 * Allows other member of this package to access the listerner. This is mainly needed for
687 * deregistering alistener.
688 * <p>
689 * @return IRemoteCacheListener, the listener for this remote server
690 */
691 public IRemoteCacheListener getListener()
692 {
693 return listener;
694 }
695
696 /***
697 * @param elementSerializer The elementSerializer to set.
698 */
699 public void setElementSerializer( IElementSerializer elementSerializer )
700 {
701 this.elementSerializer = elementSerializer;
702 }
703
704 /***
705 * @return Returns the elementSerializer.
706 */
707 public IElementSerializer getElementSerializer()
708 {
709 return elementSerializer;
710 }
711
712 /***
713 * Debugging info.
714 * @return basic info about the RemoteCache
715 */
716 public String toString()
717 {
718 return "RemoteCache: " + cacheName + " attributes = " + irca;
719 }
720 }