1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.stream;
21
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.security.MessageDigest;
25 import java.util.LinkedList;
26 import java.util.Queue;
27 import java.util.Random;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30
31 import junit.framework.TestCase;
32 import junit.framework.Assert;
33
34 import org.apache.mina.core.buffer.IoBuffer;
35 import org.apache.mina.core.filterchain.IoFilter.NextFilter;
36 import org.apache.mina.core.future.IoFutureListener;
37 import org.apache.mina.core.future.WriteFuture;
38 import org.apache.mina.core.service.IoHandlerAdapter;
39 import org.apache.mina.core.session.DummySession;
40 import org.apache.mina.core.session.IdleStatus;
41 import org.apache.mina.core.session.IoSession;
42 import org.apache.mina.core.write.DefaultWriteRequest;
43 import org.apache.mina.core.write.WriteRequest;
44 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
45 import org.apache.mina.transport.socket.nio.NioSocketConnector;
46 import org.apache.mina.util.AvailablePortFinder;
47 import org.easymock.IArgumentMatcher;
48 import org.easymock.classextension.EasyMock;
49
50
51
52
53
54
55 public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> extends TestCase {
56
57 protected final IoSession session = new DummySession();
58
59 abstract protected U createFilter();
60
61 abstract protected M createMessage(byte[] data) throws Exception;
62
63 public void testWriteEmptyFile() throws Exception {
64 AbstractStreamWriteFilter<M> filter = createFilter();
65 M message = createMessage(new byte[0]);
66
67 WriteRequest writeRequest = new DefaultWriteRequest(message,
68 new DummyWriteFuture());
69
70 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
71
72
73
74 nextFilter.messageSent(session, writeRequest);
75
76
77
78
79 EasyMock.replay(nextFilter);
80
81 filter.filterWrite(nextFilter, session, writeRequest);
82
83
84
85
86 EasyMock.verify(nextFilter);
87
88 assertTrue(writeRequest.getFuture().isWritten());
89 }
90
91
92
93
94
95
96
97 public void testWriteNonFileRegionMessage() throws Exception {
98 AbstractStreamWriteFilter<M> filter = createFilter();
99
100 Object message = new Object();
101 WriteRequest writeRequest = new DefaultWriteRequest(message,
102 new DummyWriteFuture());
103
104 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
105
106
107
108 nextFilter.filterWrite(session, writeRequest);
109 nextFilter.messageSent(session, writeRequest);
110
111
112
113
114 EasyMock.replay(nextFilter);
115
116 filter.filterWrite(nextFilter, session, writeRequest);
117 filter.messageSent(nextFilter, session, writeRequest);
118
119
120
121
122 EasyMock.verify(nextFilter);
123 }
124
125
126
127
128
129
130 public void testWriteSingleBufferFile() throws Exception {
131 byte[] data = new byte[] { 1, 2, 3, 4 };
132
133 AbstractStreamWriteFilter<M> filter = createFilter();
134 M message = createMessage(data);
135
136 WriteRequest writeRequest = new DefaultWriteRequest(message,
137 new DummyWriteFuture());
138
139 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
140
141
142
143 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer
144 .wrap(data))));
145 nextFilter.messageSent(session, writeRequest);
146
147
148
149
150 EasyMock.replay(nextFilter);
151
152 filter.filterWrite(nextFilter, session, writeRequest);
153 filter.messageSent(nextFilter, session, writeRequest);
154
155
156
157
158 EasyMock.verify(nextFilter);
159
160 assertTrue(writeRequest.getFuture().isWritten());
161 }
162
163
164
165
166
167
168 public void testWriteSeveralBuffersStream() throws Exception {
169 AbstractStreamWriteFilter<M> filter = createFilter();
170 filter.setWriteBufferSize(4);
171
172 byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
173 byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
174 byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
175 byte[] chunk3 = new byte[] { 9, 10 };
176
177 M message = createMessage(data);
178 WriteRequest writeRequest = new DefaultWriteRequest(message,
179 new DummyWriteFuture());
180
181 WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer
182 .wrap(chunk1));
183 WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer
184 .wrap(chunk2));
185 WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer
186 .wrap(chunk3));
187
188 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
189
190
191
192 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
193 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
194 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
195 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
196
197
198
199
200 EasyMock.replay(nextFilter);
201
202 filter.filterWrite(nextFilter, session, writeRequest);
203 filter.messageSent(nextFilter, session, chunk1Request);
204 filter.messageSent(nextFilter, session, chunk2Request);
205 filter.messageSent(nextFilter, session, chunk3Request);
206
207
208
209
210 EasyMock.verify(nextFilter);
211
212 assertTrue(writeRequest.getFuture().isWritten());
213 }
214
215 public void testWriteWhileWriteInProgress() throws Exception {
216 AbstractStreamWriteFilter<M> filter = createFilter();
217 M message = createMessage(new byte[5]);
218
219 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
220
221
222
223
224 session.setAttribute(filter.CURRENT_STREAM, message);
225 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
226
227 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
228
229
230
231 EasyMock.replay(nextFilter);
232
233 WriteRequest wr = new DefaultWriteRequest(new Object(),
234 new DummyWriteFuture());
235 filter.filterWrite(nextFilter, session, wr);
236 assertEquals(1, queue.size());
237 assertSame(wr, queue.poll());
238
239
240
241
242 EasyMock.verify(nextFilter);
243
244 session.removeAttribute(filter.CURRENT_STREAM);
245 session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
246 }
247
248 public void testWritesWriteRequestQueueWhenFinished() throws Exception {
249 AbstractStreamWriteFilter<M> filter = createFilter();
250 M message = createMessage(new byte[0]);
251
252 WriteRequest wrs[] = new WriteRequest[] {
253 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
254 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
255 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
256 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
257 queue.add(wrs[0]);
258 queue.add(wrs[1]);
259 queue.add(wrs[2]);
260
261
262
263
264 session.setAttribute(filter.CURRENT_STREAM, message);
265 session.setAttribute(filter.CURRENT_WRITE_REQUEST,
266 new DefaultWriteRequest(message));
267 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
268
269
270
271
272 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
273 nextFilter.filterWrite(session, wrs[0]);
274 nextFilter.filterWrite(session, wrs[1]);
275 nextFilter.filterWrite(session, wrs[2]);
276 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
277
278
279
280
281 EasyMock.replay(nextFilter);
282
283 filter.messageSent(nextFilter, session, new DefaultWriteRequest(
284 new Object()));
285 assertEquals(0, queue.size());
286
287
288
289
290 EasyMock.verify(nextFilter);
291 }
292
293
294
295
296
297 public void testSetWriteBufferSize() {
298 AbstractStreamWriteFilter<M> filter = createFilter();
299
300 try {
301 filter.setWriteBufferSize(0);
302 fail("0 writeBuferSize specified. IllegalArgumentException expected.");
303 } catch (IllegalArgumentException iae) {
304
305
306 Assert.assertTrue(true);
307 }
308
309 try {
310 filter.setWriteBufferSize(-100);
311 fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
312 } catch (IllegalArgumentException iae) {
313
314
315 Assert.assertTrue(true);
316 }
317
318 filter.setWriteBufferSize(1);
319 assertEquals(1, filter.getWriteBufferSize());
320 filter.setWriteBufferSize(1024);
321 assertEquals(1024, filter.getWriteBufferSize());
322 }
323
324 public void testWriteUsingSocketTransport() throws Exception {
325 NioSocketAcceptor acceptor = new NioSocketAcceptor();
326 acceptor.setReuseAddress(true);
327 SocketAddress address = new InetSocketAddress("localhost",
328 AvailablePortFinder.getNextAvailable());
329
330 NioSocketConnector connector = new NioSocketConnector();
331
332
333 byte[] data = new byte[4 * 1024 * 1024];
334 new Random().nextBytes(data);
335
336 byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
337
338 M message = createMessage(data);
339
340 SenderHandler sender = new SenderHandler(message);
341 ReceiverHandler receiver = new ReceiverHandler(data.length);
342
343 acceptor.setHandler(sender);
344 connector.setHandler(receiver);
345
346 acceptor.bind(address);
347 connector.connect(address);
348 sender.latch.await();
349 receiver.latch.await();
350
351 acceptor.dispose();
352
353 assertEquals(data.length, receiver.bytesRead);
354 byte[] actualMd5 = receiver.digest.digest();
355 assertEquals(expectedMd5.length, actualMd5.length);
356 for (int i = 0; i < expectedMd5.length; i++) {
357 assertEquals(expectedMd5[i], actualMd5[i]);
358 }
359 }
360
361 private class SenderHandler extends IoHandlerAdapter {
362 final CountDownLatch latch = new CountDownLatch(1);
363
364 private M message;
365
366 StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
367
368 SenderHandler(M message) {
369 this.message = message;
370 }
371
372 @Override
373 public void sessionCreated(IoSession session) throws Exception {
374 super.sessionCreated(session);
375 session.getFilterChain().addLast("codec", streamWriteFilter);
376 }
377
378 @Override
379 public void sessionOpened(IoSession session) throws Exception {
380 session.write(message);
381 }
382
383 @Override
384 public void exceptionCaught(IoSession session, Throwable cause)
385 throws Exception {
386 latch.countDown();
387 }
388
389 @Override
390 public void sessionClosed(IoSession session) throws Exception {
391 latch.countDown();
392 }
393
394 @Override
395 public void sessionIdle(IoSession session, IdleStatus status)
396 throws Exception {
397 latch.countDown();
398 }
399
400 @Override
401 public void messageSent(IoSession session, Object message)
402 throws Exception {
403 if (message == this.message) {
404 latch.countDown();
405 }
406 }
407 }
408
409 private static class ReceiverHandler extends IoHandlerAdapter {
410 final CountDownLatch latch = new CountDownLatch(1);
411
412 long bytesRead = 0;
413
414 long size = 0;
415
416 MessageDigest digest;
417
418 ReceiverHandler(long size) throws Exception {
419 this.size = size;
420 digest = MessageDigest.getInstance("MD5");
421 }
422
423 @Override
424 public void sessionCreated(IoSession session) throws Exception {
425 super.sessionCreated(session);
426
427 session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
428 }
429
430 @Override
431 public void sessionIdle(IoSession session, IdleStatus status)
432 throws Exception {
433 session.close(true);
434 }
435
436 @Override
437 public void exceptionCaught(IoSession session, Throwable cause)
438 throws Exception {
439 latch.countDown();
440 }
441
442 @Override
443 public void sessionClosed(IoSession session) throws Exception {
444 latch.countDown();
445 }
446
447 @Override
448 public void messageReceived(IoSession session, Object message)
449 throws Exception {
450 IoBuffer buf = (IoBuffer) message;
451 while (buf.hasRemaining()) {
452 digest.update(buf.get());
453 bytesRead++;
454 }
455 if (bytesRead >= size) {
456 session.close(true);
457 }
458 }
459 }
460
461 public static WriteRequest eqWriteRequest(WriteRequest expected) {
462 EasyMock.reportMatcher(new WriteRequestMatcher(expected));
463 return null;
464 }
465
466 private static class WriteRequestMatcher implements IArgumentMatcher {
467 private final WriteRequest expected;
468
469 public WriteRequestMatcher(WriteRequest expected) {
470 this.expected = expected;
471 }
472
473 public boolean matches(Object actual) {
474 if (actual instanceof WriteRequest) {
475 WriteRequest w2 = (WriteRequest) actual;
476
477 return expected.getMessage().equals(w2.getMessage())
478 && expected.getFuture().isWritten() == w2.getFuture()
479 .isWritten();
480 }
481 return false;
482 }
483 public void appendTo(StringBuffer buffer) {
484 buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
485 }
486 }
487
488 private static class DummyWriteFuture implements WriteFuture {
489 private boolean written;
490
491
492
493
494 public DummyWriteFuture() {
495 super();
496 }
497
498 public boolean isWritten() {
499 return written;
500 }
501
502 public void setWritten() {
503 this.written = true;
504 }
505
506 public IoSession getSession() {
507 return null;
508 }
509
510 public Object getLock() {
511 return this;
512 }
513
514 public void join() {
515
516 }
517
518 public boolean join(long timeoutInMillis) {
519 return true;
520 }
521
522 public boolean isDone() {
523 return true;
524 }
525
526 public WriteFuture addListener(IoFutureListener<?> listener) {
527 return this;
528 }
529
530 public WriteFuture removeListener(IoFutureListener<?> listener) {
531 return this;
532 }
533
534 public WriteFuture await() throws InterruptedException {
535 return this;
536 }
537
538 public boolean await(long timeout, TimeUnit unit)
539 throws InterruptedException {
540 return true;
541 }
542
543 public boolean await(long timeoutMillis) throws InterruptedException {
544 return true;
545 }
546
547 public WriteFuture awaitUninterruptibly() {
548 return this;
549 }
550
551 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
552 return true;
553 }
554
555 public boolean awaitUninterruptibly(long timeoutMillis) {
556 return true;
557 }
558
559 public Throwable getException() {
560 return null;
561 }
562
563 public void setException(Throwable cause) {
564 throw new IllegalStateException();
565 }
566 }
567
568
569 }