%line | %branch | |||||||||
---|---|---|---|---|---|---|---|---|---|---|
org.apache.jcs.auxiliary.lateral.socket.tcp.discovery.UDPDiscoveryService |
|
|
1 | package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery; |
|
2 | ||
3 | /* |
|
4 | * Licensed to the Apache Software Foundation (ASF) under one |
|
5 | * or more contributor license agreements. See the NOTICE file |
|
6 | * distributed with this work for additional information |
|
7 | * regarding copyright ownership. The ASF licenses this file |
|
8 | * to you under the Apache License, Version 2.0 (the |
|
9 | * "License"); you may not use this file except in compliance |
|
10 | * with the License. You may obtain a copy of the License at |
|
11 | * |
|
12 | * http://www.apache.org/licenses/LICENSE-2.0 |
|
13 | * |
|
14 | * Unless required by applicable law or agreed to in writing, |
|
15 | * software distributed under the License is distributed on an |
|
16 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
|
17 | * KIND, either express or implied. See the License for the |
|
18 | * specific language governing permissions and limitations |
|
19 | * under the License. |
|
20 | */ |
|
21 | ||
22 | import java.net.InetAddress; |
|
23 | import java.net.UnknownHostException; |
|
24 | import java.util.ArrayList; |
|
25 | import java.util.HashMap; |
|
26 | import java.util.Iterator; |
|
27 | import java.util.Map; |
|
28 | import java.util.Set; |
|
29 | ||
30 | import org.apache.commons.logging.Log; |
|
31 | import org.apache.commons.logging.LogFactory; |
|
32 | import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait; |
|
33 | import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade; |
|
34 | import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; |
|
35 | import org.apache.jcs.engine.behavior.ICompositeCacheManager; |
|
36 | import org.apache.jcs.engine.behavior.IShutdownObservable; |
|
37 | import org.apache.jcs.engine.behavior.IShutdownObserver; |
|
38 | ||
39 | import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; |
|
40 | import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; |
|
41 | ||
42 | /** |
|
43 | * |
|
44 | * This service creates a listener that can create lateral caches and add them |
|
45 | * to the no wait list. |
|
46 | * <p> |
|
47 | * It also creates a sender that periodically broadcasts its availability. |
|
48 | * <p> |
|
49 | * The sender also broadcasts a request for other caches to broadcast their |
|
50 | * addresses. |
|
51 | * |
|
52 | * @author Aaron Smuts |
|
53 | * |
|
54 | */ |
|
55 | public class UDPDiscoveryService |
|
56 | implements IShutdownObserver |
|
57 | { |
|
58 | 26 | private final static Log log = LogFactory.getLog( UDPDiscoveryService.class ); |
59 | ||
60 | // The background broadcaster. |
|
61 | private static ClockDaemon senderDaemon; |
|
62 | ||
63 | // thread that listens for messages |
|
64 | private Thread udpReceiverThread; |
|
65 | ||
66 | // the runnable that the receiver thread runs |
|
67 | private UDPDiscoveryReceiver receiver; |
|
68 | ||
69 | 13 | private Map facades = new HashMap(); |
70 | ||
71 | // the runnable that sends messages via the clock daemon |
|
72 | 13 | private UDPDiscoverySenderThread sender = null; |
73 | ||
74 | 13 | private String hostAddress = "unknown"; |
75 | ||
76 | private String discoveryAddress; |
|
77 | ||
78 | private int discoveryPort; |
|
79 | ||
80 | // the port this service runs on, the service we are telling others about |
|
81 | // we should have a service info object instead |
|
82 | private int servicePort; |
|
83 | ||
84 | private ITCPLateralCacheAttributes tcpLateralCacheAttributes; |
|
85 | ||
86 | /** |
|
87 | * |
|
88 | * @param discoveryAddress |
|
89 | * address to multicast to |
|
90 | * @param discoveryPort |
|
91 | * port to multicast to |
|
92 | * @param servicePort |
|
93 | * the port this service runs on, the service we are telling |
|
94 | * others about |
|
95 | * @param cacheMgr |
|
96 | */ |
|
97 | public UDPDiscoveryService( String discoveryAddress, int discoveryPort, int servicePort, |
|
98 | ICompositeCacheManager cacheMgr ) |
|
99 | 13 | { |
100 | // register for shutdown notification |
|
101 | 13 | ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( this ); |
102 | ||
103 | 13 | this.setDiscoveryAddress( discoveryAddress ); |
104 | 13 | this.setDiscoveryPort( discoveryPort ); |
105 | 13 | this.setServicePort( servicePort ); |
106 | ||
107 | try |
|
108 | { |
|
109 | // todo, you should be able to set this |
|
110 | 13 | hostAddress = InetAddress.getLocalHost().getHostAddress(); |
111 | 13 | if ( log.isDebugEnabled() ) |
112 | { |
|
113 | 0 | log.debug( "hostAddress = [" + hostAddress + "]" ); |
114 | } |
|
115 | } |
|
116 | 0 | catch ( UnknownHostException e1 ) |
117 | { |
|
118 | 0 | log.error( "Couldn't get localhost address", e1 ); |
119 | 13 | } |
120 | ||
121 | try |
|
122 | { |
|
123 | // todo need some kind of recovery here. |
|
124 | 13 | receiver = new UDPDiscoveryReceiver( this, getDiscoveryAddress(), getDiscoveryPort(), cacheMgr ); |
125 | 13 | udpReceiverThread = new Thread( receiver ); |
126 | 13 | udpReceiverThread.setDaemon( true ); |
127 | // udpReceiverThread.setName( t.getName() + "--UDPReceiver" ); |
|
128 | 13 | udpReceiverThread.start(); |
129 | } |
|
130 | 0 | catch ( Exception e ) |
131 | { |
|
132 | 0 | log.error( "Problem creating UDPDiscoveryReceiver, address [" + getDiscoveryAddress() + "] port [" + getDiscoveryPort() |
133 | + "] we won't be able to find any other caches", e ); |
|
134 | 13 | } |
135 | ||
136 | // todo only do the passive if receive is inenabled, perhaps set the |
|
137 | // myhost to null or something on the request |
|
138 | 13 | if ( senderDaemon == null ) |
139 | { |
|
140 | 13 | senderDaemon = new ClockDaemon(); |
141 | 13 | senderDaemon.setThreadFactory( new MyThreadFactory() ); |
142 | } |
|
143 | ||
144 | // create a sender thread |
|
145 | 13 | sender = new UDPDiscoverySenderThread( getDiscoveryAddress(), getDiscoveryPort(), hostAddress, this |
146 | .getServicePort(), this.getCacheNames() ); |
|
147 | ||
148 | 13 | senderDaemon.executePeriodically( 30 * 1000, sender, false ); |
149 | 13 | } |
150 | ||
151 | /** |
|
152 | * Adds a nowait facade under this cachename. If one already existed, it |
|
153 | * will be overridden. |
|
154 | * <p> |
|
155 | * When a broadcast is received from the UDP Discovery receiver, for each |
|
156 | * cacheName in the message, the add no wait will be called here. To add a |
|
157 | * no wait, the facade is looked up for this cache name. |
|
158 | * |
|
159 | * @param facade |
|
160 | * @param cacheName |
|
161 | * @return true if the facade was not already registered. |
|
162 | */ |
|
163 | public synchronized boolean addNoWaitFacade( LateralCacheNoWaitFacade facade, String cacheName ) |
|
164 | { |
|
165 | 26 | boolean isNew = !facades.containsKey( cacheName ); |
166 | ||
167 | // override or put anew, it doesn't matter |
|
168 | 26 | facades.put( cacheName, facade ); |
169 | ||
170 | 26 | if ( isNew ) |
171 | { |
|
172 | 26 | if ( sender != null ) |
173 | { |
|
174 | // need to reset the cache names since we have a new one |
|
175 | 26 | sender.setCacheNames( this.getCacheNames() ); |
176 | } |
|
177 | } |
|
178 | ||
179 | 26 | return isNew; |
180 | } |
|
181 | ||
182 | /** |
|
183 | * This adds nowaits to a facade for the region name. If the region has no |
|
184 | * facade, then it is not configured to use the lateral cache, and no facade |
|
185 | * will be created. |
|
186 | * |
|
187 | * @param noWait |
|
188 | */ |
|
189 | protected void addNoWait( LateralCacheNoWait noWait ) |
|
190 | { |
|
191 | 0 | LateralCacheNoWaitFacade facade = (LateralCacheNoWaitFacade) facades.get( noWait.getCacheName() ); |
192 | 0 | if ( log.isDebugEnabled() ) |
193 | { |
|
194 | 0 | log.debug( "Got facade for " + noWait.getCacheName() + " = " + facade ); |
195 | } |
|
196 | ||
197 | 0 | if ( facade != null ) |
198 | { |
|
199 | 0 | boolean isNew = facade.addNoWait( noWait ); |
200 | 0 | if ( log.isDebugEnabled() ) |
201 | { |
|
202 | 0 | log.debug( "Called addNoWait, isNew = " + isNew ); |
203 | } |
|
204 | 0 | } |
205 | else |
|
206 | { |
|
207 | 0 | if ( log.isInfoEnabled() ) |
208 | { |
|
209 | 0 | log.info( "Different nodes are configured differently. Region [" + noWait.getCacheName() |
210 | + "] is not configured to use the lateral cache." ); |
|
211 | } |
|
212 | } |
|
213 | 0 | } |
214 | ||
215 | /** |
|
216 | * Send a passive broadcast in response to a request broadcast. Never send a |
|
217 | * request for a request. We can respond to our own request, since a request |
|
218 | * broadcast is not intended as a connection request. We might want to only |
|
219 | * send messages, so we would send a request, but never a passive broadcast. |
|
220 | * |
|
221 | */ |
|
222 | protected void serviceRequestBroadcast() |
|
223 | { |
|
224 | 0 | UDPDiscoverySender sender = null; |
225 | try |
|
226 | { |
|
227 | // create this connection each time. |
|
228 | // more robust |
|
229 | 0 | sender = new UDPDiscoverySender( getDiscoveryAddress(), getDiscoveryPort() ); |
230 | ||
231 | 0 | sender.passiveBroadcast( hostAddress, this.getServicePort(), this.getCacheNames() ); |
232 | ||
233 | // todo we should consider sending a request broadcast every so |
|
234 | // often. |
|
235 | ||
236 | 0 | if ( log.isDebugEnabled() ) |
237 | { |
|
238 | 0 | log.debug( "Called sender to issue a passive broadcast" ); |
239 | } |
|
240 | } |
|
241 | 0 | catch ( Exception e ) |
242 | { |
|
243 | 0 | log.error( "Problem calling the UDP Discovery Sender. address [" + getDiscoveryAddress() + "] port [" |
244 | + getDiscoveryPort() + "]", e ); |
|
245 | } |
|
246 | finally |
|
247 | { |
|
248 | 0 | try |
249 | { |
|
250 | 0 | if ( sender != null ) |
251 | { |
|
252 | 0 | sender.destroy(); |
253 | } |
|
254 | } |
|
255 | 0 | catch ( Exception e ) |
256 | { |
|
257 | 0 | log.error( "Problem closing Passive Broadcast sender, while servicing a request broadcast.", e ); |
258 | 0 | } |
259 | 0 | } |
260 | 0 | } |
261 | ||
262 | /** |
|
263 | * Get all the cache names we have facades for. |
|
264 | * |
|
265 | * @return a new list holding the cache names that currently have a registered facade |
|
266 | */ |
|
267 | protected ArrayList getCacheNames() |
|
268 | { |
|
269 | 39 | ArrayList keys = new ArrayList(); |
270 | 39 | Set keySet = facades.keySet(); |
271 | 39 | Iterator it = keySet.iterator(); |
272 | 78 | while ( it.hasNext() ) |
273 | { |
|
274 | 39 | String key = (String) it.next(); |
275 | 39 | keys.add( key ); |
276 | 39 | } |
277 | 39 | return keys; |
278 | } |
|
279 | ||
280 | /** |
|
281 | * Allows us to set the daemon status on the clockdaemon |
|
282 | * |
|
283 | * @author aaronsm |
|
284 | * |
|
285 | */ |
|
286 | class MyThreadFactory |
|
287 | implements ThreadFactory |
|
288 | { |
|
289 | /* |
|
290 | * (non-Javadoc) |
|
291 | * |
|
292 | * @see EDU.oswego.cs.dl.util.concurrent.ThreadFactory#newThread(java.lang.Runnable) |
|
293 | */ |
|
294 | public Thread newThread( Runnable runner ) |
|
295 | { |
|
296 | Thread t = new Thread( runner ); |
|
297 | t.setDaemon( true ); |
|
298 | t.setPriority( Thread.MIN_PRIORITY ); |
|
299 | return t; |
|
300 | } |
|
301 | } |
|
302 | ||
303 | /* |
|
304 | * (non-Javadoc) |
|
305 | * |
|
306 | * @see org.apache.jcs.engine.behavior.ShutdownObserver#shutdown() |
|
307 | */ |
|
308 | public void shutdown() |
|
309 | { |
|
310 | 0 | if ( log.isInfoEnabled() ) |
311 | { |
|
312 | 0 | log.info( "Shutting down UDP discovery service receiver." ); |
313 | } |
|
314 | ||
315 | try |
|
316 | { |
|
317 | // no good way to do this right now. |
|
318 | 0 | receiver.shutdown(); |
319 | 0 | udpReceiverThread.interrupt(); |
320 | } |
|
321 | 0 | catch ( Exception e ) |
322 | { |
|
323 | 0 | log.error( "Problem interrupting UDP receiver thread." ); |
324 | 0 | } |
325 | ||
326 | 0 | if ( log.isInfoEnabled() ) |
327 | { |
|
328 | 0 | log.info( "Shutting down UDP discovery service sender." ); |
329 | } |
|
330 | ||
331 | try |
|
332 | { |
|
333 | // interrupt all the threads. |
|
334 | 0 | senderDaemon.shutDown(); |
335 | } |
|
336 | 0 | catch ( Exception e ) |
337 | { |
|
338 | 0 | log.error( "Problem shutting down UDP sender." ); |
339 | 0 | } |
340 | 0 | } |
341 | ||
342 | /** |
|
343 | * @param discoveryAddress |
|
344 | * The discoveryAddress to set. |
|
345 | */ |
|
346 | protected void setDiscoveryAddress( String discoveryAddress ) |
|
347 | { |
|
348 | 13 | this.discoveryAddress = discoveryAddress; |
349 | 13 | } |
350 | ||
351 | /** |
|
352 | * @return Returns the discoveryAddress. |
|
353 | */ |
|
354 | protected String getDiscoveryAddress() |
|
355 | { |
|
356 | 26 | return discoveryAddress; |
357 | } |
|
358 | ||
359 | /** |
|
360 | * @param discoveryPort |
|
361 | * The discoveryPort to set. |
|
362 | */ |
|
363 | protected void setDiscoveryPort( int discoveryPort ) |
|
364 | { |
|
365 | 13 | this.discoveryPort = discoveryPort; |
366 | 13 | } |
367 | ||
368 | /** |
|
369 | * @return Returns the discoveryPort. |
|
370 | */ |
|
371 | protected int getDiscoveryPort() |
|
372 | { |
|
373 | 26 | return discoveryPort; |
374 | } |
|
375 | ||
376 | /** |
|
377 | * @param servicePort |
|
378 | * The servicePort to set. |
|
379 | */ |
|
380 | protected void setServicePort( int servicePort ) |
|
381 | { |
|
382 | 13 | this.servicePort = servicePort; |
383 | 13 | } |
384 | ||
385 | /** |
|
386 | * @return Returns the servicePort. |
|
387 | */ |
|
388 | protected int getServicePort() |
|
389 | { |
|
390 | 13 | return servicePort; |
391 | } |
|
392 | ||
393 | /** |
|
394 | * @param tCPLateralCacheAttributes |
|
395 | * The tCPLateralCacheAttributes to set. |
|
396 | */ |
|
397 | public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tCPLateralCacheAttributes ) |
|
398 | { |
|
399 | 26 | tcpLateralCacheAttributes = tCPLateralCacheAttributes; |
400 | 26 | } |
401 | ||
402 | /** |
|
403 | * @return Returns the tCPLateralCacheAttributes. |
|
404 | */ |
|
405 | public ITCPLateralCacheAttributes getTcpLateralCacheAttributes() |
|
406 | { |
|
407 | 0 | return tcpLateralCacheAttributes; |
408 | } |
|
409 | } |
This report is generated by jcoverage, Maven and Maven JCoverage Plugin. |