1 package org.apache.jcs.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.BufferedReader;
23 import java.io.IOException;
24 import java.io.InputStreamReader;
25 import java.io.ObjectInputStream;
26 import java.io.ObjectOutputStream;
27 import java.net.InetAddress;
28 import java.net.Socket;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
33 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
34 import org.apache.jcs.auxiliary.lateral.socket.tcp.utils.SocketOpener;
35 import org.apache.jcs.engine.CacheElement;
36 import org.apache.jcs.engine.behavior.ICacheElement;
37
/**
 * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
 * it is significantly different.
 */
42 public class LateralTCPSender
43 {
44 private final static Log log = LogFactory.getLog( LateralTCPSender.class );
45
46 private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
47
48 private String remoteHost;
49
50 private InetAddress address;
51
52 int port = 1111;
53
54 private ObjectOutputStream oos;
55
56 private Socket socket;
57
58 int counter = 0;
59
60 private int sendCnt = 0;
61
62
63
64
65
66
67
68
69
70 private final static int RESET_FREQUENCY = 70;
71
72 /***
73 * Only block for 1 second before timing out on a read. TODO: make configurable. The default 1
74 * is way too long.
75 */
76 private final static int timeOut = 1000;
77
78 /*** Only block for 5 seconds before timing out on startup. */
79 private final static int openTimeOut = 5000;
80
81 /*** Use to synchronize multiple threads that may be trying to get. */
82 private Object getLock = new int[0];
83
84 /***
85 * Constructor for the LateralTCPSender object.
86 * <p>
87 * @param lca
88 * @exception IOException
89 */
90 public LateralTCPSender( ITCPLateralCacheAttributes lca )
91 throws IOException
92 {
93 this.setTcpLateralCacheAttributes( lca );
94
95 String p1 = lca.getTcpServer();
96 if ( p1 != null )
97 {
98 String h2 = p1.substring( 0, p1.indexOf( ":" ) );
99 int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
100 if ( log.isDebugEnabled() )
101 {
102 log.debug( "h2 = " + h2 );
103 log.debug( "po = " + po );
104 }
105
106 if ( h2 == null )
107 {
108 throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
109 }
110
111 init( h2, po );
112 }
113 }
114
115 /***
116 * Creates a connection to a TCP server.
117 * <p>
118 * @param host
119 * @param port
120 * @throws IOException
121 */
122 protected void init( String host, int port )
123 throws IOException
124 {
125 this.port = port;
126 this.address = getAddressByName( host );
127 this.setRemoteHost( host );
128
129 try
130 {
131 if ( log.isInfoEnabled() )
132 {
133 log.info( "Attempting connection to [" + address.getHostName() + "]" );
134 }
135
136
137 socket = SocketOpener.openSocket( host, port, openTimeOut );
138
139 if ( socket == null )
140 {
141 throw new IOException( "Socket is null, cannot connect to " + host + ":" + port );
142 }
143
144 socket.setSoTimeout( LateralTCPSender.timeOut );
145 synchronized ( this )
146 {
147 oos = new ObjectOutputStream( socket.getOutputStream() );
148 }
149 }
150 catch ( java.net.ConnectException e )
151 {
152 log.debug( "Remote host [" + address.getHostName() + "] refused connection." );
153 throw e;
154 }
155 catch ( IOException e )
156 {
157 log.debug( "Could not connect to [" + address.getHostName() + "]. Exception is " + e );
158 throw e;
159 }
160 }
161
162 /***
163 * Gets the addressByName attribute of the LateralTCPSender object.
164 * <p>
165 * @param host
166 * @return The addressByName value
167 * @throws IOException
168 */
169 private InetAddress getAddressByName( String host )
170 throws IOException
171 {
172 try
173 {
174 return InetAddress.getByName( host );
175 }
176 catch ( Exception e )
177 {
178 log.error( "Could not find address of [" + host + "] ", e );
179 throw new IOException( "Could not find address of [" + host + "] " + e.getMessage() );
180 }
181 }
182
183 /***
184 * Sends commands to the lateral cache listener.
185 * <p>
186 * @param led
187 * @throws IOException
188 */
189 public void send( LateralElementDescriptor led )
190 throws IOException
191 {
192 sendCnt++;
193 if ( log.isInfoEnabled() )
194 {
195 if ( sendCnt % 100 == 0 )
196 {
197 log.info( "Send Count (port " + port + ") = " + sendCnt );
198 }
199 }
200
201 if ( log.isDebugEnabled() )
202 {
203 log.debug( "sending LateralElementDescriptor" );
204 }
205
206 if ( led == null )
207 {
208 return;
209 }
210
211 if ( address == null )
212 {
213 throw new IOException( "No remote host is set for LateralTCPSender." );
214 }
215
216 if ( oos != null )
217 {
218 synchronized ( this.getLock )
219 {
220 try
221 {
222 oos.writeObject( led );
223 oos.flush();
224 if ( ++counter >= RESET_FREQUENCY )
225 {
226 counter = 0;
227
228
229 if ( log.isDebugEnabled() )
230 {
231 log.debug( "Doing oos.reset()" );
232 }
233 oos.reset();
234 }
235 }
236 catch ( IOException e )
237 {
238 oos = null;
239 log.error( "Detected problem with connection: " + e );
240 throw e;
241 }
242 }
243 }
244 }
245
246 /***
247 * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
248 * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
249 * form of get working. However, get is not recommended for performance reasons. If you have 10
250 * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
251 * <p>
252 * @param led
253 * @return
254 * @throws IOException
255 */
256 public ICacheElement sendAndReceive( LateralElementDescriptor led )
257 throws IOException
258 {
259 ICacheElement ice = null;
260
261 if ( led == null )
262 {
263 return null;
264 }
265
266 if ( address == null )
267 {
268 throw new IOException( "No remote host is set for LateralTCPSender." );
269 }
270
271 if ( oos != null )
272 {
273
274
275
276
277
278
279 synchronized ( this.getLock )
280 {
281 try
282 {
283 try
284 {
285
286 if ( socket.getInputStream().available() > 0 )
287 {
288 socket.getInputStream().read( new byte[socket.getInputStream().available()] );
289 }
290 }
291 catch ( IOException ioe )
292 {
293 log.error( "Problem cleaning socket before send " + socket, ioe );
294 throw ioe;
295 }
296
297
298 oos.writeObject( led );
299 oos.flush();
300
301 try
302 {
303
304
305 ObjectInputStream ois = new ObjectInputStream( socket.getInputStream() );
306 Object obj = ois.readObject();
307 ice = (ICacheElement) obj;
308 if ( ice == null )
309 {
310
311
312 }
313 }
314 catch ( IOException ioe )
315 {
316 String message = "Could not open ObjectInputStream to " + socket;
317 if ( socket != null )
318 {
319 message += " SoTimeout [" + socket.getSoTimeout() + "]";
320
321 }
322 log.error( message, ioe );
323 throw ioe;
324 }
325 catch ( Exception e )
326 {
327 log.error( e );
328 }
329
330 if ( ++counter >= RESET_FREQUENCY )
331 {
332 counter = 0;
333
334
335
336 log.info( "Doing oos.reset()" );
337 oos.reset();
338 }
339 }
340 catch ( IOException e )
341 {
342 oos = null;
343 log.error( "Detected problem with connection: " + e );
344 throw e;
345 }
346 }
347 }
348
349 return ice;
350 }
351
352 /***
353 * Closes connection used by all LateralTCPSenders for this lateral conneciton. Dispose request
354 * should come into the facade and be sent to all lateral cache sevices. The lateral cache
355 * service will then call this method.
356 * <p>
357 * @param cache
358 * @throws IOException
359 */
360 public void dispose( String cache )
361 throws IOException
362 {
363 if ( log.isInfoEnabled() )
364 {
365 log.info( "Dispose called for cache [" + cache + "]" );
366 }
367
368 oos.close();
369 }
370
371 /***
372 * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
373 */
374 public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
375 {
376 this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
377 }
378
379 /***
380 * @return Returns the tcpLateralCacheAttributes.
381 */
382 public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
383 {
384 return tcpLateralCacheAttributes;
385 }
386
387 /***
388 * @param remoteHost The remoteHost to set.
389 */
390 public void setRemoteHost( String remoteHost )
391 {
392 this.remoteHost = remoteHost;
393 }
394
395 /***
396 * @return Returns the remoteHost.
397 */
398 public String getRemoteHost()
399 {
400 return remoteHost;
401 }
402
403 /***
404 * This is a Testing Method. It should be moved to a unit test.
405 * @param args
406 */
407 public static void main( String args[] )
408 {
409 try
410 {
411 LateralTCPSender lur = null;
412
413
414
415 boolean notDone = true;
416 String message = null;
417
418 BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
419
420 while ( notDone )
421 {
422 System.out.println( "enter mesage:" );
423 message = br.readLine();
424 CacheElement ce = new CacheElement( "test", "test", message );
425 LateralElementDescriptor led = new LateralElementDescriptor( ce );
426 lur.send( led );
427 }
428 }
429 catch ( Exception e )
430 {
431 System.out.println( e.toString() );
432 }
433 }
434 }