Coverage report

  %line %branch
org.apache.jcs.auxiliary.lateral.socket.tcp.LateralTCPSender
42% 
77% 

 1  
 package org.apache.jcs.auxiliary.lateral.socket.tcp;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.io.BufferedReader;
 23  
 import java.io.IOException;
 24  
 import java.io.InputStreamReader;
 25  
 import java.io.ObjectInputStream;
 26  
 import java.io.ObjectOutputStream;
 27  
 import java.net.InetAddress;
 28  
 import java.net.Socket;
 29  
 
 30  
 import org.apache.commons.logging.Log;
 31  
 import org.apache.commons.logging.LogFactory;
 32  
 import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
 33  
 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
 34  
 import org.apache.jcs.auxiliary.lateral.socket.tcp.utils.SocketOpener;
 35  
 import org.apache.jcs.engine.CacheElement;
 36  
 import org.apache.jcs.engine.behavior.ICacheElement;
 37  
 
 38  
 /**
 39  
  * This class is based on the log4j SocketAppender class. I'm using a differnet repair structure, so
 40  
  * it is significantly different.
 41  
  */
 42  
 public class LateralTCPSender
 43  
 {
 44  28
     private final static Log log = LogFactory.getLog( LateralTCPSender.class );
 45  
 
 46  
     private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
 47  
 
 48  
     private String remoteHost;
 49  
 
 50  
     private InetAddress address;
 51  
 
 52  34
     int port = 1111;
 53  
 
 54  
     private ObjectOutputStream oos;
 55  
 
 56  
     private Socket socket;
 57  
 
 58  34
     int counter = 0;
 59  
 
 60  34
     private int sendCnt = 0;
 61  
 
 62  
     // reset the ObjectOutputStream every 70 calls
 63  
     // private static final int RESET_FREQUENCY = 70;
 64  
     // Perhaps we need to resett every time until we move to jdk 1.4
 65  
     // then we can call writeUnshared to make sure
 66  
     // that the object definetely gets across and not
 67  
     // a stream cached version.
 68  
     // I can't replicate an issue that was reported, so I'm not changing the
 69  
     // reset frequency for now.
 70  
     private final static int RESET_FREQUENCY = 70;
 71  
 
 72  
     /**
 73  
      * Only block for 1 second before timing out on a read. TODO: make configurable. The default 1
 74  
      * is way too long.
 75  
      */
 76  
     private final static int timeOut = 1000;
 77  
 
 78  
     /** Only block for 5 seconds before timing out on startup. */
 79  
     private final static int openTimeOut = 5000;
 80  
 
 81  
     /** Use to synchronize multiple threads that may be trying to get. */
 82  34
     private Object getLock = new int[0];
 83  
 
 84  
     /**
 85  
      * Constructor for the LateralTCPSender object.
 86  
      * <p>
 87  
      * @param lca
 88  
      * @exception IOException
 89  
      */
 90  
     public LateralTCPSender( ITCPLateralCacheAttributes lca )
 91  
         throws IOException
 92  34
     {
 93  34
         this.setTcpLateralCacheAttributes( lca );
 94  
 
 95  34
         String p1 = lca.getTcpServer();
 96  34
         if ( p1 != null )
 97  
         {
 98  34
             String h2 = p1.substring( 0, p1.indexOf( ":" ) );
 99  34
             int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
 100  34
             if ( log.isDebugEnabled() )
 101  
             {
 102  0
                 log.debug( "h2 = " + h2 );
 103  0
                 log.debug( "po = " + po );
 104  
             }
 105  
 
 106  34
             if ( h2 == null )
 107  
             {
 108  0
                 throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
 109  
             }
 110  
 
 111  34
             init( h2, po );
 112  
         }
 113  20
     }
 114  
 
 115  
     /**
 116  
      * Creates a connection to a TCP server.
 117  
      * <p>
 118  
      * @param host
 119  
      * @param port
 120  
      * @throws IOException
 121  
      */
 122  
     protected void init( String host, int port )
 123  
         throws IOException
 124  
     {
 125  34
         this.port = port;
 126  34
         this.address = getAddressByName( host );
 127  34
         this.setRemoteHost( host );
 128  
 
 129  
         try
 130  
         {
 131  34
             if ( log.isInfoEnabled() )
 132  
             {
 133  34
                 log.info( "Attempting connection to [" + address.getHostName() + "]" );
 134  
             }
 135  
 
 136  
             // have time out socket open do this for us
 137  34
             socket = SocketOpener.openSocket( host, port, openTimeOut );
 138  
 
 139  34
             if ( socket == null )
 140  
             {
 141  14
                 throw new IOException( "Socket is null, cannot connect to " + host + ":" + port );
 142  
             }
 143  
 
 144  20
             socket.setSoTimeout( LateralTCPSender.timeOut );
 145  20
             synchronized ( this )
 146  
             {
 147  20
                 oos = new ObjectOutputStream( socket.getOutputStream() );
 148  20
             }
 149  
         }
 150  0
         catch ( java.net.ConnectException e )
 151  
         {
 152  0
             log.debug( "Remote host [" + address.getHostName() + "] refused connection." );
 153  0
             throw e;
 154  
         }
 155  14
         catch ( IOException e )
 156  
         {
 157  14
             log.debug( "Could not connect to [" + address.getHostName() + "]. Exception is " + e );
 158  14
             throw e;
 159  20
         }
 160  20
     }
 161  
 
 162  
     /**
 163  
      * Gets the addressByName attribute of the LateralTCPSender object.
 164  
      * <p>
 165  
      * @param host
 166  
      * @return The addressByName value
 167  
      * @throws IOException
 168  
      */
 169  
     private InetAddress getAddressByName( String host )
 170  
         throws IOException
 171  
     {
 172  
         try
 173  
         {
 174  34
             return InetAddress.getByName( host );
 175  
         }
 176  0
         catch ( Exception e )
 177  
         {
 178  0
             log.error( "Could not find address of [" + host + "] ", e );
 179  0
             throw new IOException( "Could not find address of [" + host + "] " + e.getMessage() );
 180  
         }
 181  
     }
 182  
 
 183  
     /**
 184  
      * Sends commands to the lateral cache listener.
 185  
      * <p>
 186  
      * @param led
 187  
      * @throws IOException
 188  
      */
 189  
     public void send( LateralElementDescriptor led )
 190  
         throws IOException
 191  
     {
 192  1419
         sendCnt++;
 193  1419
         if ( log.isInfoEnabled() )
 194  
         {
 195  1419
             if ( sendCnt % 100 == 0 )
 196  
             {
 197  14
                 log.info( "Send Count (port " + port + ") = " + sendCnt );
 198  
             }
 199  
         }
 200  
 
 201  1419
         if ( log.isDebugEnabled() )
 202  
         {
 203  0
             log.debug( "sending LateralElementDescriptor" );
 204  
         }
 205  
 
 206  1419
         if ( led == null )
 207  
         {
 208  0
             return;
 209  
         }
 210  
 
 211  1419
         if ( address == null )
 212  
         {
 213  0
             throw new IOException( "No remote host is set for LateralTCPSender." );
 214  
         }
 215  
 
 216  1419
         if ( oos != null )
 217  
         {
 218  1419
             synchronized ( this.getLock )
 219  
             {
 220  
                 try
 221  
                 {
 222  1419
                     oos.writeObject( led );
 223  1419
                     oos.flush();
 224  1419
                     if ( ++counter >= RESET_FREQUENCY )
 225  
                     {
 226  14
                         counter = 0;
 227  
                         // Failing to reset the object output stream every now and
 228  
                         // then creates a serious memory leak.
 229  14
                         if ( log.isDebugEnabled() )
 230  
                         {
 231  0
                             log.debug( "Doing oos.reset()" );
 232  
                         }
 233  14
                         oos.reset();
 234  
                     }
 235  
                 }
 236  0
                 catch ( IOException e )
 237  
                 {
 238  0
                     oos = null;
 239  0
                     log.error( "Detected problem with connection: " + e );
 240  0
                     throw e;
 241  1419
                 }
 242  1419
             }
 243  
         }
 244  1419
     }
 245  
 
 246  
     /**
 247  
      * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
 248  
      * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
 249  
      * form of get working. However, get is not recommended for performance reasons. If you have 10
 250  
      * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
 251  
      * <p>
 252  
      * @param led
 253  
      * @return
 254  
      * @throws IOException
 255  
      */
 256  
     public ICacheElement sendAndReceive( LateralElementDescriptor led )
 257  
         throws IOException
 258  
     {
 259  0
         ICacheElement ice = null;
 260  
 
 261  0
         if ( led == null )
 262  
         {
 263  0
             return null;
 264  
         }
 265  
 
 266  0
         if ( address == null )
 267  
         {
 268  0
             throw new IOException( "No remote host is set for LateralTCPSender." );
 269  
         }
 270  
 
 271  0
         if ( oos != null )
 272  
         {
 273  
             // Synchronized to insure that the get requests to server from this
 274  
             // sender and the responses are processed in order, else you could
 275  
             // return the wrong item from the cache.
 276  
             // This is a big block of code. May need to rethink this strategy.
 277  
             // This may not be necessary.
 278  
             // Normal puts, etc to laterals do not have to be synchronized.
 279  0
             synchronized ( this.getLock )
 280  
             {
 281  
                 try
 282  
                 {
 283  
                     try
 284  
                     {
 285  
                         // clean up input stream, nothing should be there yet.
 286  0
                         if ( socket.getInputStream().available() > 0 )
 287  
                         {
 288  0
                             socket.getInputStream().read( new byte[socket.getInputStream().available()] );
 289  
                         }
 290  
                     }
 291  0
                     catch ( IOException ioe )
 292  
                     {
 293  0
                         log.error( "Problem cleaning socket before send " + socket, ioe );
 294  0
                         throw ioe;
 295  0
                     }
 296  
 
 297  
                     // write object to listener
 298  0
                     oos.writeObject( led );
 299  0
                     oos.flush();
 300  
 
 301  
                     try
 302  
                     {
 303  
                         // TODO make configurable
 304  
                         // socket.setSoTimeout( 2000 );
 305  0
                         ObjectInputStream ois = new ObjectInputStream( socket.getInputStream() );
 306  0
                         Object obj = ois.readObject();
 307  0
                         ice = (ICacheElement) obj;
 308  0
                         if ( ice == null )
 309  
                         {
 310  
                             // p( "ice is null" );
 311  
                             // TODO: count misses
 312  
                         }
 313  
                     }
 314  0
                     catch ( IOException ioe )
 315  
                     {
 316  0
                         String message = "Could not open ObjectInputStream to " + socket;
 317  0
                         if ( socket != null )
 318  
                         {
 319  0
                             message += " SoTimeout [" + socket.getSoTimeout() + "]";
 320  
                             // this is 1.4 specific -- Connected [" + socket.isConnected() + "]";
 321  
                         }
 322  0
                         log.error( message, ioe );
 323  0
                         throw ioe;
 324  
                     }
 325  0
                     catch ( Exception e )
 326  
                     {
 327  0
                         log.error( e );
 328  0
                     }
 329  
 
 330  0
                     if ( ++counter >= RESET_FREQUENCY )
 331  
                     {
 332  0
                         counter = 0;
 333  
                         // Failing to reset the object output stream every now
 334  
                         // and
 335  
                         // then creates a serious memory leak.
 336  0
                         log.info( "Doing oos.reset()" );
 337  0
                         oos.reset();
 338  
                     }
 339  
                 }
 340  0
                 catch ( IOException e )
 341  
                 {
 342  0
                     oos = null;
 343  0
                     log.error( "Detected problem with connection: " + e );
 344  0
                     throw e;
 345  0
                 }
 346  0
             }
 347  
         }
 348  
 
 349  0
         return ice;
 350  
     }
 351  
 
 352  
     /**
 353  
      * Closes connection used by all LateralTCPSenders for this lateral conneciton. Dispose request
 354  
      * should come into the facade and be sent to all lateral cache sevices. The lateral cache
 355  
      * service will then call this method.
 356  
      * <p>
 357  
      * @param cache
 358  
      * @throws IOException
 359  
      */
 360  
     public void dispose( String cache )
 361  
         throws IOException
 362  
     {
 363  0
         if ( log.isInfoEnabled() )
 364  
         {
 365  0
             log.info( "Dispose called for cache [" + cache + "]" );
 366  
         }
 367  
         // WILL CLOSE CONNECTION USED BY ALL
 368  0
         oos.close();
 369  0
     }
 370  
 
 371  
     /**
 372  
      * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
 373  
      */
 374  
     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
 375  
     {
 376  34
         this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
 377  34
     }
 378  
 
 379  
     /**
 380  
      * @return Returns the tcpLateralCacheAttributes.
 381  
      */
 382  
     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
 383  
     {
 384  0
         return tcpLateralCacheAttributes;
 385  
     }
 386  
 
 387  
     /**
 388  
      * @param remoteHost The remoteHost to set.
 389  
      */
 390  
     public void setRemoteHost( String remoteHost )
 391  
     {
 392  34
         this.remoteHost = remoteHost;
 393  34
     }
 394  
 
 395  
     /**
 396  
      * @return Returns the remoteHost.
 397  
      */
 398  
     public String getRemoteHost()
 399  
     {
 400  0
         return remoteHost;
 401  
     }
 402  
 
 403  
     /**
 404  
      * This is a Testing Method. It should be moved to a unit test.
 405  
      * @param args
 406  
      */
 407  
     public static void main( String args[] )
 408  
     {
 409  
         try
 410  
         {
 411  0
             LateralTCPSender lur = null;
 412  
             // new LateralTCPSender( "localhost", 1111 );
 413  
 
 414  
             // process user input till done
 415  0
             boolean notDone = true;
 416  0
             String message = null;
 417  
             // wait to dispose
 418  0
             BufferedReader br = new BufferedReader( class="keyword">new InputStreamReader( System.in ) );
 419  
 
 420  0
             while ( notDone )
 421  
             {
 422  0
                 System.out.println( "enter mesage:" );
 423  0
                 message = br.readLine();
 424  0
                 CacheElement ce = new CacheElement( "test", "test", message );
 425  0
                 LateralElementDescriptor led = new LateralElementDescriptor( ce );
 426  0
                 lur.send( led );
 427  0
             }
 428  
         }
 429  0
         catch ( Exception e )
 430  
         {
 431  0
             System.out.println( e.toString() );
 432  0
         }
 433  0
     }
 434  
 }

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.