Coverage report

  %line %branch
org.apache.jcs.auxiliary.lateral.socket.tcp.discovery.UDPDiscoveryReceiver$MessageHandler
15% 
55% 

 1  
 package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.io.ByteArrayInputStream;
 23  
 import java.io.IOException;
 24  
 import java.io.ObjectInputStream;
 25  
 import java.net.DatagramPacket;
 26  
 import java.net.InetAddress;
 27  
 import java.net.MulticastSocket;
 28  
 import java.util.ArrayList;
 29  
 import java.util.Iterator;
 30  
 
 31  
 import org.apache.commons.logging.Log;
 32  
 import org.apache.commons.logging.LogFactory;
 33  
 import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
 34  
 import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
 35  
 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
 36  
 import org.apache.jcs.auxiliary.lateral.socket.tcp.LateralTCPCacheManager;
 37  
 import org.apache.jcs.auxiliary.lateral.socket.tcp.TCPLateralCacheAttributes;
 38  
 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
 39  
 import org.apache.jcs.engine.behavior.ICache;
 40  
 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
 41  
 import org.apache.jcs.engine.behavior.IShutdownObserver;
 42  
 
 43  
 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
 44  
 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 45  
 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
 46  
 
 47  
 /**
 48  
  * Receives UDP Discovery messages.
 49  
  */
 50  
 public class UDPDiscoveryReceiver
 51  
     implements Runnable, IShutdownObserver
 52  
 {
 53  
     private final static Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
 54  
 
 55  
     private final byte[] m_buffer = new byte[65536];
 56  
 
 57  
     private MulticastSocket m_socket;
 58  
 
 59  
     // todo consider using the threadpool manager to
 60  
     // get this thread pool
 61  
     // for now place a tight restriction on the pool size
 62  
     private static final int maxPoolSize = 10;
 63  
 
 64  
     private PooledExecutor pooledExecutor = null;
 65  
 
 66  
     // number of messages received.
 67  
     private int cnt = 0;
 68  
 
 69  
     /**
 70  
      * Service to get cache names and handle request broadcasts
 71  
      */
 72  
     protected UDPDiscoveryService service = null;
 73  
 
 74  
     private String multicastAddressString = "";
 75  
 
 76  
     private int multicastPort = 0;
 77  
 
 78  
     private ICompositeCacheManager cacheMgr;
 79  
 
 80  
     private boolean shutdown = false;
 81  
 
 82  
     /**
 83  
      * Constructor for the LateralUDPReceiver object.
 84  
      * <p>
 85  
      * We determine our own host using InetAddress
 86  
      *
 87  
      * @param service
 88  
      * @param multicastAddressString
 89  
      * @param multicastPort
 90  
      * @param cacheMgr
 91  
      * @exception IOException
 92  
      */
 93  
     public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort,
 94  
                                 ICompositeCacheManager cacheMgr )
 95  
         throws IOException
 96  
     {
 97  
         this.service = service;
 98  
         this.multicastAddressString = multicastAddressString;
 99  
         this.multicastPort = multicastPort;
 100  
         this.cacheMgr = cacheMgr;
 101  
 
 102  
         // create a small thread pool to handle a barrage
 103  
         pooledExecutor = new PooledExecutor( new BoundedBuffer( 100 ), maxPoolSize );
 104  
         pooledExecutor.discardOldestWhenBlocked();
 105  
         //pooledExecutor.setMinimumPoolSize(1);
 106  
         pooledExecutor.setThreadFactory( new MyThreadFactory() );
 107  
 
 108  
         if ( log.isInfoEnabled() )
 109  
         {
 110  
             log.info( "constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
 111  
         }
 112  
 
 113  
         try
 114  
         {
 115  
             createSocket( this.multicastAddressString, this.multicastPort );
 116  
         }
 117  
         catch ( IOException ioe )
 118  
         {
 119  
             // consider eating this so we can go on, or constructing the socket
 120  
             // later
 121  
             throw ioe;
 122  
         }
 123  
     }
 124  
 
 125  
     /**
 126  
      * Creates the socket for this class.
 127  
      *
 128  
      * @param multicastAddressString
 129  
      * @param multicastPort
 130  
      * @throws IOException
 131  
      */
 132  
     private void createSocket( String multicastAddressString, int multicastPort )
 133  
         throws IOException
 134  
     {
 135  
         try
 136  
         {
 137  
             m_socket = new MulticastSocket( multicastPort );
 138  
             m_socket.joinGroup( InetAddress.getByName( multicastAddressString ) );
 139  
         }
 140  
         catch ( IOException e )
 141  
         {
 142  
             log.error( "Could not bind to multicast address [" + multicastAddressString + ":" + multicastPort + "]", e );
 143  
             throw e;
 144  
         }
 145  
     }
 146  
 
 147  
     /**
 148  
      * Highly unreliable. If it is processing one message while another comes in,
 149  
      * the second message is lost. This is for low concurrency peppering.
 150  
      *
 151  
      * @return the object message
 152  
      * @throws IOException
 153  
      */
 154  
     public Object waitForMessage()
 155  
         throws IOException
 156  
     {
 157  
         final DatagramPacket packet = new DatagramPacket( m_buffer, m_buffer.length );
 158  
 
 159  
         Object obj = null;
 160  
         try
 161  
         {
 162  
             m_socket.receive( packet );
 163  
 
 164  
             final ByteArrayInputStream byteStream = new ByteArrayInputStream( m_buffer, 0, packet.getLength() );
 165  
 
 166  
             final ObjectInputStream objectStream = new ObjectInputStream( byteStream );
 167  
 
 168  
             obj = objectStream.readObject();
 169  
 
 170  
         }
 171  
         catch ( Exception e )
 172  
         {
 173  
             log.error( "Error receving multicast packet", e );
 174  
         }
 175  
         return obj;
 176  
     }
 177  
 
 178  
     /** Main processing method for the LateralUDPReceiver object */
 179  
     public void run()
 180  
     {
 181  
         try
 182  
         {
 183  
             while ( !shutdown )
 184  
             {
 185  
                 Object obj = waitForMessage();
 186  
 
 187  
                 // not thread safe, but just for debugging
 188  
                 cnt++;
 189  
 
 190  
                 if ( log.isDebugEnabled() )
 191  
                 {
 192  
                     log.debug( getCnt() + " messages received." );
 193  
                 }
 194  
 
 195  
                 UDPDiscoveryMessage message = null;
 196  
 
 197  
                 try
 198  
                 {
 199  
                     message = (UDPDiscoveryMessage) obj;
 200  
                     // check for null
 201  
                     if ( message != null )
 202  
                     {
 203  
                         MessageHandler handler = new MessageHandler( message );
 204  
 
 205  
                         pooledExecutor.execute( handler );
 206  
 
 207  
                         if ( log.isDebugEnabled() )
 208  
                         {
 209  
                             log.debug( "Passed handler to executor." );
 210  
                         }
 211  
                     }
 212  
                     else
 213  
                     {
 214  
                         log.warn( "message is null" );
 215  
                     }
 216  
                 }
 217  
                 catch ( ClassCastException cce )
 218  
                 {
 219  
                     log.warn( "Received unknown message type " + cce.getMessage() );
 220  
                 }
 221  
             } // end while
 222  
         }
 223  
         catch ( Exception e )
 224  
         {
 225  
             log.error( "Unexpected exception in UDP receiver.", e );
 226  
             try
 227  
             {
 228  
                 Thread.sleep( 100 );
 229  
                 // TODO consider some failure count so we don't do this
 230  
                 // forever.
 231  
             }
 232  
             catch ( Exception e2 )
 233  
             {
 234  
                 log.error( "Problem sleeping", e2 );
 235  
             }
 236  
         }
 237  
         return;
 238  
     }
 239  
 
 240  
     /**
 241  
      * @param cnt
 242  
      *            The cnt to set.
 243  
      */
 244  
     public void setCnt( int cnt )
 245  
     {
 246  
         this.cnt = cnt;
 247  
     }
 248  
 
 249  
     /**
 250  
      * @return Returns the cnt.
 251  
      */
 252  
     public int getCnt()
 253  
     {
 254  
         return cnt;
 255  
     }
 256  
 
 257  
     /**
 258  
      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
 259  
      */
 260  
     public class MessageHandler
 261  
         implements Runnable
 262  
     {
 263  13
         private UDPDiscoveryMessage message = null;
 264  
 
 265  
         /**
 266  
          * @param message
 267  
          */
 268  
         public MessageHandler( UDPDiscoveryMessage message )
 269  13
         {
 270  13
             this.message = message;
 271  13
         }
 272  
 
 273  
         /*
 274  
          * (non-Javadoc)
 275  
          *
 276  
          * @see java.lang.Runnable#run()
 277  
          */
 278  
         public void run()
 279  
         {
 280  
             // consider comparing ports here instead.
 281  13
             if ( message.getRequesterId() == LateralCacheInfo.listenerId )
 282  
             {
 283  13
                 if ( log.isDebugEnabled() )
 284  
                 {
 285  0
                     log.debug( "from self" );
 286  0
                 }
 287  
             }
 288  
             else
 289  
             {
 290  0
                 if ( log.isDebugEnabled() )
 291  
                 {
 292  0
                     log.debug( "from another" );
 293  0
                     log.debug( "Message = " + message );
 294  
                 }
 295  
 
 296  
                 // if this is a request message, have the service handle it and
 297  
                 // return
 298  0
                 if ( message.getMessageType() == UDPDiscoveryMessage.REQUEST_BROADCAST )
 299  
                 {
 300  0
                     if ( log.isDebugEnabled() )
 301  
                     {
 302  0
                         log.debug( "Message is a Request Broadcase, will have the service handle it." );
 303  
                     }
 304  0
                     service.serviceRequestBroadcast();
 305  0
                     return;
 306  
                 }
 307  
 
 308  
                 try
 309  
                 {
 310  
                     // get a cache and add it to the no waits
 311  
                     // the add method should not add the same.
 312  
                     // we need the listener port from the original config.
 313  0
                     ITCPLateralCacheAttributes lca = null;
 314  0
                     if ( service.getTcpLateralCacheAttributes() != null )
 315  
                     {
 316  0
                         lca = (ITCPLateralCacheAttributes) service.getTcpLateralCacheAttributes().copy();
 317  0
                     }
 318  
                     else
 319  
                     {
 320  0
                         lca = new TCPLateralCacheAttributes();
 321  
                     }
 322  0
                     lca.setTransmissionType( LateralCacheAttributes.TCP );
 323  0
                     lca.setTcpServer( message.getHost() + ":" + message.getPort() );
 324  0
                     LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr );
 325  
 
 326  0
                     ArrayList regions = message.getCacheNames();
 327  0
                     if ( regions != null )
 328  
                     {
 329  
                         // for each region get the cache
 330  0
                         Iterator it = regions.iterator();
 331  0
                         while ( it.hasNext() )
 332  
                         {
 333  0
                             String cacheName = (String) it.next();
 334  
 
 335  
                             try
 336  
                             {
 337  0
                                 ICache ic = lcm.getCache( cacheName );
 338  
 
 339  0
                                 if ( log.isDebugEnabled() )
 340  
                                 {
 341  0
                                     log.debug( "Got cache, ic = " + ic );
 342  
                                 }
 343  
 
 344  
                                 // add this to the nowaits for this cachename
 345  0
                                 if ( ic != null )
 346  
                                 {
 347  0
                                     service.addNoWait( (LateralCacheNoWait) ic );
 348  0
                                     if ( log.isDebugEnabled() )
 349  
                                     {
 350  0
                                         log.debug( "Called addNoWait for cacheName " + cacheName );
 351  
                                     }
 352  
                                 }
 353  
                             }
 354  0
                             catch ( Exception e )
 355  
                             {
 356  0
                                 log.error( "Problem creating no wait", e );
 357  0
                             }
 358  0
                         }
 359  
                         // end while
 360  0
                     }
 361  
                     else
 362  
                     {
 363  0
                         log.warn( "No cache names found in message " + message );
 364  
                     }
 365  
                 }
 366  0
                 catch ( Exception e )
 367  
                 {
 368  0
                     log.error( "Problem getting lateral maanger", e );
 369  0
                 }
 370  
             }
 371  13
         }
 372  
     }
 373  
 
 374  
     /**
 375  
      * Allows us to set the daemon status on the executor threads
 376  
      *
 377  
      * @author aaronsm
 378  
      *
 379  
      */
 380  
     class MyThreadFactory
 381  
         implements ThreadFactory
 382  
     {
 383  
         /*
 384  
          * (non-Javadoc)
 385  
          *
 386  
          * @see EDU.oswego.cs.dl.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
 387  
          */
 388  
         public Thread newThread( Runnable runner )
 389  
         {
 390  
             Thread t = new Thread( runner );
 391  
             t.setDaemon( true );
 392  
             t.setPriority( Thread.MIN_PRIORITY );
 393  
             return t;
 394  
         }
 395  
     }
 396  
 
 397  
     /*
 398  
      * (non-Javadoc)
 399  
      *
 400  
      * @see org.apache.jcs.engine.behavior.ShutdownObserver#shutdown()
 401  
      */
 402  
     public void shutdown()
 403  
     {
 404  
         try
 405  
         {
 406  
             shutdown = true;
 407  
             m_socket.close();
 408  
             pooledExecutor.shutdownNow();
 409  
         }
 410  
         catch ( Exception e )
 411  
         {
 412  
             log.error( "Problem closing socket" );
 413  
         }
 414  
     }
 415  
 }
 416  
 // end class

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.