View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.ObjectInputStream;
24  import java.io.ObjectOutputStream;
25  import java.io.Serializable;
26  import java.net.InetAddress;
27  import java.net.ServerSocket;
28  import java.net.Socket;
29  import java.util.HashMap;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
34  import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
35  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
36  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
37  import org.apache.jcs.engine.behavior.ICacheElement;
38  import org.apache.jcs.engine.behavior.ICompositeCacheManager;
39  import org.apache.jcs.engine.control.CompositeCache;
40  import org.apache.jcs.engine.control.CompositeCacheManager;
41  
42  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
43  import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
44  
45  /***
46   * Listens for connections from other TCP lateral caches and handles them. The
47   * initialization method starts a listening thread, which creates a socket
48   * server. When messages are received they are passed to a pooled executor which
49   * then calls the appropriate handle method.
50   */
51  public class LateralTCPListener
52      implements ILateralCacheListener, Serializable
53  {
54      private static final long serialVersionUID = -9107062664967131738L;
55  
56      private final static Log log = LogFactory.getLog( LateralTCPListener.class );
57  
58      /*** How long the server will block on an accept(). 0 is infinte. */
59      private final static int acceptTimeOut = 0;
60  
61      /*** The CacheHub this listener is associated with */
62      private transient ICompositeCacheManager cacheManager;
63  
64      /*** Map of available instances, keyed by port */
65      protected final static HashMap instances = new HashMap();
66  
67      /*** The socket listener */
68      private ListenerThread receiver;
69  
70      private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
71  
72      private int port;
73  
74      private PooledExecutor pooledExecutor;
75  
76      private int putCnt = 0;
77  
78      private int removeCnt = 0;
79  
80      private int getCnt = 0;
81  
82      /***
83       * Use the vmid by default. This can be set for testing. If we ever need to
84       * run more than one per vm, then we need a new technique.
85       */
86      private long listenerId = LateralCacheInfo.listenerId;
87  
88      /***
89       * Gets the instance attribute of the LateralCacheTCPListener class.
90       * <p>
91       * @param ilca
92       *            ITCPLateralCacheAttributes
93       * @param cacheMgr
94       * @return The instance value
95       */
96      public synchronized static ILateralCacheListener getInstance( ITCPLateralCacheAttributes ilca,
97                                                                   ICompositeCacheManager cacheMgr )
98      {
99          ILateralCacheListener ins = (ILateralCacheListener) instances.get( String.valueOf( ilca.getTcpListenerPort() ) );
100 
101         if ( ins == null )
102         {
103             ins = new LateralTCPListener( ilca );
104 
105             ins.init();
106 
107             ins.setCacheManager( cacheMgr );
108 
109             instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
110 
111             if ( log.isDebugEnabled() )
112             {
113                 log.debug( "created new listener " + ilca.getTcpListenerPort() );
114             }
115         }
116 
117         return ins;
118     }
119 
120     /***
121      * Only need one since it does work for all regions, just reference by
122      * multiple region names.
123      * <p>
124      * @param ilca
125      */
126     protected LateralTCPListener( ITCPLateralCacheAttributes ilca )
127     {
128         this.setTcpLateralCacheAttributes( ilca );
129     }
130 
131     /***
132      * This starts the ListenerThread on the specified port.
133      */
134     public void init()
135     {
136         try
137         {
138             this.port = getTcpLateralCacheAttributes().getTcpListenerPort();
139 
140             receiver = new ListenerThread();
141             receiver.setDaemon( true );
142             receiver.start();
143 
144             pooledExecutor = new PooledExecutor();
145             pooledExecutor.setThreadFactory( new MyThreadFactory() );
146         }
147         catch ( Exception ex )
148         {
149             log.error( ex );
150 
151             throw new IllegalStateException( ex.getMessage() );
152         }
153     }
154 
155     /***
156      * Let the lateral cache set a listener_id. Since there is only one
157      * listerenr for all the regions and every region gets registered? the id
158      * shouldn't be set if it isn't zero. If it is we assume that it is a
159      * reconnect.
160      * <p>
161      * By default, the listener id is the vmid.
162      * <p>
163      * The service should set this value. This value will never be changed by a
164      * server we connect to. It needs to be non static, for unit tests.
165      * <p>
166      * The service will use the value it sets in all send requests to the
167      * sender.
168      * <p>
169      * @param id
170      *            The new listenerId value
171      * @throws IOException
172      */
173     public void setListenerId( long id )
174         throws IOException
175     {
176         this.listenerId = id;
177         if ( log.isDebugEnabled() )
178         {
179             log.debug( "set listenerId = " + id );
180         }
181     }
182 
183     /***
184      * Gets the listenerId attribute of the LateralCacheTCPListener object
185      * <p>
186      * @return The listenerId value
187      * @throws IOException
188      */
189     public long getListenerId()
190         throws IOException
191     {
192         return this.listenerId;
193     }
194 
195     /***
196      * Increments the put count. Gets the cache that was injected by the lateral
197      * factory. Calls put on the cache.
198      * <p>
199      * @see org.apache.jcs.engine.behavior.ICacheListener#handlePut(org.apache.jcs.engine.behavior.ICacheElement)
200      */
201     public void handlePut( ICacheElement element )
202         throws IOException
203     {
204         putCnt++;
205         if ( log.isInfoEnabled() )
206         {
207             if ( getPutCnt() % 100 == 0 )
208             {
209                 log.info( "Put Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
210                     + getPutCnt() );
211             }
212         }
213 
214         if ( log.isDebugEnabled() )
215         {
216             log.debug( "handlePut> cacheName=" + element.getCacheName() + ", key=" + element.getKey() );
217         }
218 
219         getCache( element.getCacheName() ).localUpdate( element );
220     }
221 
222     /***
223      * Increments the remove count. Gets the cache that was injected by the
224      * lateral factory. Calls remove on the cache.
225      * <p>
226      * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
227      *      java.io.Serializable)
228      */
229     public void handleRemove( String cacheName, Serializable key )
230         throws IOException
231     {
232         removeCnt++;
233         if ( log.isInfoEnabled() )
234         {
235             if ( getRemoveCnt() % 100 == 0 )
236             {
237                 log.info( "Remove Count = " + getRemoveCnt() );
238             }
239         }
240 
241         if ( log.isDebugEnabled() )
242         {
243             log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
244         }
245 
246         getCache( cacheName ).localRemove( key );
247     }
248 
249     /***
250      * Gets the cache that was injected by the lateral factory. Calls removeAll
251      * on the cache.
252      * <p>
253      * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
254      */
255     public void handleRemoveAll( String cacheName )
256         throws IOException
257     {
258         if ( log.isDebugEnabled() )
259         {
260             log.debug( "handleRemoveAll> cacheName=" + cacheName );
261         }
262 
263         getCache( cacheName ).localRemoveAll();
264     }
265 
266     /***
267      * Gets the cache that was injected by the lateral factory. Calls get on the
268      * cache.
269      * <p>
270      * @param cacheName
271      * @param key
272      * @return Serializable
273      * @throws IOException
274      */
275     public Serializable handleGet( String cacheName, Serializable key )
276         throws IOException
277     {
278         getCnt++;
279         if ( log.isInfoEnabled() )
280         {
281             if ( getGetCnt() % 100 == 0 )
282             {
283                 log.info( "Get Count (port " + getTcpLateralCacheAttributes().getTcpListenerPort() + ") = "
284                     + getGetCnt() );
285             }
286         }
287 
288         if ( log.isDebugEnabled() )
289         {
290             log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
291         }
292 
293         return getCache( cacheName ).localGet( key );
294     }
295 
296     /***
297      * Right now this does nothing.
298      * <p>
299      * @see org.apache.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
300      */
301     public void handleDispose( String cacheName )
302         throws IOException
303     {
304         if ( log.isInfoEnabled() )
305         {
306             log.info( "handleDispose > cacheName=" + cacheName );
307         }
308 
309         // TODO handle active deregistration, rather than passive detection
310         // through error
311         // getCacheManager().freeCache( cacheName, true );
312     }
313 
314     /***
315      * Gets the cacheManager attribute of the LateralCacheTCPListener object.
316      * <p>
317      * Normally this is set by the factory. If it wasn't set the listener
318      * defaults to the expected singleton behavior of the cache amanger.
319      * <p>
320      * @param name
321      * @return CompositeCache
322      */
323     protected CompositeCache getCache( String name )
324     {
325         if ( getCacheManager() == null )
326         {
327             // revert to singleton on failure
328             setCacheManager( CompositeCacheManager.getInstance() );
329 
330             if ( log.isDebugEnabled() )
331             {
332                 log.debug( "cacheMgr = " + getCacheManager() );
333             }
334         }
335 
336         return getCacheManager().getCache( name );
337     }
338 
339     /***
340      * This is roughly the number of updates the lateral has received.
341      * <p>
342      * @return Returns the putCnt.
343      */
344     public int getPutCnt()
345     {
346         return putCnt;
347     }
348 
349     /***
350      * @return Returns the getCnt.
351      */
352     public int getGetCnt()
353     {
354         return getCnt;
355     }
356 
357     /***
358      * @return Returns the removeCnt.
359      */
360     public int getRemoveCnt()
361     {
362         return removeCnt;
363     }
364 
365     /***
366      * @param cacheMgr
367      *            The cacheMgr to set.
368      */
369     public void setCacheManager( ICompositeCacheManager cacheMgr )
370     {
371         this.cacheManager = cacheMgr;
372     }
373 
374     /***
375      * @return Returns the cacheMgr.
376      */
377     public ICompositeCacheManager getCacheManager()
378     {
379         return cacheManager;
380     }
381 
382     /***
383      * @param tcpLateralCacheAttributes
384      *            The tcpLateralCacheAttributes to set.
385      */
386     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
387     {
388         this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
389     }
390 
391     /***
392      * @return Returns the tcpLateralCacheAttributes.
393      */
394     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
395     {
396         return tcpLateralCacheAttributes;
397     }
398 
399     /***
400      * Processes commands from the server socket. There should be one listener
401      * for each configured TCP lateral.
402      */
403     public class ListenerThread
404         extends Thread
405     {
406         /*** Main processing method for the ListenerThread object */
407         public void run()
408         {
409             try
410             {
411                 log.info( "Listening on port " + port );
412 
413                 ServerSocket serverSocket = new ServerSocket( port );
414                 serverSocket.setSoTimeout( acceptTimeOut );
415 
416                 ConnectionHandler handler;
417 
418                 while ( true )
419                 {
420                     if ( log.isDebugEnabled() )
421                     {
422                         log.debug( "Waiting for clients to connect " );
423                     }
424 
425                     Socket socket = serverSocket.accept();
426 
427                     if ( log.isDebugEnabled() )
428                     {
429                         InetAddress inetAddress = socket.getInetAddress();
430 
431                         log.debug( "Connected to client at " + inetAddress );
432                     }
433 
434                     handler = new ConnectionHandler( socket );
435 
436                     pooledExecutor.execute( handler );
437                 }
438             }
439             catch ( Exception e )
440             {
441                 log.error( "Exception caught in TCP listener", e );
442             }
443         }
444     }
445 
446     /***
447      * A Separate thread taht runs when a command comes into the
448      * LateralTCPReceiver.
449      */
450     public class ConnectionHandler
451         implements Runnable
452     {
453         private Socket socket;
454 
455         /***
456          * Construct for a given socket
457          * @param socket
458          */
459         public ConnectionHandler( Socket socket )
460         {
461             this.socket = socket;
462         }
463 
464         /***
465          * Main processing method for the LateralTCPReceiverConnection object
466          */
467         public void run()
468         {
469             ObjectInputStream ois;
470 
471             try
472             {
473                 ois = new ObjectInputStream( socket.getInputStream() );
474             }
475             catch ( Exception e )
476             {
477                 log.error( "Could not open ObjectInputStream on " + socket, e );
478 
479                 return;
480             }
481 
482             LateralElementDescriptor led;
483 
484             try
485             {
486                 while ( true )
487                 {
488                     led = (LateralElementDescriptor) ois.readObject();
489 
490                     if ( led == null )
491                     {
492                         log.debug( "LateralElementDescriptor is null" );
493                         continue;
494                     }
495                     if ( led.requesterId == getListenerId() )
496                     {
497                         log.debug( "from self" );
498                     }
499                     else
500                     {
501                         if ( log.isDebugEnabled() )
502                         {
503                             log.debug( "receiving LateralElementDescriptor from another" + "led = " + led
504                                 + ", led.command = " + led.command + ", led.ce = " + led.ce );
505                         }
506 
507                         handle( led );
508                     }
509                 }
510             }
511             catch ( java.io.EOFException e )
512             {
513                 log.info( "Caught java.io.EOFException closing connection." );
514             }
515             catch ( java.net.SocketException e )
516             {
517                 log.info( "Caught java.net.SocketException closing connection." );
518             }
519             catch ( Exception e )
520             {
521                 log.error( "Unexpected exception.", e );
522             }
523 
524             try
525             {
526                 ois.close();
527             }
528             catch ( Exception e )
529             {
530                 log.error( "Could not close object input stream.", e );
531             }
532         }
533 
534         /***
535          * This calls the appropriate method, based on the command sent in the
536          * Lateral element descriptor.
537          * <p>
538          * @param led
539          * @throws IOException
540          */
541         private void handle( LateralElementDescriptor led )
542             throws IOException
543         {
544             String cacheName = led.ce.getCacheName();
545             Serializable key = led.ce.getKey();
546 
547             if ( led.command == LateralElementDescriptor.UPDATE )
548             {
549                 handlePut( led.ce );
550             }
551             else if ( led.command == LateralElementDescriptor.REMOVE )
552             {
553                 // if a hashcode was given and filtering is on
554                 // check to see if they are the same
555                 // if so, then don't remvoe, otherwise issue a remove
556                 if ( led.valHashCode != -1 )
557                 {
558                     if ( getTcpLateralCacheAttributes().isFilterRemoveByHashCode() )
559                     {
560                         ICacheElement test = getCache( cacheName ).localGet( key );
561                         if ( test != null )
562                         {
563                             if ( test.getVal().hashCode() == led.valHashCode )
564                             {
565                                 if ( log.isDebugEnabled() )
566                                 {
567                                     log.debug( "Filtering detected identical hashCode [" + led.valHashCode
568                                         + "], not issuing a remove for led " + led );
569                                 }
570                                 return;
571                             }
572                             else
573                             {
574                                 if ( log.isDebugEnabled() )
575                                 {
576                                     log.debug( "Different hashcodes, in cache [" + test.getVal().hashCode()
577                                         + "] sent [" + led.valHashCode + "]" );
578                                 }
579                             }
580                         }
581                     }
582                 }
583                 handleRemove( cacheName, key );
584             }
585             else if ( led.command == LateralElementDescriptor.REMOVEALL )
586             {
587                 handleRemoveAll( cacheName );
588             }
589             else if ( led.command == LateralElementDescriptor.GET )
590             {
591                 Serializable obj = handleGet( cacheName, key );
592 
593                 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
594 
595                 if ( oos != null )
596                 {
597                     oos.writeObject( obj );
598                     oos.flush();
599                 }
600             }
601         }
602     }
603 
604     /***
605      * Allows us to set the daemon status on the executor threads
606      * <p>
607      * @author aaronsm
608      */
609     class MyThreadFactory
610         implements ThreadFactory
611     {
612         /*
613          * (non-Javadoc)
614          * @see EDU.oswego.cs.dl.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
615          */
616         public Thread newThread( Runnable runner )
617         {
618             Thread t = new Thread( runner );
619             t.setDaemon( true );
620             t.setPriority( Thread.MIN_PRIORITY );
621             return t;
622         }
623     }
624 }