1 package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayInputStream;
23 import java.io.IOException;
24 import java.io.ObjectInputStream;
25 import java.net.DatagramPacket;
26 import java.net.InetAddress;
27 import java.net.MulticastSocket;
28 import java.util.ArrayList;
29 import java.util.Iterator;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
34 import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
35 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
36 import org.apache.jcs.auxiliary.lateral.socket.tcp.LateralTCPCacheManager;
37 import org.apache.jcs.auxiliary.lateral.socket.tcp.TCPLateralCacheAttributes;
38 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
39 import org.apache.jcs.engine.behavior.ICache;
40 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
41 import org.apache.jcs.engine.behavior.IShutdownObserver;
42
43 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
44 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
45 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
46
47 /***
48 * Receives UDP Discovery messages.
49 */
50 public class UDPDiscoveryReceiver
51 implements Runnable, IShutdownObserver
52 {
53 private final static Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
54
55 private final byte[] m_buffer = new byte[65536];
56
57 private MulticastSocket m_socket;
58
59
60
61
62 private static final int maxPoolSize = 10;
63
64 private PooledExecutor pooledExecutor = null;
65
66
67 private int cnt = 0;
68
69 /***
70 * Service to get cache names and hande request broadcasts
71 */
72 protected UDPDiscoveryService service = null;
73
74 private String multicastAddressString = "";
75
76 private int multicastPort = 0;
77
78 private ICompositeCacheManager cacheMgr;
79
80 private boolean shutdown = false;
81
82 /***
83 * Constructor for the LateralUDPReceiver object.
84 * <p>
85 * We determine out own host using InetAddress
86 *
87 * @param service
88 * @param multicastAddressString
89 * @param multicastPort
90 * @param cacheMgr
91 * @exception IOException
92 */
93 public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort,
94 ICompositeCacheManager cacheMgr )
95 throws IOException
96 {
97 this.service = service;
98 this.multicastAddressString = multicastAddressString;
99 this.multicastPort = multicastPort;
100 this.cacheMgr = cacheMgr;
101
102
103 pooledExecutor = new PooledExecutor( new BoundedBuffer( 100 ), maxPoolSize );
104 pooledExecutor.discardOldestWhenBlocked();
105
106 pooledExecutor.setThreadFactory( new MyThreadFactory() );
107
108 if ( log.isInfoEnabled() )
109 {
110 log.info( "constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
111 }
112
113 try
114 {
115 createSocket( this.multicastAddressString, this.multicastPort );
116 }
117 catch ( IOException ioe )
118 {
119
120
121 throw ioe;
122 }
123 }
124
125 /***
126 * Creates the socket for this class.
127 *
128 * @param multicastAddressString
129 * @param multicastPort
130 * @throws IOException
131 */
132 private void createSocket( String multicastAddressString, int multicastPort )
133 throws IOException
134 {
135 try
136 {
137 m_socket = new MulticastSocket( multicastPort );
138 m_socket.joinGroup( InetAddress.getByName( multicastAddressString ) );
139 }
140 catch ( IOException e )
141 {
142 log.error( "Could not bind to multicast address [" + multicastAddressString + ":" + multicastPort + "]", e );
143 throw e;
144 }
145 }
146
147 /***
148 * Highly unreliable. If it is processing one message while another comes in ,
149 * the second message is lost. This is for low concurency peppering.
150 *
151 * @return the object message
152 * @throws IOException
153 */
154 public Object waitForMessage()
155 throws IOException
156 {
157 final DatagramPacket packet = new DatagramPacket( m_buffer, m_buffer.length );
158
159 Object obj = null;
160 try
161 {
162 m_socket.receive( packet );
163
164 final ByteArrayInputStream byteStream = new ByteArrayInputStream( m_buffer, 0, packet.getLength() );
165
166 final ObjectInputStream objectStream = new ObjectInputStream( byteStream );
167
168 obj = objectStream.readObject();
169
170 }
171 catch ( Exception e )
172 {
173 log.error( "Error receving multicast packet", e );
174 }
175 return obj;
176 }
177
178 /*** Main processing method for the LateralUDPReceiver object */
179 public void run()
180 {
181 try
182 {
183 while ( !shutdown )
184 {
185 Object obj = waitForMessage();
186
187
188 cnt++;
189
190 if ( log.isDebugEnabled() )
191 {
192 log.debug( getCnt() + " messages received." );
193 }
194
195 UDPDiscoveryMessage message = null;
196
197 try
198 {
199 message = (UDPDiscoveryMessage) obj;
200
201 if ( message != null )
202 {
203 MessageHandler handler = new MessageHandler( message );
204
205 pooledExecutor.execute( handler );
206
207 if ( log.isDebugEnabled() )
208 {
209 log.debug( "Passed handler to executor." );
210 }
211 }
212 else
213 {
214 log.warn( "message is null" );
215 }
216 }
217 catch ( ClassCastException cce )
218 {
219 log.warn( "Received unknown message type " + cce.getMessage() );
220 }
221 }
222 }
223 catch ( Exception e )
224 {
225 log.error( "Unexpected exception in UDP receiver.", e );
226 try
227 {
228 Thread.sleep( 100 );
229
230
231 }
232 catch ( Exception e2 )
233 {
234 log.error( "Problem sleeping", e2 );
235 }
236 }
237 return;
238 }
239
240 /***
241 * @param cnt
242 * The cnt to set.
243 */
244 public void setCnt( int cnt )
245 {
246 this.cnt = cnt;
247 }
248
249 /***
250 * @return Returns the cnt.
251 */
252 public int getCnt()
253 {
254 return cnt;
255 }
256
257 /***
258 * Separate thread run when a command comes into the UDPDiscoveryReceiver.
259 */
260 public class MessageHandler
261 implements Runnable
262 {
263 private UDPDiscoveryMessage message = null;
264
265 /***
266 * @param message
267 */
268 public MessageHandler( UDPDiscoveryMessage message )
269 {
270 this.message = message;
271 }
272
273
274
275
276
277
278 public void run()
279 {
280
281 if ( message.getRequesterId() == LateralCacheInfo.listenerId )
282 {
283 if ( log.isDebugEnabled() )
284 {
285 log.debug( "from self" );
286 }
287 }
288 else
289 {
290 if ( log.isDebugEnabled() )
291 {
292 log.debug( "from another" );
293 log.debug( "Message = " + message );
294 }
295
296
297
298 if ( message.getMessageType() == UDPDiscoveryMessage.REQUEST_BROADCAST )
299 {
300 if ( log.isDebugEnabled() )
301 {
302 log.debug( "Message is a Request Broadcase, will have the service handle it." );
303 }
304 service.serviceRequestBroadcast();
305 return;
306 }
307
308 try
309 {
310
311
312
313 ITCPLateralCacheAttributes lca = null;
314 if ( service.getTcpLateralCacheAttributes() != null )
315 {
316 lca = (ITCPLateralCacheAttributes) service.getTcpLateralCacheAttributes().copy();
317 }
318 else
319 {
320 lca = new TCPLateralCacheAttributes();
321 }
322 lca.setTransmissionType( LateralCacheAttributes.TCP );
323 lca.setTcpServer( message.getHost() + ":" + message.getPort() );
324 LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr );
325
326 ArrayList regions = message.getCacheNames();
327 if ( regions != null )
328 {
329
330 Iterator it = regions.iterator();
331 while ( it.hasNext() )
332 {
333 String cacheName = (String) it.next();
334
335 try
336 {
337 ICache ic = lcm.getCache( cacheName );
338
339 if ( log.isDebugEnabled() )
340 {
341 log.debug( "Got cache, ic = " + ic );
342 }
343
344
345 if ( ic != null )
346 {
347 service.addNoWait( (LateralCacheNoWait) ic );
348 if ( log.isDebugEnabled() )
349 {
350 log.debug( "Called addNoWait for cacheName " + cacheName );
351 }
352 }
353 }
354 catch ( Exception e )
355 {
356 log.error( "Problem creating no wait", e );
357 }
358 }
359
360 }
361 else
362 {
363 log.warn( "No cache names found in message " + message );
364 }
365 }
366 catch ( Exception e )
367 {
368 log.error( "Problem getting lateral maanger", e );
369 }
370 }
371 }
372 }
373
374 /***
375 * Allows us to set the daemon status on the executor threads
376 *
377 * @author aaronsm
378 *
379 */
380 class MyThreadFactory
381 implements ThreadFactory
382 {
383
384
385
386
387
388 public Thread newThread( Runnable runner )
389 {
390 Thread t = new Thread( runner );
391 t.setDaemon( true );
392 t.setPriority( Thread.MIN_PRIORITY );
393 return t;
394 }
395 }
396
397
398
399
400
401
402 public void shutdown()
403 {
404 try
405 {
406 shutdown = true;
407 m_socket.close();
408 pooledExecutor.shutdownNow();
409 }
410 catch ( Exception e )
411 {
412 log.error( "Problem closing socket" );
413 }
414 }
415 }
416