1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter;
21
22 import java.io.ByteArrayInputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27 import java.security.MessageDigest;
28 import java.util.LinkedList;
29 import java.util.Queue;
30 import java.util.Random;
31
32 import junit.framework.TestCase;
33 import org.apache.mina.common.ByteBuffer;
34 import org.apache.mina.common.IdleStatus;
35 import org.apache.mina.common.IoAcceptor;
36 import org.apache.mina.common.IoConnector;
37 import org.apache.mina.common.IoFilter.NextFilter;
38 import org.apache.mina.common.IoFilter.WriteRequest;
39 import org.apache.mina.common.IoFutureListener;
40 import org.apache.mina.common.IoHandlerAdapter;
41 import org.apache.mina.common.IoSession;
42 import org.apache.mina.common.WriteFuture;
43 import org.apache.mina.common.support.DefaultWriteFuture;
44 import org.apache.mina.transport.socket.nio.SocketAcceptor;
45 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
46 import org.apache.mina.transport.socket.nio.SocketConnector;
47 import org.apache.mina.util.AvailablePortFinder;
48 import org.easymock.AbstractMatcher;
49 import org.easymock.MockControl;
50
51
52
53
54
55
56
57 public class StreamWriteFilterTest extends TestCase {
/** EasyMock control used to record, replay and verify expectations on {@link #session}. */
MockControl mockSession;

/** EasyMock control used to record, replay and verify expectations on {@link #nextFilter}. */
MockControl mockNextFilter;

/** Mock {@link IoSession} obtained from {@link #mockSession} in {@code setUp()}. */
IoSession session;

/** Mock {@link NextFilter} obtained from {@link #mockNextFilter} in {@code setUp()}. */
NextFilter nextFilter;
65
66 @Override
67 protected void setUp() throws Exception {
68 super.setUp();
69
70
71
72
73 mockSession = MockControl.createControl(IoSession.class);
74 mockNextFilter = MockControl.createControl(NextFilter.class);
75 session = (IoSession) mockSession.getMock();
76 nextFilter = (NextFilter) mockNextFilter.getMock();
77
78 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
79 mockSession.setReturnValue(null);
80 }
81
82 public void testWriteEmptyStream() throws Exception {
83 StreamWriteFilter filter = new StreamWriteFilter();
84
85 InputStream stream = new ByteArrayInputStream(new byte[0]);
86 WriteRequest writeRequest = new WriteRequest(stream,
87 new DummyWriteFuture());
88
89
90
91
92 nextFilter.messageSent(session, stream);
93
94
95
96
97 mockNextFilter.replay();
98 mockSession.replay();
99
100 filter.filterWrite(nextFilter, session, writeRequest);
101
102
103
104
105 mockNextFilter.verify();
106 mockSession.verify();
107
108 assertTrue(writeRequest.getFuture().isWritten());
109 }
110
111
112
113
114
115 public void testWriteNonStreamMessage() throws Exception {
116 StreamWriteFilter filter = new StreamWriteFilter();
117
118 Object message = new Object();
119 WriteRequest writeRequest = new WriteRequest(message,
120 new DummyWriteFuture());
121
122
123
124
125 nextFilter.filterWrite(session, writeRequest);
126 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
127 mockSession.setReturnValue(null);
128 nextFilter.messageSent(session, message);
129
130
131
132
133 mockNextFilter.replay();
134 mockSession.replay();
135
136 filter.filterWrite(nextFilter, session, writeRequest);
137 filter.messageSent(nextFilter, session, message);
138
139
140
141
142 mockNextFilter.verify();
143 mockSession.verify();
144 }
145
146
147
148
149 public void testWriteSingleBufferStream() throws Exception {
150 StreamWriteFilter filter = new StreamWriteFilter();
151
152 byte[] data = new byte[] { 1, 2, 3, 4 };
153
154 InputStream stream = new ByteArrayInputStream(data);
155 WriteRequest writeRequest = new WriteRequest(stream,
156 new DummyWriteFuture());
157
158
159
160
161 session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
162 mockSession.setReturnValue(null);
163 session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
164 writeRequest.getFuture());
165 mockSession.setReturnValue(null);
166 nextFilter
167 .filterWrite(session, new WriteRequest(ByteBuffer.wrap(data)));
168 mockNextFilter.setMatcher(new WriteRequestMatcher());
169
170 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
171 mockSession.setReturnValue(stream);
172 session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
173 mockSession.setReturnValue(stream);
174 session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
175 mockSession.setReturnValue(writeRequest.getFuture());
176 session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
177 mockSession.setReturnValue(null);
178 nextFilter.messageSent(session, stream);
179
180
181
182
183 mockNextFilter.replay();
184 mockSession.replay();
185
186 filter.filterWrite(nextFilter, session, writeRequest);
187 filter.messageSent(nextFilter, session, data);
188
189
190
191
192 mockNextFilter.verify();
193 mockSession.verify();
194
195 assertTrue(writeRequest.getFuture().isWritten());
196 }
197
198
199
200
201 public void testWriteSeveralBuffersStream() throws Exception {
202 StreamWriteFilter filter = new StreamWriteFilter();
203 filter.setWriteBufferSize(4);
204
205 byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
206 byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
207 byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
208 byte[] chunk3 = new byte[] { 9, 10 };
209
210 InputStream stream = new ByteArrayInputStream(data);
211 WriteRequest writeRequest = new WriteRequest(stream,
212 new DummyWriteFuture());
213
214
215
216
217 session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
218 mockSession.setReturnValue(null);
219 session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
220 writeRequest.getFuture());
221 mockSession.setReturnValue(null);
222 nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
223 .wrap(chunk1)));
224 mockNextFilter.setMatcher(new WriteRequestMatcher());
225
226 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
227 mockSession.setReturnValue(stream);
228 nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
229 .wrap(chunk2)));
230
231 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
232 mockSession.setReturnValue(stream);
233 nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
234 .wrap(chunk3)));
235
236 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
237 mockSession.setReturnValue(stream);
238 session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
239 mockSession.setReturnValue(stream);
240 session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
241 mockSession.setReturnValue(writeRequest.getFuture());
242 session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
243 mockSession.setReturnValue(null);
244 nextFilter.messageSent(session, stream);
245
246
247
248
249 mockNextFilter.replay();
250 mockSession.replay();
251
252 filter.filterWrite(nextFilter, session, writeRequest);
253 filter.messageSent(nextFilter, session, chunk1);
254 filter.messageSent(nextFilter, session, chunk2);
255 filter.messageSent(nextFilter, session, chunk3);
256
257
258
259
260 mockNextFilter.verify();
261 mockSession.verify();
262
263 assertTrue(writeRequest.getFuture().isWritten());
264 }
265
266 public void testWriteWhileWriteInProgress() throws Exception {
267 StreamWriteFilter filter = new StreamWriteFilter();
268
269 Queue<? extends Object> queue = new LinkedList<Object>();
270 InputStream stream = new ByteArrayInputStream(new byte[5]);
271
272
273
274
275 mockSession.reset();
276 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
277 mockSession.setReturnValue(stream);
278 session.getAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
279 mockSession.setReturnValue(queue);
280
281
282
283
284 mockNextFilter.replay();
285 mockSession.replay();
286
287 WriteRequest wr = new WriteRequest(new Object(), new DummyWriteFuture());
288 filter.filterWrite(nextFilter, session, wr);
289 assertEquals(1, queue.size());
290 assertSame(wr, queue.poll());
291
292
293
294
295 mockNextFilter.verify();
296 mockSession.verify();
297 }
298
299 public void testWritesWriteRequestQueueWhenFinished() throws Exception {
300 StreamWriteFilter filter = new StreamWriteFilter();
301
302 WriteRequest wrs[] = new WriteRequest[] {
303 new WriteRequest(new Object(), new DummyWriteFuture()),
304 new WriteRequest(new Object(), new DummyWriteFuture()),
305 new WriteRequest(new Object(), new DummyWriteFuture()) };
306 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
307 queue.add(wrs[0]);
308 queue.add(wrs[1]);
309 queue.add(wrs[2]);
310 InputStream stream = new ByteArrayInputStream(new byte[0]);
311
312
313
314
315 mockSession.reset();
316
317 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
318 mockSession.setReturnValue(stream);
319 session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
320 mockSession.setReturnValue(stream);
321 session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
322 mockSession.setReturnValue(new DefaultWriteFuture(session));
323 session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
324 mockSession.setReturnValue(queue);
325
326 nextFilter.filterWrite(session, wrs[0]);
327 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
328 mockSession.setReturnValue(null);
329 nextFilter.filterWrite(session, wrs[1]);
330 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
331 mockSession.setReturnValue(null);
332 nextFilter.filterWrite(session, wrs[2]);
333 session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
334 mockSession.setReturnValue(null);
335
336 nextFilter.messageSent(session, stream);
337
338
339
340
341 mockNextFilter.replay();
342 mockSession.replay();
343
344 filter.messageSent(nextFilter, session, new Object());
345 assertEquals(0, queue.size());
346
347
348
349
350 mockNextFilter.verify();
351 mockSession.verify();
352 }
353
354
355
356
357
358 public void testSetWriteBufferSize() throws Exception {
359 StreamWriteFilter filter = new StreamWriteFilter();
360
361 try {
362 filter.setWriteBufferSize(0);
363 fail("0 writeBuferSize specified. IllegalArgumentException expected.");
364 } catch (IllegalArgumentException iae) {
365 }
366
367 try {
368 filter.setWriteBufferSize(-100);
369 fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
370 } catch (IllegalArgumentException iae) {
371 }
372
373 filter.setWriteBufferSize(1);
374 assertEquals(1, filter.getWriteBufferSize());
375 filter.setWriteBufferSize(1024);
376 assertEquals(1024, filter.getWriteBufferSize());
377 }
378
379 public void testWriteUsingSocketTransport() throws Exception {
380 IoAcceptor acceptor = new SocketAcceptor();
381 ((SocketAcceptorConfig) acceptor.getDefaultConfig())
382 .setReuseAddress(true);
383 SocketAddress address = new InetSocketAddress("localhost",
384 AvailablePortFinder.getNextAvailable());
385
386 IoConnector connector = new SocketConnector();
387
388 FixedRandomInputStream stream = new FixedRandomInputStream(
389 4 * 1024 * 1024);
390
391 SenderHandler sender = new SenderHandler(stream);
392 ReceiverHandler receiver = new ReceiverHandler(stream.size);
393
394 acceptor.bind(address, sender);
395
396 synchronized (sender.lock) {
397 synchronized (receiver.lock) {
398 connector.connect(address, receiver);
399
400 sender.lock.wait();
401 receiver.lock.wait();
402 }
403 }
404
405 acceptor.unbind(address);
406
407 assertEquals(stream.bytesRead, receiver.bytesRead);
408 assertEquals(stream.size, receiver.bytesRead);
409 byte[] expectedMd5 = stream.digest.digest();
410 byte[] actualMd5 = receiver.digest.digest();
411 assertEquals(expectedMd5.length, actualMd5.length);
412 for (int i = 0; i < expectedMd5.length; i++) {
413 assertEquals(expectedMd5[i], actualMd5[i]);
414 }
415 }
416
417 private static class FixedRandomInputStream extends InputStream {
418 long size;
419
420 long bytesRead = 0;
421
422 Random random = new Random();
423
424 MessageDigest digest;
425
426 FixedRandomInputStream(long size) throws Exception {
427 this.size = size;
428 digest = MessageDigest.getInstance("MD5");
429 }
430
431 @Override
432 public int read() throws IOException {
433 if (isAllWritten())
434 return -1;
435 bytesRead++;
436 byte b = (byte) random.nextInt(255);
437 digest.update(b);
438 return b;
439 }
440
441 public long getBytesRead() {
442 return bytesRead;
443 }
444
445 public long getSize() {
446 return size;
447 }
448
449 public boolean isAllWritten() {
450 return bytesRead >= size;
451 }
452 }
453
454 private static class SenderHandler extends IoHandlerAdapter {
455 final Object lock = new Object();
456
457 InputStream inputStream;
458
459 StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
460
461 SenderHandler(InputStream inputStream) {
462 this.inputStream = inputStream;
463 }
464
465 @Override
466 public void sessionCreated(IoSession session) throws Exception {
467 super.sessionCreated(session);
468 session.getFilterChain().addLast("codec", streamWriteFilter);
469 }
470
471 @Override
472 public void sessionOpened(IoSession session) throws Exception {
473 session.write(inputStream);
474 }
475
476 @Override
477 public void exceptionCaught(IoSession session, Throwable cause)
478 throws Exception {
479 synchronized (lock) {
480 lock.notifyAll();
481 }
482 }
483
484 @Override
485 public void sessionClosed(IoSession session) throws Exception {
486 synchronized (lock) {
487 lock.notifyAll();
488 }
489 }
490
491 @Override
492 public void sessionIdle(IoSession session, IdleStatus status)
493 throws Exception {
494 synchronized (lock) {
495 lock.notifyAll();
496 }
497 }
498
499 @Override
500 public void messageSent(IoSession session, Object message)
501 throws Exception {
502 if (message == inputStream) {
503 synchronized (lock) {
504 lock.notifyAll();
505 }
506 }
507 }
508 }
509
510 private static class ReceiverHandler extends IoHandlerAdapter {
511 final Object lock = new Object();
512
513 long bytesRead = 0;
514
515 long size = 0;
516
517 MessageDigest digest;
518
519 ReceiverHandler(long size) throws Exception {
520 this.size = size;
521 digest = MessageDigest.getInstance("MD5");
522 }
523
524 @Override
525 public void sessionCreated(IoSession session) throws Exception {
526 super.sessionCreated(session);
527
528 session.setIdleTime(IdleStatus.READER_IDLE, 5);
529 }
530
531 @Override
532 public void sessionIdle(IoSession session, IdleStatus status)
533 throws Exception {
534 session.close();
535 }
536
537 @Override
538 public void exceptionCaught(IoSession session, Throwable cause)
539 throws Exception {
540 synchronized (lock) {
541 lock.notifyAll();
542 }
543 }
544
545 @Override
546 public void sessionClosed(IoSession session) throws Exception {
547 synchronized (lock) {
548 lock.notifyAll();
549 }
550 }
551
552 @Override
553 public void messageReceived(IoSession session, Object message)
554 throws Exception {
555 ByteBuffer buf = (ByteBuffer) message;
556 while (buf.hasRemaining()) {
557 digest.update(buf.get());
558 bytesRead++;
559 }
560 if (bytesRead >= size) {
561 session.close();
562 }
563 }
564 }
565
566 public static class WriteRequestMatcher extends AbstractMatcher {
567 @Override
568 protected boolean argumentMatches(Object expected, Object actual) {
569 if (expected instanceof WriteRequest
570 && actual instanceof WriteRequest) {
571 WriteRequest w1 = (WriteRequest) expected;
572 WriteRequest w2 = (WriteRequest) actual;
573
574 return w1.getMessage().equals(w2.getMessage())
575 && w1.getFuture().isWritten() == w2.getFuture()
576 .isWritten();
577 }
578 return super.argumentMatches(expected, actual);
579 }
580 }
581
582 private static class DummyWriteFuture implements WriteFuture {
583 private boolean written;
584
585 public boolean isWritten() {
586 return written;
587 }
588
589 public void setWritten(boolean written) {
590 this.written = written;
591 }
592
593 public IoSession getSession() {
594 return null;
595 }
596
597 public Object getLock() {
598 return this;
599 }
600
601 public void join() {
602 }
603
604 public boolean join(long timeoutInMillis) {
605 return true;
606 }
607
608 public boolean isReady() {
609 return true;
610 }
611
612 public void addListener(IoFutureListener listener) {
613 }
614
615 public void removeListener(IoFutureListener listener) {
616 }
617 }
618 }