View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.net.InetAddress;
23  import java.net.UnknownHostException;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.Map;
28  import java.util.Set;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
33  import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
34  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
35  import org.apache.jcs.engine.behavior.ICompositeCacheManager;
36  import org.apache.jcs.engine.behavior.IShutdownObservable;
37  import org.apache.jcs.engine.behavior.IShutdownObserver;
38  
39  import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
40  import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
41  
42  /***
43   *
44   * This service creates a listener that can create lateral caches and add them
45   * to the no wait list.
46   * <p>
47   * It also creates a sender that periodically broadcasts its availability.
48   * <p>
49   * The sender also broadcasts a request for other caches to broadcast their
50   * addresses.
51   *
52   * @author Aaron Smuts
53   *
54   */
55  public class UDPDiscoveryService
56      implements IShutdownObserver
57  {
58      private final static Log log = LogFactory.getLog( UDPDiscoveryService.class );
59  
60      // The background broadcaster.
61      private static ClockDaemon senderDaemon;
62  
63      // thread that listens for messages
64      private Thread udpReceiverThread;
65  
66      // the runanble that the receiver thread runs
67      private UDPDiscoveryReceiver receiver;
68  
69      private Map facades = new HashMap();
70  
71      // the runanble that sends messages via the clock daemon
72      private UDPDiscoverySenderThread sender = null;
73  
74      private String hostAddress = "unknown";
75  
76      private String discoveryAddress;
77  
78      private int discoveryPort;
79  
80      // the port this service runs on, the service we are telling other about
81      // we should have a service info object instead
82      private int servicePort;
83  
84      private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
85  
86      /***
87       *
88       * @param discoveryAddress
89       *            address to multicast to
90       * @param discoveryPort
91       *            port to multicast to
92       * @param servicePort
93       *            the port this service runs on, the service we are telling
94       *            other about
95       * @param cacheMgr
96       */
97      public UDPDiscoveryService( String discoveryAddress, int discoveryPort, int servicePort,
98                                 ICompositeCacheManager cacheMgr )
99      {
100         // register for shutdown notification
101         ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( this );
102 
103         this.setDiscoveryAddress( discoveryAddress );
104         this.setDiscoveryPort( discoveryPort );
105         this.setServicePort( servicePort );
106 
107         try
108         {
109             // todo, you should be able to set this
110             hostAddress = InetAddress.getLocalHost().getHostAddress();
111             if ( log.isDebugEnabled() )
112             {
113                 log.debug( "hostAddress = [" + hostAddress + "]" );
114             }
115         }
116         catch ( UnknownHostException e1 )
117         {
118             log.error( "Couldn't get localhost address", e1 );
119         }
120 
121         try
122         {
123             // todo need some kind of recovery here.
124             receiver = new UDPDiscoveryReceiver( this, getDiscoveryAddress(), getDiscoveryPort(), cacheMgr );
125             udpReceiverThread = new Thread( receiver );
126             udpReceiverThread.setDaemon( true );
127             // udpReceiverThread.setName( t.getName() + "--UDPReceiver" );
128             udpReceiverThread.start();
129         }
130         catch ( Exception e )
131         {
132             log.error( "Problem creating UDPDiscoveryReceiver, address [" + getDiscoveryAddress() + "] port [" + getDiscoveryPort()
133                 + "] we won't be able to find any other caches", e );
134         }
135 
136         // todo only do the passive if receive is inenabled, perhaps set the
137         // myhost to null or something on the request
138         if ( senderDaemon == null )
139         {
140             senderDaemon = new ClockDaemon();
141             senderDaemon.setThreadFactory( new MyThreadFactory() );
142         }
143 
144         // create a sender thread
145         sender = new UDPDiscoverySenderThread( getDiscoveryAddress(), getDiscoveryPort(), hostAddress, this
146             .getServicePort(), this.getCacheNames() );
147 
148         senderDaemon.executePeriodically( 30 * 1000, sender, false );
149     }
150 
151     /***
152      * Adds a nowait facade under this cachename. If one already existed, it
153      * will be overridden.
154      * <p>
155      * When a broadcast is received from the UDP Discovery receiver, for each
156      * cacheName in the message, the add no wait will be called here. To add a
157      * no wait, the facade is looked up for this cache name.
158      *
159      * @param facade
160      * @param cacheName
161      * @return true if the facade was not already registered.
162      */
163     public synchronized boolean addNoWaitFacade( LateralCacheNoWaitFacade facade, String cacheName )
164     {
165         boolean isNew = !facades.containsKey( cacheName );
166 
167         // override or put anew, it doesn't matter
168         facades.put( cacheName, facade );
169 
170         if ( isNew )
171         {
172             if ( sender != null )
173             {
174                 // need to reset the cache names since we have a new one
175                 sender.setCacheNames( this.getCacheNames() );
176             }
177         }
178 
179         return isNew;
180     }
181 
182     /***
183      * This adds nowaits to a facde for the region name. If the region has no
184      * facade, then it is not configured to use the lateral cache, and no facde
185      * will be created.
186      *
187      * @param noWait
188      */
189     protected void addNoWait( LateralCacheNoWait noWait )
190     {
191         LateralCacheNoWaitFacade facade = (LateralCacheNoWaitFacade) facades.get( noWait.getCacheName() );
192         if ( log.isDebugEnabled() )
193         {
194             log.debug( "Got facade for " + noWait.getCacheName() + " = " + facade );
195         }
196 
197         if ( facade != null )
198         {
199             boolean isNew = facade.addNoWait( noWait );
200             if ( log.isDebugEnabled() )
201             {
202                 log.debug( "Called addNoWait, isNew = " + isNew );
203             }
204         }
205         else
206         {
207             if ( log.isInfoEnabled() )
208             {
209                 log.info( "Different nodes are configured differently.  Region [" + noWait.getCacheName()
210                     + "] is not configured to use the lateral cache." );
211             }
212         }
213     }
214 
215     /***
216      * Send a passive broadcast in response to a request broadcast. Never send a
217      * request for a request. We can respond to our own reques, since a request
218      * broadcast is not intended as a connection request. We might want to only
219      * send messages, so we would send a request, but never a passive broadcast.
220      *
221      */
222     protected void serviceRequestBroadcast()
223     {
224         UDPDiscoverySender sender = null;
225         try
226         {
227             // create this connection each time.
228             // more robust
229             sender = new UDPDiscoverySender( getDiscoveryAddress(), getDiscoveryPort() );
230 
231             sender.passiveBroadcast( hostAddress, this.getServicePort(), this.getCacheNames() );
232 
233             // todo we should consider sending a request broadcast every so
234             // often.
235 
236             if ( log.isDebugEnabled() )
237             {
238                 log.debug( "Called sender to issue a passive broadcast" );
239             }
240         }
241         catch ( Exception e )
242         {
243             log.error( "Problem calling the UDP Discovery Sender. address [" + getDiscoveryAddress() + "] port ["
244                 + getDiscoveryPort() + "]", e );
245         }
246         finally
247         {
248             try
249             {
250                 if ( sender != null )
251                 {
252                     sender.destroy();
253                 }
254             }
255             catch ( Exception e )
256             {
257                 log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e );
258             }
259         }
260     }
261 
262     /***
263      * Get all the cache names we have facades for.
264      *
265      * @return
266      */
267     protected ArrayList getCacheNames()
268     {
269         ArrayList keys = new ArrayList();
270         Set keySet = facades.keySet();
271         Iterator it = keySet.iterator();
272         while ( it.hasNext() )
273         {
274             String key = (String) it.next();
275             keys.add( key );
276         }
277         return keys;
278     }
279 
280     /***
281      * Allows us to set the daemon status on the clockdaemon
282      *
283      * @author aaronsm
284      *
285      */
286     class MyThreadFactory
287         implements ThreadFactory
288     {
289         /*
290          * (non-Javadoc)
291          *
292          * @see EDU.oswego.cs.dl.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
293          */
294         public Thread newThread( Runnable runner )
295         {
296             Thread t = new Thread( runner );
297             t.setDaemon( true );
298             t.setPriority( Thread.MIN_PRIORITY );
299             return t;
300         }
301     }
302 
303     /*
304      * (non-Javadoc)
305      *
306      * @see org.apache.jcs.engine.behavior.ShutdownObserver#shutdown()
307      */
308     public void shutdown()
309     {
310         if ( log.isInfoEnabled() )
311         {
312             log.info( "Shutting down UDP discovery service receiver." );
313         }
314 
315         try
316         {
317             // no good way to do this right now.
318             receiver.shutdown();
319             udpReceiverThread.interrupt();
320         }
321         catch ( Exception e )
322         {
323             log.error( "Problem interrupting UDP receiver thread." );
324         }
325 
326         if ( log.isInfoEnabled() )
327         {
328             log.info( "Shutting down UDP discovery service sender." );
329         }
330 
331         try
332         {
333             // interrupt all the threads.
334             senderDaemon.shutDown();
335         }
336         catch ( Exception e )
337         {
338             log.error( "Problem shutting down UDP sender." );
339         }
340     }
341 
342     /***
343      * @param discoveryAddress
344      *            The discoveryAddress to set.
345      */
346     protected void setDiscoveryAddress( String discoveryAddress )
347     {
348         this.discoveryAddress = discoveryAddress;
349     }
350 
351     /***
352      * @return Returns the discoveryAddress.
353      */
354     protected String getDiscoveryAddress()
355     {
356         return discoveryAddress;
357     }
358 
359     /***
360      * @param discoveryPort
361      *            The discoveryPort to set.
362      */
363     protected void setDiscoveryPort( int discoveryPort )
364     {
365         this.discoveryPort = discoveryPort;
366     }
367 
368     /***
369      * @return Returns the discoveryPort.
370      */
371     protected int getDiscoveryPort()
372     {
373         return discoveryPort;
374     }
375 
376     /***
377      * @param servicePort
378      *            The servicePort to set.
379      */
380     protected void setServicePort( int servicePort )
381     {
382         this.servicePort = servicePort;
383     }
384 
385     /***
386      * @return Returns the servicePort.
387      */
388     protected int getServicePort()
389     {
390         return servicePort;
391     }
392 
393     /***
394      * @param tCPLateralCacheAttributes
395      *            The tCPLateralCacheAttributes to set.
396      */
397     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tCPLateralCacheAttributes )
398     {
399         tcpLateralCacheAttributes = tCPLateralCacheAttributes;
400     }
401 
402     /***
403      * @return Returns the tCPLateralCacheAttributes.
404      */
405     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
406     {
407         return tcpLateralCacheAttributes;
408     }
409 }