1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter;
21
22 import java.io.ByteArrayInputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27 import java.security.MessageDigest;
28 import java.util.Random;
29
30 import junit.framework.TestCase;
31
32 import org.apache.mina.common.ByteBuffer;
33 import org.apache.mina.common.IdleStatus;
34 import org.apache.mina.common.IoAcceptor;
35 import org.apache.mina.common.IoConnector;
36 import org.apache.mina.common.IoFutureListener;
37 import org.apache.mina.common.IoHandlerAdapter;
38 import org.apache.mina.common.IoSession;
39 import org.apache.mina.common.WriteFuture;
40 import org.apache.mina.common.IoFilter.NextFilter;
41 import org.apache.mina.common.IoFilter.WriteRequest;
42 import org.apache.mina.common.support.DefaultWriteFuture;
43 import org.apache.mina.transport.socket.nio.SocketAcceptor;
44 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
45 import org.apache.mina.transport.socket.nio.SocketConnector;
46 import org.apache.mina.util.AvailablePortFinder;
47 import org.apache.mina.util.Queue;
48 import org.easymock.AbstractMatcher;
49 import org.easymock.MockControl;
50
51
52
53
54
55
56
57 public class StreamWriteFilterTest extends TestCase {
58 MockControl mockSession;
59
60 MockControl mockNextFilter;
61
62 IoSession session;
63
64 NextFilter nextFilter;
65
66 protected void setUp() throws Exception {
67 super.setUp();
68
69
70
71
72 mockSession = MockControl.createControl(IoSession.class);
73 mockNextFilter = MockControl.createControl(NextFilter.class);
74 session = (IoSession) mockSession.getMock();
75 nextFilter = (NextFilter) mockNextFilter.getMock();
76
77 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
78 mockSession.setReturnValue(null);
79 }
80
81 public void testWriteEmptyStream() throws Exception {
82 StreamWriteFilter filter = new StreamWriteFilter();
83
84 InputStream stream = new ByteArrayInputStream(new byte[0]);
85 WriteRequest writeRequest = new WriteRequest(stream,
86 new DummyWriteFuture());
87
88
89
90
91 nextFilter.messageSent(session, stream);
92
93
94
95
96 mockNextFilter.replay();
97 mockSession.replay();
98
99 filter.filterWrite(nextFilter, session, writeRequest);
100
101
102
103
104 mockNextFilter.verify();
105 mockSession.verify();
106
107 assertTrue(writeRequest.getFuture().isWritten());
108 }
109
110
111
112
113
114 public void testWriteNonStreamMessage() throws Exception {
115 StreamWriteFilter filter = new StreamWriteFilter();
116
117 Object message = new Object();
118 WriteRequest writeRequest = new WriteRequest(message,
119 new DummyWriteFuture());
120
121
122
123
124 nextFilter.filterWrite(session, writeRequest);
125 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
126 mockSession.setReturnValue(null);
127 nextFilter.messageSent(session, message);
128
129
130
131
132 mockNextFilter.replay();
133 mockSession.replay();
134
135 filter.filterWrite(nextFilter, session, writeRequest);
136 filter.messageSent(nextFilter, session, message);
137
138
139
140
141 mockNextFilter.verify();
142 mockSession.verify();
143 }
144
145
146
147
148 public void testWriteSingleBufferStream() throws Exception {
149 StreamWriteFilter filter = new StreamWriteFilter();
150
151 byte[] data = new byte[] { 1, 2, 3, 4 };
152
153 InputStream stream = new ByteArrayInputStream(data);
154 WriteRequest writeRequest = new WriteRequest(stream,
155 new DummyWriteFuture());
156
157
158
159
160 session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
161 mockSession.setReturnValue(null);
162 session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
163 writeRequest.getFuture());
164 mockSession.setReturnValue(null);
165 nextFilter
166 .filterWrite(session, new WriteRequest(ByteBuffer.wrap(data)));
167 mockNextFilter.setMatcher(new WriteRequestMatcher());
168
169 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
170 mockSession.setReturnValue(stream);
171 session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
172 mockSession.setReturnValue(stream);
173 session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
174 mockSession.setReturnValue(writeRequest.getFuture());
175 session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
176 mockSession.setReturnValue(null);
177 nextFilter.messageSent(session, stream);
178
179
180
181
182 mockNextFilter.replay();
183 mockSession.replay();
184
185 filter.filterWrite(nextFilter, session, writeRequest);
186 filter.messageSent(nextFilter, session, data);
187
188
189
190
191 mockNextFilter.verify();
192 mockSession.verify();
193
194 assertTrue(writeRequest.getFuture().isWritten());
195 }
196
197
198
199
200 public void testWriteSeveralBuffersStream() throws Exception {
201 StreamWriteFilter filter = new StreamWriteFilter();
202 filter.setWriteBufferSize(4);
203
204 byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
205 byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
206 byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
207 byte[] chunk3 = new byte[] { 9, 10 };
208
209 InputStream stream = new ByteArrayInputStream(data);
210 WriteRequest writeRequest = new WriteRequest(stream,
211 new DummyWriteFuture());
212
213
214
215
216 session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
217 mockSession.setReturnValue(null);
218 session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
219 writeRequest.getFuture());
220 mockSession.setReturnValue(null);
221 nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
222 .wrap(chunk1)));
223 mockNextFilter.setMatcher(new WriteRequestMatcher());
224
225 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
226 mockSession.setReturnValue(stream);
227 nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
228 .wrap(chunk2)));
229
230 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
231 mockSession.setReturnValue(stream);
232 nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
233 .wrap(chunk3)));
234
235 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
236 mockSession.setReturnValue(stream);
237 session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
238 mockSession.setReturnValue(stream);
239 session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
240 mockSession.setReturnValue(writeRequest.getFuture());
241 session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
242 mockSession.setReturnValue(null);
243 nextFilter.messageSent(session, stream);
244
245
246
247
248 mockNextFilter.replay();
249 mockSession.replay();
250
251 filter.filterWrite(nextFilter, session, writeRequest);
252 filter.messageSent(nextFilter, session, chunk1);
253 filter.messageSent(nextFilter, session, chunk2);
254 filter.messageSent(nextFilter, session, chunk3);
255
256
257
258
259 mockNextFilter.verify();
260 mockSession.verify();
261
262 assertTrue(writeRequest.getFuture().isWritten());
263 }
264
265 public void testWriteWhileWriteInProgress() throws Exception {
266 StreamWriteFilter filter = new StreamWriteFilter();
267
268 Queue queue = new Queue();
269 InputStream stream = new ByteArrayInputStream(new byte[5]);
270
271
272
273
274 mockSession.reset();
275 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
276 mockSession.setReturnValue(stream);
277 session.getAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
278 mockSession.setReturnValue(queue);
279
280
281
282
283 mockNextFilter.replay();
284 mockSession.replay();
285
286 WriteRequest wr = new WriteRequest(new Object(), new DummyWriteFuture());
287 filter.filterWrite(nextFilter, session, wr);
288 assertEquals(1, queue.size());
289 assertSame(wr, queue.pop());
290
291
292
293
294 mockNextFilter.verify();
295 mockSession.verify();
296 }
297
298 public void testWritesWriteRequestQueueWhenFinished() throws Exception {
299 StreamWriteFilter filter = new StreamWriteFilter();
300
301 WriteRequest wrs[] = new WriteRequest[] {
302 new WriteRequest(new Object(), new DummyWriteFuture()),
303 new WriteRequest(new Object(), new DummyWriteFuture()),
304 new WriteRequest(new Object(), new DummyWriteFuture()) };
305 Queue queue = new Queue();
306 queue.push(wrs[0]);
307 queue.push(wrs[1]);
308 queue.push(wrs[2]);
309 InputStream stream = new ByteArrayInputStream(new byte[0]);
310
311
312
313
314 mockSession.reset();
315
316 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
317 mockSession.setReturnValue(stream);
318 session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
319 mockSession.setReturnValue(stream);
320 session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
321 mockSession.setReturnValue(new DefaultWriteFuture(session));
322 session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
323 mockSession.setReturnValue(queue);
324
325 nextFilter.filterWrite(session, wrs[0]);
326 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
327 mockSession.setReturnValue(null);
328 nextFilter.filterWrite(session, wrs[1]);
329 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
330 mockSession.setReturnValue(null);
331 nextFilter.filterWrite(session, wrs[2]);
332 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
333 mockSession.setReturnValue(null);
334
335 nextFilter.messageSent(session, stream);
336
337
338
339
340 mockNextFilter.replay();
341 mockSession.replay();
342
343 filter.messageSent(nextFilter, session, new Object());
344 assertEquals(0, queue.size());
345
346
347
348
349 mockNextFilter.verify();
350 mockSession.verify();
351 }
352
353
354
355
356
357 public void testSetWriteBufferSize() throws Exception {
358 StreamWriteFilter filter = new StreamWriteFilter();
359
360 try {
361 filter.setWriteBufferSize(0);
362 fail("0 writeBuferSize specified. IllegalArgumentException expected.");
363 } catch (IllegalArgumentException iae) {
364 }
365
366 try {
367 filter.setWriteBufferSize(-100);
368 fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
369 } catch (IllegalArgumentException iae) {
370 }
371
372 filter.setWriteBufferSize(1);
373 assertEquals(1, filter.getWriteBufferSize());
374 filter.setWriteBufferSize(1024);
375 assertEquals(1024, filter.getWriteBufferSize());
376 }
377
378 public void testWriteUsingSocketTransport() throws Exception {
379 IoAcceptor acceptor = new SocketAcceptor();
380 ((SocketAcceptorConfig) acceptor.getDefaultConfig())
381 .setReuseAddress(true);
382 SocketAddress address = new InetSocketAddress("localhost",
383 AvailablePortFinder.getNextAvailable());
384
385 IoConnector connector = new SocketConnector();
386
387 FixedRandomInputStream stream = new FixedRandomInputStream(
388 4 * 1024 * 1024);
389
390 SenderHandler sender = new SenderHandler(stream);
391 ReceiverHandler receiver = new ReceiverHandler(stream.size);
392
393 acceptor.bind(address, sender);
394
395 synchronized (sender.lock) {
396 synchronized (receiver.lock) {
397 connector.connect(address, receiver);
398
399 sender.lock.wait();
400 receiver.lock.wait();
401 }
402 }
403
404 acceptor.unbind(address);
405
406 assertEquals(stream.bytesRead, receiver.bytesRead);
407 assertEquals(stream.size, receiver.bytesRead);
408 byte[] expectedMd5 = stream.digest.digest();
409 byte[] actualMd5 = receiver.digest.digest();
410 assertEquals(expectedMd5.length, actualMd5.length);
411 for (int i = 0; i < expectedMd5.length; i++) {
412 assertEquals(expectedMd5[i], actualMd5[i]);
413 }
414 }
415
416 private static class FixedRandomInputStream extends InputStream {
417 long size;
418
419 long bytesRead = 0;
420
421 Random random = new Random();
422
423 MessageDigest digest;
424
425 public FixedRandomInputStream(long size) throws Exception {
426 this.size = size;
427 digest = MessageDigest.getInstance("MD5");
428 }
429
430 public int read() throws IOException {
431 if (isAllWritten())
432 return -1;
433 bytesRead++;
434 byte b = (byte) random.nextInt(255);
435 digest.update(b);
436 return b;
437 }
438
439 public long getBytesRead() {
440 return bytesRead;
441 }
442
443 public long getSize() {
444 return size;
445 }
446
447 public boolean isAllWritten() {
448 return bytesRead >= size;
449 }
450 }
451
452 private static class SenderHandler extends IoHandlerAdapter {
453 Object lock = new Object();
454
455 InputStream inputStream;
456
457 StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
458
459 public SenderHandler(InputStream inputStream) {
460 this.inputStream = inputStream;
461 }
462
463 public void sessionCreated(IoSession session) throws Exception {
464 super.sessionCreated(session);
465 session.getFilterChain().addLast("codec", streamWriteFilter);
466 }
467
468 public void sessionOpened(IoSession session) throws Exception {
469 session.write(inputStream);
470 }
471
472 public void exceptionCaught(IoSession session, Throwable cause)
473 throws Exception {
474 synchronized (lock) {
475 lock.notifyAll();
476 }
477 }
478
479 public void sessionClosed(IoSession session) throws Exception {
480 synchronized (lock) {
481 lock.notifyAll();
482 }
483 }
484
485 public void sessionIdle(IoSession session, IdleStatus status)
486 throws Exception {
487 synchronized (lock) {
488 lock.notifyAll();
489 }
490 }
491
492 public void messageSent(IoSession session, Object message)
493 throws Exception {
494 if (message == inputStream) {
495 synchronized (lock) {
496 lock.notifyAll();
497 }
498 }
499 }
500 }
501
502 private static class ReceiverHandler extends IoHandlerAdapter {
503 Object lock = new Object();
504
505 long bytesRead = 0;
506
507 long size = 0;
508
509 MessageDigest digest;
510
511 public ReceiverHandler(long size) throws Exception {
512 this.size = size;
513 digest = MessageDigest.getInstance("MD5");
514 }
515
516 public void sessionCreated(IoSession session) throws Exception {
517 super.sessionCreated(session);
518
519 session.setIdleTime(IdleStatus.READER_IDLE, 5);
520 }
521
522 public void sessionIdle(IoSession session, IdleStatus status)
523 throws Exception {
524 session.close();
525 }
526
527 public void exceptionCaught(IoSession session, Throwable cause)
528 throws Exception {
529 synchronized (lock) {
530 lock.notifyAll();
531 }
532 }
533
534 public void sessionClosed(IoSession session) throws Exception {
535 synchronized (lock) {
536 lock.notifyAll();
537 }
538 }
539
540 public void messageReceived(IoSession session, Object message)
541 throws Exception {
542 ByteBuffer buf = (ByteBuffer) message;
543 while (buf.hasRemaining()) {
544 digest.update(buf.get());
545 bytesRead++;
546 }
547 if (bytesRead >= size) {
548 session.close();
549 }
550 }
551 }
552
553 public static class WriteRequestMatcher extends AbstractMatcher {
554 protected boolean argumentMatches(Object expected, Object actual) {
555 if (expected instanceof WriteRequest
556 && expected instanceof WriteRequest) {
557 WriteRequest w1 = (WriteRequest) expected;
558 WriteRequest w2 = (WriteRequest) actual;
559
560 return w1.getMessage().equals(w2.getMessage())
561 && w1.getFuture().isWritten() == w2.getFuture()
562 .isWritten();
563 }
564 return super.argumentMatches(expected, actual);
565 }
566 }
567
568 private static class DummyWriteFuture implements WriteFuture {
569 private boolean written;
570
571 public boolean isWritten() {
572 return written;
573 }
574
575 public void setWritten(boolean written) {
576 this.written = written;
577 }
578
579 public IoSession getSession() {
580 return null;
581 }
582
583 public Object getLock() {
584 return this;
585 }
586
587 public void join() {
588 }
589
590 public boolean join(long timeoutInMillis) {
591 return true;
592 }
593
594 public boolean isReady() {
595 return true;
596 }
597
598 public void addListener(IoFutureListener listener) {
599 }
600
601 public void removeListener(IoFutureListener listener) {
602 }
603 }
604 }