1 package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.net.InetAddress;
23 import java.net.UnknownHostException;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.Map;
28 import java.util.Set;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
33 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
34 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
35 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
36 import org.apache.jcs.engine.behavior.IShutdownObservable;
37 import org.apache.jcs.engine.behavior.IShutdownObserver;
38
39 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
40 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
41
42 /***
43 *
44 * This service creates a listener that can create lateral caches and add them
45 * to the no wait list.
46 * <p>
47 * It also creates a sender that periodically broadcasts its availability.
48 * <p>
49 * The sender also broadcasts a request for other caches to broadcast their
50 * addresses.
51 *
52 * @author Aaron Smuts
53 *
54 */
55 public class UDPDiscoveryService
56 implements IShutdownObserver
57 {
58 private final static Log log = LogFactory.getLog( UDPDiscoveryService.class );
59
60
61 private static ClockDaemon senderDaemon;
62
63
64 private Thread udpReceiverThread;
65
66
67 private UDPDiscoveryReceiver receiver;
68
69 private Map facades = new HashMap();
70
71
72 private UDPDiscoverySenderThread sender = null;
73
74 private String hostAddress = "unknown";
75
76 private String discoveryAddress;
77
78 private int discoveryPort;
79
80
81
82 private int servicePort;
83
84 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
85
86 /***
87 *
88 * @param discoveryAddress
89 * address to multicast to
90 * @param discoveryPort
91 * port to multicast to
92 * @param servicePort
93 * the port this service runs on, the service we are telling
94 * other about
95 * @param cacheMgr
96 */
97 public UDPDiscoveryService( String discoveryAddress, int discoveryPort, int servicePort,
98 ICompositeCacheManager cacheMgr )
99 {
100
101 ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( this );
102
103 this.setDiscoveryAddress( discoveryAddress );
104 this.setDiscoveryPort( discoveryPort );
105 this.setServicePort( servicePort );
106
107 try
108 {
109
110 hostAddress = InetAddress.getLocalHost().getHostAddress();
111 if ( log.isDebugEnabled() )
112 {
113 log.debug( "hostAddress = [" + hostAddress + "]" );
114 }
115 }
116 catch ( UnknownHostException e1 )
117 {
118 log.error( "Couldn't get localhost address", e1 );
119 }
120
121 try
122 {
123
124 receiver = new UDPDiscoveryReceiver( this, getDiscoveryAddress(), getDiscoveryPort(), cacheMgr );
125 udpReceiverThread = new Thread( receiver );
126 udpReceiverThread.setDaemon( true );
127
128 udpReceiverThread.start();
129 }
130 catch ( Exception e )
131 {
132 log.error( "Problem creating UDPDiscoveryReceiver, address [" + getDiscoveryAddress() + "] port [" + getDiscoveryPort()
133 + "] we won't be able to find any other caches", e );
134 }
135
136
137
138 if ( senderDaemon == null )
139 {
140 senderDaemon = new ClockDaemon();
141 senderDaemon.setThreadFactory( new MyThreadFactory() );
142 }
143
144
145 sender = new UDPDiscoverySenderThread( getDiscoveryAddress(), getDiscoveryPort(), hostAddress, this
146 .getServicePort(), this.getCacheNames() );
147
148 senderDaemon.executePeriodically( 30 * 1000, sender, false );
149 }
150
151 /***
152 * Adds a nowait facade under this cachename. If one already existed, it
153 * will be overridden.
154 * <p>
155 * When a broadcast is received from the UDP Discovery receiver, for each
156 * cacheName in the message, the add no wait will be called here. To add a
157 * no wait, the facade is looked up for this cache name.
158 *
159 * @param facade
160 * @param cacheName
161 * @return true if the facade was not already registered.
162 */
163 public synchronized boolean addNoWaitFacade( LateralCacheNoWaitFacade facade, String cacheName )
164 {
165 boolean isNew = !facades.containsKey( cacheName );
166
167
168 facades.put( cacheName, facade );
169
170 if ( isNew )
171 {
172 if ( sender != null )
173 {
174
175 sender.setCacheNames( this.getCacheNames() );
176 }
177 }
178
179 return isNew;
180 }
181
182 /***
183 * This adds nowaits to a facde for the region name. If the region has no
184 * facade, then it is not configured to use the lateral cache, and no facde
185 * will be created.
186 *
187 * @param noWait
188 */
189 protected void addNoWait( LateralCacheNoWait noWait )
190 {
191 LateralCacheNoWaitFacade facade = (LateralCacheNoWaitFacade) facades.get( noWait.getCacheName() );
192 if ( log.isDebugEnabled() )
193 {
194 log.debug( "Got facade for " + noWait.getCacheName() + " = " + facade );
195 }
196
197 if ( facade != null )
198 {
199 boolean isNew = facade.addNoWait( noWait );
200 if ( log.isDebugEnabled() )
201 {
202 log.debug( "Called addNoWait, isNew = " + isNew );
203 }
204 }
205 else
206 {
207 if ( log.isInfoEnabled() )
208 {
209 log.info( "Different nodes are configured differently. Region [" + noWait.getCacheName()
210 + "] is not configured to use the lateral cache." );
211 }
212 }
213 }
214
215 /***
216 * Send a passive broadcast in response to a request broadcast. Never send a
217 * request for a request. We can respond to our own reques, since a request
218 * broadcast is not intended as a connection request. We might want to only
219 * send messages, so we would send a request, but never a passive broadcast.
220 *
221 */
222 protected void serviceRequestBroadcast()
223 {
224 UDPDiscoverySender sender = null;
225 try
226 {
227
228
229 sender = new UDPDiscoverySender( getDiscoveryAddress(), getDiscoveryPort() );
230
231 sender.passiveBroadcast( hostAddress, this.getServicePort(), this.getCacheNames() );
232
233
234
235
236 if ( log.isDebugEnabled() )
237 {
238 log.debug( "Called sender to issue a passive broadcast" );
239 }
240 }
241 catch ( Exception e )
242 {
243 log.error( "Problem calling the UDP Discovery Sender. address [" + getDiscoveryAddress() + "] port ["
244 + getDiscoveryPort() + "]", e );
245 }
246 finally
247 {
248 try
249 {
250 if ( sender != null )
251 {
252 sender.destroy();
253 }
254 }
255 catch ( Exception e )
256 {
257 log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e );
258 }
259 }
260 }
261
262 /***
263 * Get all the cache names we have facades for.
264 *
265 * @return
266 */
267 protected ArrayList getCacheNames()
268 {
269 ArrayList keys = new ArrayList();
270 Set keySet = facades.keySet();
271 Iterator it = keySet.iterator();
272 while ( it.hasNext() )
273 {
274 String key = (String) it.next();
275 keys.add( key );
276 }
277 return keys;
278 }
279
280 /***
281 * Allows us to set the daemon status on the clockdaemon
282 *
283 * @author aaronsm
284 *
285 */
286 class MyThreadFactory
287 implements ThreadFactory
288 {
289
290
291
292
293
294 public Thread newThread( Runnable runner )
295 {
296 Thread t = new Thread( runner );
297 t.setDaemon( true );
298 t.setPriority( Thread.MIN_PRIORITY );
299 return t;
300 }
301 }
302
303
304
305
306
307
308 public void shutdown()
309 {
310 if ( log.isInfoEnabled() )
311 {
312 log.info( "Shutting down UDP discovery service receiver." );
313 }
314
315 try
316 {
317
318 receiver.shutdown();
319 udpReceiverThread.interrupt();
320 }
321 catch ( Exception e )
322 {
323 log.error( "Problem interrupting UDP receiver thread." );
324 }
325
326 if ( log.isInfoEnabled() )
327 {
328 log.info( "Shutting down UDP discovery service sender." );
329 }
330
331 try
332 {
333
334 senderDaemon.shutDown();
335 }
336 catch ( Exception e )
337 {
338 log.error( "Problem shutting down UDP sender." );
339 }
340 }
341
342 /***
343 * @param discoveryAddress
344 * The discoveryAddress to set.
345 */
346 protected void setDiscoveryAddress( String discoveryAddress )
347 {
348 this.discoveryAddress = discoveryAddress;
349 }
350
351 /***
352 * @return Returns the discoveryAddress.
353 */
354 protected String getDiscoveryAddress()
355 {
356 return discoveryAddress;
357 }
358
359 /***
360 * @param discoveryPort
361 * The discoveryPort to set.
362 */
363 protected void setDiscoveryPort( int discoveryPort )
364 {
365 this.discoveryPort = discoveryPort;
366 }
367
368 /***
369 * @return Returns the discoveryPort.
370 */
371 protected int getDiscoveryPort()
372 {
373 return discoveryPort;
374 }
375
376 /***
377 * @param servicePort
378 * The servicePort to set.
379 */
380 protected void setServicePort( int servicePort )
381 {
382 this.servicePort = servicePort;
383 }
384
385 /***
386 * @return Returns the servicePort.
387 */
388 protected int getServicePort()
389 {
390 return servicePort;
391 }
392
393 /***
394 * @param tCPLateralCacheAttributes
395 * The tCPLateralCacheAttributes to set.
396 */
397 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tCPLateralCacheAttributes )
398 {
399 tcpLateralCacheAttributes = tCPLateralCacheAttributes;
400 }
401
402 /***
403 * @return Returns the tCPLateralCacheAttributes.
404 */
405 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
406 {
407 return tcpLateralCacheAttributes;
408 }
409 }