1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter;
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.net.InetSocketAddress;
26  import java.net.SocketAddress;
27  import java.security.MessageDigest;
28  import java.util.LinkedList;
29  import java.util.Queue;
30  import java.util.Random;
31  
32  import junit.framework.TestCase;
33  import org.apache.mina.common.ByteBuffer;
34  import org.apache.mina.common.IdleStatus;
35  import org.apache.mina.common.IoAcceptor;
36  import org.apache.mina.common.IoConnector;
37  import org.apache.mina.common.IoFilter.NextFilter;
38  import org.apache.mina.common.IoFilter.WriteRequest;
39  import org.apache.mina.common.IoFutureListener;
40  import org.apache.mina.common.IoHandlerAdapter;
41  import org.apache.mina.common.IoSession;
42  import org.apache.mina.common.WriteFuture;
43  import org.apache.mina.common.support.DefaultWriteFuture;
44  import org.apache.mina.transport.socket.nio.SocketAcceptor;
45  import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
46  import org.apache.mina.transport.socket.nio.SocketConnector;
47  import org.apache.mina.util.AvailablePortFinder;
48  import org.easymock.AbstractMatcher;
49  import org.easymock.MockControl;
50  
51  /**
52   * Tests {@link StreamWriteFilter}.
53   *
54   * @author The Apache Directory Project (mina-dev@directory.apache.org)
55   * @version $Rev$, $Date$
56   */
57  public class StreamWriteFilterTest extends TestCase {
58      MockControl mockSession;
59  
60      MockControl mockNextFilter;
61  
62      IoSession session;
63  
64      NextFilter nextFilter;
65  
66      @Override
67      protected void setUp() throws Exception {
68          super.setUp();
69  
70          /*
71           * Create the mocks.
72           */
73          mockSession = MockControl.createControl(IoSession.class);
74          mockNextFilter = MockControl.createControl(NextFilter.class);
75          session = (IoSession) mockSession.getMock();
76          nextFilter = (NextFilter) mockNextFilter.getMock();
77  
78          session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
79          mockSession.setReturnValue(null);
80      }
81  
82      public void testWriteEmptyStream() throws Exception {
83          StreamWriteFilter filter = new StreamWriteFilter();
84  
85          InputStream stream = new ByteArrayInputStream(new byte[0]);
86          WriteRequest writeRequest = new WriteRequest(stream,
87                  new DummyWriteFuture());
88  
89          /*
90           * Record expectations
91           */
92          nextFilter.messageSent(session, stream);
93  
94          /*
95           * Replay.
96           */
97          mockNextFilter.replay();
98          mockSession.replay();
99  
100         filter.filterWrite(nextFilter, session, writeRequest);
101 
102         /*
103          * Verify.
104          */
105         mockNextFilter.verify();
106         mockSession.verify();
107 
108         assertTrue(writeRequest.getFuture().isWritten());
109     }
110 
111     /**
112      * Tests that the filter just passes objects which aren't InputStreams
113      * through to the next filter.
114      */
115     public void testWriteNonStreamMessage() throws Exception {
116         StreamWriteFilter filter = new StreamWriteFilter();
117 
118         Object message = new Object();
119         WriteRequest writeRequest = new WriteRequest(message,
120                 new DummyWriteFuture());
121 
122         /*
123          * Record expectations
124          */
125         nextFilter.filterWrite(session, writeRequest);
126         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
127         mockSession.setReturnValue(null);
128         nextFilter.messageSent(session, message);
129 
130         /*
131          * Replay.
132          */
133         mockNextFilter.replay();
134         mockSession.replay();
135 
136         filter.filterWrite(nextFilter, session, writeRequest);
137         filter.messageSent(nextFilter, session, message);
138 
139         /*
140          * Verify.
141          */
142         mockNextFilter.verify();
143         mockSession.verify();
144     }
145 
146     /**
147      * Tests when the contents of the stream fits into one write buffer.
148      */
149     public void testWriteSingleBufferStream() throws Exception {
150         StreamWriteFilter filter = new StreamWriteFilter();
151 
152         byte[] data = new byte[] { 1, 2, 3, 4 };
153 
154         InputStream stream = new ByteArrayInputStream(data);
155         WriteRequest writeRequest = new WriteRequest(stream,
156                 new DummyWriteFuture());
157 
158         /*
159          * Record expectations
160          */
161         session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
162         mockSession.setReturnValue(null);
163         session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
164                 writeRequest.getFuture());
165         mockSession.setReturnValue(null);
166         nextFilter
167                 .filterWrite(session, new WriteRequest(ByteBuffer.wrap(data)));
168         mockNextFilter.setMatcher(new WriteRequestMatcher());
169 
170         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
171         mockSession.setReturnValue(stream);
172         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
173         mockSession.setReturnValue(stream);
174         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
175         mockSession.setReturnValue(writeRequest.getFuture());
176         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
177         mockSession.setReturnValue(null);
178         nextFilter.messageSent(session, stream);
179 
180         /*
181          * Replay.
182          */
183         mockNextFilter.replay();
184         mockSession.replay();
185 
186         filter.filterWrite(nextFilter, session, writeRequest);
187         filter.messageSent(nextFilter, session, data);
188 
189         /*
190          * Verify.
191          */
192         mockNextFilter.verify();
193         mockSession.verify();
194 
195         assertTrue(writeRequest.getFuture().isWritten());
196     }
197 
198     /**
199      * Tests when the contents of the stream doesn't fit into one write buffer.
200      */
201     public void testWriteSeveralBuffersStream() throws Exception {
202         StreamWriteFilter filter = new StreamWriteFilter();
203         filter.setWriteBufferSize(4);
204 
205         byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
206         byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
207         byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
208         byte[] chunk3 = new byte[] { 9, 10 };
209 
210         InputStream stream = new ByteArrayInputStream(data);
211         WriteRequest writeRequest = new WriteRequest(stream,
212                 new DummyWriteFuture());
213 
214         /*
215          * Record expectations
216          */
217         session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
218         mockSession.setReturnValue(null);
219         session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
220                 writeRequest.getFuture());
221         mockSession.setReturnValue(null);
222         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
223                 .wrap(chunk1)));
224         mockNextFilter.setMatcher(new WriteRequestMatcher());
225 
226         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
227         mockSession.setReturnValue(stream);
228         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
229                 .wrap(chunk2)));
230 
231         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
232         mockSession.setReturnValue(stream);
233         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
234                 .wrap(chunk3)));
235 
236         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
237         mockSession.setReturnValue(stream);
238         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
239         mockSession.setReturnValue(stream);
240         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
241         mockSession.setReturnValue(writeRequest.getFuture());
242         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
243         mockSession.setReturnValue(null);
244         nextFilter.messageSent(session, stream);
245 
246         /*
247          * Replay.
248          */
249         mockNextFilter.replay();
250         mockSession.replay();
251 
252         filter.filterWrite(nextFilter, session, writeRequest);
253         filter.messageSent(nextFilter, session, chunk1);
254         filter.messageSent(nextFilter, session, chunk2);
255         filter.messageSent(nextFilter, session, chunk3);
256 
257         /*
258          * Verify.
259          */
260         mockNextFilter.verify();
261         mockSession.verify();
262 
263         assertTrue(writeRequest.getFuture().isWritten());
264     }
265 
266     public void testWriteWhileWriteInProgress() throws Exception {
267         StreamWriteFilter filter = new StreamWriteFilter();
268 
269         Queue<? extends Object> queue = new LinkedList<Object>();
270         InputStream stream = new ByteArrayInputStream(new byte[5]);
271 
272         /*
273          * Record expectations
274          */
275         mockSession.reset();
276         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
277         mockSession.setReturnValue(stream);
278         session.getAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
279         mockSession.setReturnValue(queue);
280 
281         /*
282          * Replay.
283          */
284         mockNextFilter.replay();
285         mockSession.replay();
286 
287         WriteRequest wr = new WriteRequest(new Object(), new DummyWriteFuture());
288         filter.filterWrite(nextFilter, session, wr);
289         assertEquals(1, queue.size());
290         assertSame(wr, queue.poll());
291 
292         /*
293          * Verify.
294          */
295         mockNextFilter.verify();
296         mockSession.verify();
297     }
298 
299     public void testWritesWriteRequestQueueWhenFinished() throws Exception {
300         StreamWriteFilter filter = new StreamWriteFilter();
301 
302         WriteRequest wrs[] = new WriteRequest[] {
303                 new WriteRequest(new Object(), new DummyWriteFuture()),
304                 new WriteRequest(new Object(), new DummyWriteFuture()),
305                 new WriteRequest(new Object(), new DummyWriteFuture()) };
306         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
307         queue.add(wrs[0]);
308         queue.add(wrs[1]);
309         queue.add(wrs[2]);
310         InputStream stream = new ByteArrayInputStream(new byte[0]);
311 
312         /*
313          * Record expectations
314          */
315         mockSession.reset();
316 
317         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
318         mockSession.setReturnValue(stream);
319         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
320         mockSession.setReturnValue(stream);
321         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
322         mockSession.setReturnValue(new DefaultWriteFuture(session));
323         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
324         mockSession.setReturnValue(queue);
325 
326         nextFilter.filterWrite(session, wrs[0]);
327         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
328         mockSession.setReturnValue(null);
329         nextFilter.filterWrite(session, wrs[1]);
330         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
331         mockSession.setReturnValue(null);
332         nextFilter.filterWrite(session, wrs[2]);
333         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
334         mockSession.setReturnValue(null);
335 
336         nextFilter.messageSent(session, stream);
337 
338         /*
339          * Replay.
340          */
341         mockNextFilter.replay();
342         mockSession.replay();
343 
344         filter.messageSent(nextFilter, session, new Object());
345         assertEquals(0, queue.size());
346 
347         /*
348          * Verify.
349          */
350         mockNextFilter.verify();
351         mockSession.verify();
352     }
353 
354     /**
355      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
356      * specified size.
357      */
358     public void testSetWriteBufferSize() throws Exception {
359         StreamWriteFilter filter = new StreamWriteFilter();
360 
361         try {
362             filter.setWriteBufferSize(0);
363             fail("0 writeBuferSize specified. IllegalArgumentException expected.");
364         } catch (IllegalArgumentException iae) {
365         }
366 
367         try {
368             filter.setWriteBufferSize(-100);
369             fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
370         } catch (IllegalArgumentException iae) {
371         }
372 
373         filter.setWriteBufferSize(1);
374         assertEquals(1, filter.getWriteBufferSize());
375         filter.setWriteBufferSize(1024);
376         assertEquals(1024, filter.getWriteBufferSize());
377     }
378 
379     public void testWriteUsingSocketTransport() throws Exception {
380         IoAcceptor acceptor = new SocketAcceptor();
381         ((SocketAcceptorConfig) acceptor.getDefaultConfig())
382                 .setReuseAddress(true);
383         SocketAddress address = new InetSocketAddress("localhost",
384                 AvailablePortFinder.getNextAvailable());
385 
386         IoConnector connector = new SocketConnector();
387 
388         FixedRandomInputStream stream = new FixedRandomInputStream(
389                 4 * 1024 * 1024);
390 
391         SenderHandler sender = new SenderHandler(stream);
392         ReceiverHandler receiver = new ReceiverHandler(stream.size);
393 
394         acceptor.bind(address, sender);
395 
396         synchronized (sender.lock) {
397             synchronized (receiver.lock) {
398                 connector.connect(address, receiver);
399 
400                 sender.lock.wait();
401                 receiver.lock.wait();
402             }
403         }
404 
405         acceptor.unbind(address);
406 
407         assertEquals(stream.bytesRead, receiver.bytesRead);
408         assertEquals(stream.size, receiver.bytesRead);
409         byte[] expectedMd5 = stream.digest.digest();
410         byte[] actualMd5 = receiver.digest.digest();
411         assertEquals(expectedMd5.length, actualMd5.length);
412         for (int i = 0; i < expectedMd5.length; i++) {
413             assertEquals(expectedMd5[i], actualMd5[i]);
414         }
415     }
416 
417     private static class FixedRandomInputStream extends InputStream {
418         long size;
419 
420         long bytesRead = 0;
421 
422         Random random = new Random();
423 
424         MessageDigest digest;
425 
426         FixedRandomInputStream(long size) throws Exception {
427             this.size = size;
428             digest = MessageDigest.getInstance("MD5");
429         }
430 
431         @Override
432         public int read() throws IOException {
433             if (isAllWritten())
434                 return -1;
435             bytesRead++;
436             byte b = (byte) random.nextInt(255);
437             digest.update(b);
438             return b;
439         }
440 
441         public long getBytesRead() {
442             return bytesRead;
443         }
444 
445         public long getSize() {
446             return size;
447         }
448 
449         public boolean isAllWritten() {
450             return bytesRead >= size;
451         }
452     }
453 
454     private static class SenderHandler extends IoHandlerAdapter {
455         final Object lock = new Object();
456 
457         InputStream inputStream;
458 
459         StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
460 
461         SenderHandler(InputStream inputStream) {
462             this.inputStream = inputStream;
463         }
464 
465         @Override
466         public void sessionCreated(IoSession session) throws Exception {
467             super.sessionCreated(session);
468             session.getFilterChain().addLast("codec", streamWriteFilter);
469         }
470 
471         @Override
472         public void sessionOpened(IoSession session) throws Exception {
473             session.write(inputStream);
474         }
475 
476         @Override
477         public void exceptionCaught(IoSession session, Throwable cause)
478                 throws Exception {
479             synchronized (lock) {
480                 lock.notifyAll();
481             }
482         }
483 
484         @Override
485         public void sessionClosed(IoSession session) throws Exception {
486             synchronized (lock) {
487                 lock.notifyAll();
488             }
489         }
490 
491         @Override
492         public void sessionIdle(IoSession session, IdleStatus status)
493                 throws Exception {
494             synchronized (lock) {
495                 lock.notifyAll();
496             }
497         }
498 
499         @Override
500         public void messageSent(IoSession session, Object message)
501                 throws Exception {
502             if (message == inputStream) {
503                 synchronized (lock) {
504                     lock.notifyAll();
505                 }
506             }
507         }
508     }
509 
510     private static class ReceiverHandler extends IoHandlerAdapter {
511         final Object lock = new Object();
512 
513         long bytesRead = 0;
514 
515         long size = 0;
516 
517         MessageDigest digest;
518 
519         ReceiverHandler(long size) throws Exception {
520             this.size = size;
521             digest = MessageDigest.getInstance("MD5");
522         }
523 
524         @Override
525         public void sessionCreated(IoSession session) throws Exception {
526             super.sessionCreated(session);
527 
528             session.setIdleTime(IdleStatus.READER_IDLE, 5);
529         }
530 
531         @Override
532         public void sessionIdle(IoSession session, IdleStatus status)
533                 throws Exception {
534             session.close();
535         }
536 
537         @Override
538         public void exceptionCaught(IoSession session, Throwable cause)
539                 throws Exception {
540             synchronized (lock) {
541                 lock.notifyAll();
542             }
543         }
544 
545         @Override
546         public void sessionClosed(IoSession session) throws Exception {
547             synchronized (lock) {
548                 lock.notifyAll();
549             }
550         }
551 
552         @Override
553         public void messageReceived(IoSession session, Object message)
554                 throws Exception {
555             ByteBuffer buf = (ByteBuffer) message;
556             while (buf.hasRemaining()) {
557                 digest.update(buf.get());
558                 bytesRead++;
559             }
560             if (bytesRead >= size) {
561                 session.close();
562             }
563         }
564     }
565 
566     public static class WriteRequestMatcher extends AbstractMatcher {
567         @Override
568         protected boolean argumentMatches(Object expected, Object actual) {
569             if (expected instanceof WriteRequest
570                     && actual instanceof WriteRequest) {
571                 WriteRequest w1 = (WriteRequest) expected;
572                 WriteRequest w2 = (WriteRequest) actual;
573 
574                 return w1.getMessage().equals(w2.getMessage())
575                         && w1.getFuture().isWritten() == w2.getFuture()
576                                 .isWritten();
577             }
578             return super.argumentMatches(expected, actual);
579         }
580     }
581 
582     private static class DummyWriteFuture implements WriteFuture {
583         private boolean written;
584 
585         public boolean isWritten() {
586             return written;
587         }
588 
589         public void setWritten(boolean written) {
590             this.written = written;
591         }
592 
593         public IoSession getSession() {
594             return null;
595         }
596 
597         public Object getLock() {
598             return this;
599         }
600 
601         public void join() {
602         }
603 
604         public boolean join(long timeoutInMillis) {
605             return true;
606         }
607 
608         public boolean isReady() {
609             return true;
610         }
611 
612         public void addListener(IoFutureListener listener) {
613         }
614 
615         public void removeListener(IoFutureListener listener) {
616         }
617     }
618 }