1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.stream;
21  
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.security.MessageDigest;
25  import java.util.LinkedList;
26  import java.util.Queue;
27  import java.util.Random;
28  import java.util.concurrent.CountDownLatch;
29  import java.util.concurrent.TimeUnit;
30  
31  import junit.framework.TestCase;
32  import junit.framework.Assert;
33  
34  import org.apache.mina.core.buffer.IoBuffer;
35  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
36  import org.apache.mina.core.future.IoFutureListener;
37  import org.apache.mina.core.future.WriteFuture;
38  import org.apache.mina.core.service.IoHandlerAdapter;
39  import org.apache.mina.core.session.DummySession;
40  import org.apache.mina.core.session.IdleStatus;
41  import org.apache.mina.core.session.IoSession;
42  import org.apache.mina.core.write.DefaultWriteRequest;
43  import org.apache.mina.core.write.WriteRequest;
44  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
45  import org.apache.mina.transport.socket.nio.NioSocketConnector;
46  import org.apache.mina.util.AvailablePortFinder;
47  import org.easymock.IArgumentMatcher;
48  import org.easymock.classextension.EasyMock;
49  
50  /**
51   * TODO Add documentation
52   * 
53   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
54   */
55  public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> extends TestCase {
56  
57      protected final IoSession session = new DummySession();
58  
59      abstract protected U createFilter();
60      
61      abstract protected M createMessage(byte[] data) throws Exception;
62      
63      public void testWriteEmptyFile() throws Exception {
64          AbstractStreamWriteFilter<M> filter = createFilter();
65          M message = createMessage(new byte[0]);
66  
67          WriteRequest writeRequest = new DefaultWriteRequest(message,
68                  new DummyWriteFuture());
69  
70          NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
71         /*
72           * Record expectations
73           */
74          nextFilter.messageSent(session, writeRequest);
75  
76          /*
77           * Replay.
78           */
79          EasyMock.replay(nextFilter);
80  
81          filter.filterWrite(nextFilter, session, writeRequest);
82  
83          /*
84           * Verify.
85           */
86          EasyMock.verify(nextFilter);
87  
88          assertTrue(writeRequest.getFuture().isWritten());
89      }
90  
91      /**
92       * Tests that the filter just passes objects which aren't FileRegion's
93       * through to the next filter.
94       *
95       * @throws Exception when something goes wrong
96       */
97      public void testWriteNonFileRegionMessage() throws Exception {
98          AbstractStreamWriteFilter<M> filter = createFilter();
99  
100         Object message = new Object();
101         WriteRequest writeRequest = new DefaultWriteRequest(message,
102                 new DummyWriteFuture());
103 
104         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
105         /*
106          * Record expectations
107          */
108         nextFilter.filterWrite(session, writeRequest);
109         nextFilter.messageSent(session, writeRequest);
110 
111         /*
112          * Replay.
113          */
114         EasyMock.replay(nextFilter);
115 
116         filter.filterWrite(nextFilter, session, writeRequest);
117         filter.messageSent(nextFilter, session, writeRequest);
118 
119         /*
120          * Verify.
121          */
122         EasyMock.verify(nextFilter);
123     }
124 
125     /**
126      * Tests when the contents of the file fits into one write buffer.
127      *
128      * @throws Exception when something goes wrong
129      */
130     public void testWriteSingleBufferFile() throws Exception {
131         byte[] data = new byte[] { 1, 2, 3, 4 };
132 
133         AbstractStreamWriteFilter<M> filter = createFilter();
134         M message = createMessage(data);
135 
136         WriteRequest writeRequest = new DefaultWriteRequest(message,
137                 new DummyWriteFuture());
138 
139         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
140         /*
141          * Record expectations
142          */
143         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer
144                 .wrap(data))));
145         nextFilter.messageSent(session, writeRequest);
146 
147         /*
148          * Replay.
149          */
150         EasyMock.replay(nextFilter);
151 
152         filter.filterWrite(nextFilter, session, writeRequest);
153         filter.messageSent(nextFilter, session, writeRequest);
154 
155         /*
156          * Verify.
157          */
158         EasyMock.verify(nextFilter);
159 
160         assertTrue(writeRequest.getFuture().isWritten());
161     }
162 
163     /**
164      * Tests when the contents of the file doesn't fit into one write buffer.
165      *
166      * @throws Exception when something goes wrong
167      */
168     public void testWriteSeveralBuffersStream() throws Exception {
169         AbstractStreamWriteFilter<M> filter = createFilter();
170         filter.setWriteBufferSize(4);
171 
172         byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
173         byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
174         byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
175         byte[] chunk3 = new byte[] { 9, 10 };
176 
177         M message = createMessage(data);
178         WriteRequest writeRequest = new DefaultWriteRequest(message,
179                 new DummyWriteFuture());
180 
181         WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer
182                 .wrap(chunk1));
183         WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer
184                 .wrap(chunk2));
185         WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer
186                 .wrap(chunk3));
187 
188         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
189         /*
190          * Record expectations
191          */
192         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
193         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
194         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
195         nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
196 
197         /*
198          * Replay.
199          */
200         EasyMock.replay(nextFilter);
201 
202         filter.filterWrite(nextFilter, session, writeRequest);
203         filter.messageSent(nextFilter, session, chunk1Request);
204         filter.messageSent(nextFilter, session, chunk2Request);
205         filter.messageSent(nextFilter, session, chunk3Request);
206 
207         /*
208          * Verify.
209          */
210         EasyMock.verify(nextFilter);
211 
212         assertTrue(writeRequest.getFuture().isWritten());
213     }
214 
215     public void testWriteWhileWriteInProgress() throws Exception {
216         AbstractStreamWriteFilter<M> filter = createFilter();
217         M message = createMessage(new byte[5]);
218 
219         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
220 
221         /*
222          * Make up the situation.
223          */
224         session.setAttribute(filter.CURRENT_STREAM, message);
225         session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
226 
227         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
228         /*
229          * Replay.  (We recorded *nothing* because nothing should occur.)
230          */
231         EasyMock.replay(nextFilter);
232 
233         WriteRequest wr = new DefaultWriteRequest(new Object(),
234                 new DummyWriteFuture());
235         filter.filterWrite(nextFilter, session, wr);
236         assertEquals(1, queue.size());
237         assertSame(wr, queue.poll());
238 
239         /*
240          * Verify.
241          */
242         EasyMock.verify(nextFilter);
243 
244         session.removeAttribute(filter.CURRENT_STREAM);
245         session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
246     }
247 
248     public void testWritesWriteRequestQueueWhenFinished() throws Exception {
249         AbstractStreamWriteFilter<M> filter = createFilter();
250         M message = createMessage(new byte[0]);
251         
252         WriteRequest wrs[] = new WriteRequest[] {
253                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
254                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
255                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
256         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
257         queue.add(wrs[0]);
258         queue.add(wrs[1]);
259         queue.add(wrs[2]);
260 
261         /*
262          * Make up the situation.
263          */
264         session.setAttribute(filter.CURRENT_STREAM, message);
265         session.setAttribute(filter.CURRENT_WRITE_REQUEST,
266                 new DefaultWriteRequest(message));
267         session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
268 
269         /*
270          * Record expectations
271          */
272         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
273         nextFilter.filterWrite(session, wrs[0]);
274         nextFilter.filterWrite(session, wrs[1]);
275         nextFilter.filterWrite(session, wrs[2]);
276         nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
277 
278         /*
279          * Replay.
280          */
281         EasyMock.replay(nextFilter);
282 
283         filter.messageSent(nextFilter, session, new DefaultWriteRequest(
284                 new Object()));
285         assertEquals(0, queue.size());
286 
287         /*
288          * Verify.
289          */
290         EasyMock.verify(nextFilter);
291     }
292 
293     /**
294      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
295      * specified size.
296      */
297     public void testSetWriteBufferSize() {
298         AbstractStreamWriteFilter<M> filter = createFilter();
299 
300         try {
301             filter.setWriteBufferSize(0);
302             fail("0 writeBuferSize specified. IllegalArgumentException expected.");
303         } catch (IllegalArgumentException iae) {
304             // Pass, exception was thrown
305             // Signifies a successful test execution
306             Assert.assertTrue(true);
307         }
308 
309         try {
310             filter.setWriteBufferSize(-100);
311             fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
312         } catch (IllegalArgumentException iae) {
313             // Pass, exception was thrown
314             // Signifies a successful test execution
315             Assert.assertTrue(true);
316         }
317 
318         filter.setWriteBufferSize(1);
319         assertEquals(1, filter.getWriteBufferSize());
320         filter.setWriteBufferSize(1024);
321         assertEquals(1024, filter.getWriteBufferSize());
322     }
323 
324     public void testWriteUsingSocketTransport() throws Exception {
325         NioSocketAcceptor acceptor = new NioSocketAcceptor();
326         acceptor.setReuseAddress(true);
327         SocketAddress address = new InetSocketAddress("localhost",
328                 AvailablePortFinder.getNextAvailable());
329 
330         NioSocketConnector connector = new NioSocketConnector();
331 
332         // Generate 4MB of random data
333         byte[] data = new byte[4 * 1024 * 1024];
334         new Random().nextBytes(data);
335 
336         byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
337 
338         M message = createMessage(data);
339         
340         SenderHandler sender = new SenderHandler(message);
341         ReceiverHandler receiver = new ReceiverHandler(data.length);
342 
343         acceptor.setHandler(sender);
344         connector.setHandler(receiver);
345         
346         acceptor.bind(address);
347         connector.connect(address);
348         sender.latch.await();
349         receiver.latch.await();
350 
351         acceptor.dispose();
352 
353         assertEquals(data.length, receiver.bytesRead);
354         byte[] actualMd5 = receiver.digest.digest();
355         assertEquals(expectedMd5.length, actualMd5.length);
356         for (int i = 0; i < expectedMd5.length; i++) {
357             assertEquals(expectedMd5[i], actualMd5[i]);
358         }
359     }
360 
361     private class SenderHandler extends IoHandlerAdapter {
362         final CountDownLatch latch = new CountDownLatch(1);
363 
364         private M message;
365 
366         StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
367 
368         SenderHandler(M message) {
369             this.message = message;
370         }
371 
372         @Override
373         public void sessionCreated(IoSession session) throws Exception {
374             super.sessionCreated(session);
375             session.getFilterChain().addLast("codec", streamWriteFilter);
376         }
377 
378         @Override
379         public void sessionOpened(IoSession session) throws Exception {
380             session.write(message);
381         }
382 
383         @Override
384         public void exceptionCaught(IoSession session, Throwable cause)
385                 throws Exception {
386             latch.countDown();
387         }
388 
389         @Override
390         public void sessionClosed(IoSession session) throws Exception {
391             latch.countDown();
392         }
393 
394         @Override
395         public void sessionIdle(IoSession session, IdleStatus status)
396                 throws Exception {
397             latch.countDown();
398         }
399 
400         @Override
401         public void messageSent(IoSession session, Object message)
402                 throws Exception {
403             if (message == this.message) {
404                 latch.countDown();
405             }
406         }
407     }
408 
409     private static class ReceiverHandler extends IoHandlerAdapter {
410         final CountDownLatch latch = new CountDownLatch(1);
411 
412         long bytesRead = 0;
413 
414         long size = 0;
415 
416         MessageDigest digest;
417 
418         ReceiverHandler(long size) throws Exception {
419             this.size = size;
420             digest = MessageDigest.getInstance("MD5");
421         }
422 
423         @Override
424         public void sessionCreated(IoSession session) throws Exception {
425             super.sessionCreated(session);
426 
427             session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
428         }
429 
430         @Override
431         public void sessionIdle(IoSession session, IdleStatus status)
432                 throws Exception {
433             session.close(true);
434         }
435 
436         @Override
437         public void exceptionCaught(IoSession session, Throwable cause)
438                 throws Exception {
439             latch.countDown();
440         }
441 
442         @Override
443         public void sessionClosed(IoSession session) throws Exception {
444             latch.countDown();
445         }
446 
447         @Override
448         public void messageReceived(IoSession session, Object message)
449                 throws Exception {
450             IoBuffer buf = (IoBuffer) message;
451             while (buf.hasRemaining()) {
452                 digest.update(buf.get());
453                 bytesRead++;
454             }
455             if (bytesRead >= size) {
456                 session.close(true);
457             }
458         }
459     }
460 
461     public static WriteRequest eqWriteRequest(WriteRequest expected) {
462         EasyMock.reportMatcher(new WriteRequestMatcher(expected));
463         return null;
464     }
465     
466     private static class WriteRequestMatcher implements IArgumentMatcher {
467         private final WriteRequest expected;
468         
469         public WriteRequestMatcher(WriteRequest expected) {
470                 this.expected = expected;
471         }
472         
473         public boolean matches(Object actual) {
474             if (actual instanceof WriteRequest) {
475                 WriteRequest w2 = (WriteRequest) actual;
476 
477                 return expected.getMessage().equals(w2.getMessage())
478                         && expected.getFuture().isWritten() == w2.getFuture()
479                         .isWritten();
480             }
481             return false;
482         }
483         public void appendTo(StringBuffer buffer) {
484                 buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
485         }
486     }
487 
488     private static class DummyWriteFuture implements WriteFuture {
489         private boolean written;
490 
491         /**
492          * Default constructor
493          */
494         public DummyWriteFuture() {
495             super();
496         }
497         
498         public boolean isWritten() {
499             return written;
500         }
501 
502         public void setWritten() {
503             this.written = true;
504         }
505 
506         public IoSession getSession() {
507             return null;
508         }
509 
510         public Object getLock() {
511             return this;
512         }
513 
514         public void join() {
515             // Do nothing
516         }
517 
518         public boolean join(long timeoutInMillis) {
519             return true;
520         }
521 
522         public boolean isDone() {
523             return true;
524         }
525 
526         public WriteFuture addListener(IoFutureListener<?> listener) {
527             return this;
528         }
529 
530         public WriteFuture removeListener(IoFutureListener<?> listener) {
531             return this;
532         }
533 
534         public WriteFuture await() throws InterruptedException {
535             return this;
536         }
537 
538         public boolean await(long timeout, TimeUnit unit)
539                 throws InterruptedException {
540             return true;
541         }
542 
543         public boolean await(long timeoutMillis) throws InterruptedException {
544             return true;
545         }
546 
547         public WriteFuture awaitUninterruptibly() {
548             return this;
549         }
550 
551         public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
552             return true;
553         }
554 
555         public boolean awaitUninterruptibly(long timeoutMillis) {
556             return true;
557         }
558 
559         public Throwable getException() {
560             return null;
561         }
562 
563         public void setException(Throwable cause) {
564             throw new IllegalStateException();
565         }
566     }
567 
568     
569 }