Coverage report

  %line %branch
org.apache.jcs.auxiliary.lateral.socket.tcp.discovery.UDPDiscoveryService
51% 
81% 

 1  
 package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.net.InetAddress;
 23  
 import java.net.UnknownHostException;
 24  
 import java.util.ArrayList;
 25  
 import java.util.HashMap;
 26  
 import java.util.Iterator;
 27  
 import java.util.Map;
 28  
 import java.util.Set;
 29  
 
 30  
 import org.apache.commons.logging.Log;
 31  
 import org.apache.commons.logging.LogFactory;
 32  
 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
 33  
 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
 34  
 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
 35  
 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
 36  
 import org.apache.jcs.engine.behavior.IShutdownObservable;
 37  
 import org.apache.jcs.engine.behavior.IShutdownObserver;
 38  
 
 39  
 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
 40  
 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
 41  
 
 42  
 /**
 43  
  *
 44  
  * This service creates a listener that can create lateral caches and add them
 45  
  * to the no wait list.
 46  
  * <p>
 47  
  * It also creates a sender that periodically broadcasts its availability.
 48  
  * <p>
 49  
  * The sender also broadcasts a request for other caches to broadcast their
 50  
  * addresses.
 51  
  *
 52  
  * @author Aaron Smuts
 53  
  *
 54  
  */
 55  
 public class UDPDiscoveryService
 56  
     implements IShutdownObserver
 57  
 {
 58  26
     private final static Log log = LogFactory.getLog( UDPDiscoveryService.class );
 59  
 
 60  
     // The background broadcaster.
 61  
     private static ClockDaemon senderDaemon;
 62  
 
 63  
     // thread that listens for messages
 64  
     private Thread udpReceiverThread;
 65  
 
 66  
     // the runnable that the receiver thread runs
 67  
     private UDPDiscoveryReceiver receiver;
 68  
 
 69  13
     private Map facades = new HashMap();
 70  
 
 71  
     // the runnable that sends messages via the clock daemon
 72  13
     private UDPDiscoverySenderThread sender = null;
 73  
 
 74  13
     private String hostAddress = "unknown";
 75  
 
 76  
     private String discoveryAddress;
 77  
 
 78  
     private int discoveryPort;
 79  
 
 80  
     // the port this service runs on, the service we are telling others about
 81  
     // we should have a service info object instead
 82  
     private int servicePort;
 83  
 
 84  
     private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
 85  
 
 86  
     /**
 87  
      *
 88  
      * @param discoveryAddress
 89  
      *            address to multicast to
 90  
      * @param discoveryPort
 91  
      *            port to multicast to
 92  
      * @param servicePort
 93  
      *            the port this service runs on, the service we are telling
 94  
      *            others about
 95  
      * @param cacheMgr
 96  
      */
 97  
     public UDPDiscoveryService( String discoveryAddress, int discoveryPort, int servicePort,
 98  
                                ICompositeCacheManager cacheMgr )
 99  13
     {
 100  
         // register for shutdown notification
 101  13
         ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( this );
 102  
 
 103  13
         this.setDiscoveryAddress( discoveryAddress );
 104  13
         this.setDiscoveryPort( discoveryPort );
 105  13
         this.setServicePort( servicePort );
 106  
 
 107  
         try
 108  
         {
 109  
             // todo, you should be able to set this
 110  13
             hostAddress = InetAddress.getLocalHost().getHostAddress();
 111  13
             if ( log.isDebugEnabled() )
 112  
             {
 113  0
                 log.debug( "hostAddress = [" + hostAddress + "]" );
 114  
             }
 115  
         }
 116  0
         catch ( UnknownHostException e1 )
 117  
         {
 118  0
             log.error( "Couldn't get localhost address", e1 );
 119  13
         }
 120  
 
 121  
         try
 122  
         {
 123  
             // todo need some kind of recovery here.
 124  13
             receiver = new UDPDiscoveryReceiver( this, getDiscoveryAddress(), getDiscoveryPort(), cacheMgr );
 125  13
             udpReceiverThread = new Thread( receiver );
 126  13
             udpReceiverThread.setDaemon( true );
 127  
             // udpReceiverThread.setName( t.getName() + "--UDPReceiver" );
 128  13
             udpReceiverThread.start();
 129  
         }
 130  0
         catch ( Exception e )
 131  
         {
 132  0
             log.error( "Problem creating UDPDiscoveryReceiver, address [" + getDiscoveryAddress() + "] port [" + getDiscoveryPort()
 133  
                 + "] we won't be able to find any other caches", e );
 134  13
         }
 135  
 
 136  
         // todo only do the passive if receive is inenabled, perhaps set the
 137  
         // myhost to null or something on the request
 138  13
         if ( senderDaemon == null )
 139  
         {
 140  13
             senderDaemon = new ClockDaemon();
 141  13
             senderDaemon.setThreadFactory( new MyThreadFactory() );
 142  
         }
 143  
 
 144  
         // create a sender thread
 145  13
         sender = new UDPDiscoverySenderThread( getDiscoveryAddress(), getDiscoveryPort(), hostAddress, this
 146  
             .getServicePort(), this.getCacheNames() );
 147  
 
 148  13
         senderDaemon.executePeriodically( 30 * 1000, sender, false );
 149  13
     }
 150  
 
 151  
     /**
 152  
      * Adds a nowait facade under this cachename. If one already existed, it
 153  
      * will be overridden.
 154  
      * <p>
 155  
      * When a broadcast is received from the UDP Discovery receiver, for each
 156  
      * cacheName in the message, the add no wait will be called here. To add a
 157  
      * no wait, the facade is looked up for this cache name.
 158  
      *
 159  
      * @param facade
 160  
      * @param cacheName
 161  
      * @return true if the facade was not already registered.
 162  
      */
 163  
     public synchronized boolean addNoWaitFacade( LateralCacheNoWaitFacade facade, String cacheName )
 164  
     {
 165  26
         boolean isNew = !facades.containsKey( cacheName );
 166  
 
 167  
         // override or put anew, it doesn't matter
 168  26
         facades.put( cacheName, facade );
 169  
 
 170  26
         if ( isNew )
 171  
         {
 172  26
             if ( sender != null )
 173  
             {
 174  
                 // need to reset the cache names since we have a new one
 175  26
                 sender.setCacheNames( this.getCacheNames() );
 176  
             }
 177  
         }
 178  
 
 179  26
         return isNew;
 180  
     }
 181  
 
 182  
     /**
 183  
      * This adds nowaits to a facade for the region name. If the region has no
 184  
      * facade, then it is not configured to use the lateral cache, and no facade
 185  
      * will be created.
 186  
      *
 187  
      * @param noWait
 188  
      */
 189  
     protected void addNoWait( LateralCacheNoWait noWait )
 190  
     {
 191  0
         LateralCacheNoWaitFacade facade = (LateralCacheNoWaitFacade) facades.get( noWait.getCacheName() );
 192  0
         if ( log.isDebugEnabled() )
 193  
         {
 194  0
             log.debug( "Got facade for " + noWait.getCacheName() + " = " + facade );
 195  
         }
 196  
 
 197  0
         if ( facade != null )
 198  
         {
 199  0
             boolean isNew = facade.addNoWait( noWait );
 200  0
             if ( log.isDebugEnabled() )
 201  
             {
 202  0
                 log.debug( "Called addNoWait, isNew = " + isNew );
 203  
             }
 204  0
         }
 205  
         else
 206  
         {
 207  0
             if ( log.isInfoEnabled() )
 208  
             {
 209  0
                 log.info( "Different nodes are configured differently.  Region [" + noWait.getCacheName()
 210  
                     + "] is not configured to use the lateral cache." );
 211  
             }
 212  
         }
 213  0
     }
 214  
 
 215  
     /**
 216  
      * Send a passive broadcast in response to a request broadcast. Never send a
 217  
      * request for a request. We can respond to our own request, since a request
 218  
      * broadcast is not intended as a connection request. We might want to only
 219  
      * send messages, so we would send a request, but never a passive broadcast.
 220  
      *
 221  
      */
 222  
     protected void serviceRequestBroadcast()
 223  
     {
 224  0
         UDPDiscoverySender sender = null;
 225  
         try
 226  
         {
 227  
             // create this connection each time.
 228  
             // more robust
 229  0
             sender = new UDPDiscoverySender( getDiscoveryAddress(), getDiscoveryPort() );
 230  
 
 231  0
             sender.passiveBroadcast( hostAddress, this.getServicePort(), this.getCacheNames() );
 232  
 
 233  
             // todo we should consider sending a request broadcast every so
 234  
             // often.
 235  
 
 236  0
             if ( log.isDebugEnabled() )
 237  
             {
 238  0
                 log.debug( "Called sender to issue a passive broadcast" );
 239  
             }
 240  
         }
 241  0
         catch ( Exception e )
 242  
         {
 243  0
             log.error( "Problem calling the UDP Discovery Sender. address [" + getDiscoveryAddress() + "] port ["
 244  
                 + getDiscoveryPort() + "]", e );
 245  
         }
 246  
         finally
 247  
         {
 248  0
             try
 249  
             {
 250  0
                 if ( sender != null )
 251  
                 {
 252  0
                     sender.destroy();
 253  
                 }
 254  
             }
 255  0
             catch ( Exception e )
 256  
             {
 257  0
                 log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e );
 258  0
             }
 259  0
         }
 260  0
     }
 261  
 
 262  
     /**
 263  
      * Get all the cache names we have facades for.
 264  
      *
 265  
      * @return
 266  
      */
 267  
     protected ArrayList getCacheNames()
 268  
     {
 269  39
         ArrayList keys = new ArrayList();
 270  39
         Set keySet = facades.keySet();
 271  39
         Iterator it = keySet.iterator();
 272  78
         while ( it.hasNext() )
 273  
         {
 274  39
             String key = (String) it.next();
 275  39
             keys.add( key );
 276  39
         }
 277  39
         return keys;
 278  
     }
 279  
 
 280  
     /**
 281  
      * Allows us to set the daemon status on the clockdaemon
 282  
      *
 283  
      * @author aaronsm
 284  
      *
 285  
      */
 286  
     class MyThreadFactory
 287  
         implements ThreadFactory
 288  
     {
 289  
         /*
 290  
          * (non-Javadoc)
 291  
          *
 292  
          * @see EDU.oswego.cs.dl.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
 293  
          */
 294  
         public Thread newThread( Runnable runner )
 295  
         {
 296  
             Thread t = new Thread( runner );
 297  
             t.setDaemon( true );
 298  
             t.setPriority( Thread.MIN_PRIORITY );
 299  
             return t;
 300  
         }
 301  
     }
 302  
 
 303  
     /*
 304  
      * (non-Javadoc)
 305  
      *
 306  
      * @see org.apache.jcs.engine.behavior.ShutdownObserver#shutdown()
 307  
      */
 308  
     public void shutdown()
 309  
     {
 310  0
         if ( log.isInfoEnabled() )
 311  
         {
 312  0
             log.info( "Shutting down UDP discovery service receiver." );
 313  
         }
 314  
 
 315  
         try
 316  
         {
 317  
             // no good way to do this right now.
 318  0
             receiver.shutdown();
 319  0
             udpReceiverThread.interrupt();
 320  
         }
 321  0
         catch ( Exception e )
 322  
         {
 323  0
             log.error( "Problem interrupting UDP receiver thread." );
 324  0
         }
 325  
 
 326  0
         if ( log.isInfoEnabled() )
 327  
         {
 328  0
             log.info( "Shutting down UDP discovery service sender." );
 329  
         }
 330  
 
 331  
         try
 332  
         {
 333  
             // interrupt all the threads.
 334  0
             senderDaemon.shutDown();
 335  
         }
 336  0
         catch ( Exception e )
 337  
         {
 338  0
             log.error( "Problem shutting down UDP sender." );
 339  0
         }
 340  0
     }
 341  
 
 342  
     /**
 343  
      * @param discoveryAddress
 344  
      *            The discoveryAddress to set.
 345  
      */
 346  
     protected void setDiscoveryAddress( String discoveryAddress )
 347  
     {
 348  13
         this.discoveryAddress = discoveryAddress;
 349  13
     }
 350  
 
 351  
     /**
 352  
      * @return Returns the discoveryAddress.
 353  
      */
 354  
     protected String getDiscoveryAddress()
 355  
     {
 356  26
         return discoveryAddress;
 357  
     }
 358  
 
 359  
     /**
 360  
      * @param discoveryPort
 361  
      *            The discoveryPort to set.
 362  
      */
 363  
     protected void setDiscoveryPort( int discoveryPort )
 364  
     {
 365  13
         this.discoveryPort = discoveryPort;
 366  13
     }
 367  
 
 368  
     /**
 369  
      * @return Returns the discoveryPort.
 370  
      */
 371  
     protected int getDiscoveryPort()
 372  
     {
 373  26
         return discoveryPort;
 374  
     }
 375  
 
 376  
     /**
 377  
      * @param servicePort
 378  
      *            The servicePort to set.
 379  
      */
 380  
     protected void setServicePort( int servicePort )
 381  
     {
 382  13
         this.servicePort = servicePort;
 383  13
     }
 384  
 
 385  
     /**
 386  
      * @return Returns the servicePort.
 387  
      */
 388  
     protected int getServicePort()
 389  
     {
 390  13
         return servicePort;
 391  
     }
 392  
 
 393  
     /**
 394  
      * @param tCPLateralCacheAttributes
 395  
      *            The tCPLateralCacheAttributes to set.
 396  
      */
 397  
     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tCPLateralCacheAttributes )
 398  
     {
 399  26
         tcpLateralCacheAttributes = tCPLateralCacheAttributes;
 400  26
     }
 401  
 
 402  
     /**
 403  
      * @return Returns the tCPLateralCacheAttributes.
 404  
      */
 405  
     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
 406  
     {
 407  0
         return tcpLateralCacheAttributes;
 408  
     }
 409  
 }

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.