1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.filter;
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.net.InetSocketAddress;
26  import java.net.SocketAddress;
27  import java.security.MessageDigest;
28  import java.util.Random;
29  
30  import junit.framework.TestCase;
31  
32  import org.apache.mina.common.ByteBuffer;
33  import org.apache.mina.common.IdleStatus;
34  import org.apache.mina.common.IoAcceptor;
35  import org.apache.mina.common.IoConnector;
36  import org.apache.mina.common.IoFutureListener;
37  import org.apache.mina.common.IoHandlerAdapter;
38  import org.apache.mina.common.IoSession;
39  import org.apache.mina.common.WriteFuture;
40  import org.apache.mina.common.IoFilter.NextFilter;
41  import org.apache.mina.common.IoFilter.WriteRequest;
42  import org.apache.mina.common.support.DefaultWriteFuture;
43  import org.apache.mina.transport.socket.nio.SocketAcceptor;
44  import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
45  import org.apache.mina.transport.socket.nio.SocketConnector;
46  import org.apache.mina.util.AvailablePortFinder;
47  import org.apache.mina.util.Queue;
48  import org.easymock.AbstractMatcher;
49  import org.easymock.MockControl;
50  
51  /**
52   * Tests {@link StreamWriteFilter}.
53   * 
54   * @author The Apache Directory Project (mina-dev@directory.apache.org)
55   * @version $Rev$, $Date$
56   */
57  public class StreamWriteFilterTest extends TestCase {
58      MockControl mockSession;
59  
60      MockControl mockNextFilter;
61  
62      IoSession session;
63  
64      NextFilter nextFilter;
65  
66      protected void setUp() throws Exception {
67          super.setUp();
68  
69          /*
70           * Create the mocks.
71           */
72          mockSession = MockControl.createControl(IoSession.class);
73          mockNextFilter = MockControl.createControl(NextFilter.class);
74          session = (IoSession) mockSession.getMock();
75          nextFilter = (NextFilter) mockNextFilter.getMock();
76  
77          session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
78          mockSession.setReturnValue(null);
79      }
80  
81      public void testWriteEmptyStream() throws Exception {
82          StreamWriteFilter filter = new StreamWriteFilter();
83  
84          InputStream stream = new ByteArrayInputStream(new byte[0]);
85          WriteRequest writeRequest = new WriteRequest(stream,
86                  new DummyWriteFuture());
87  
88          /*
89           * Record expectations
90           */
91          nextFilter.messageSent(session, stream);
92  
93          /*
94           * Replay.
95           */
96          mockNextFilter.replay();
97          mockSession.replay();
98  
99          filter.filterWrite(nextFilter, session, writeRequest);
100 
101         /*
102          * Verify.
103          */
104         mockNextFilter.verify();
105         mockSession.verify();
106 
107         assertTrue(writeRequest.getFuture().isWritten());
108     }
109 
110     /**
111      * Tests that the filter just passes objects which aren't InputStreams
112      * through to the next filter.
113      */
114     public void testWriteNonStreamMessage() throws Exception {
115         StreamWriteFilter filter = new StreamWriteFilter();
116 
117         Object message = new Object();
118         WriteRequest writeRequest = new WriteRequest(message,
119                 new DummyWriteFuture());
120 
121         /*
122          * Record expectations
123          */
124         nextFilter.filterWrite(session, writeRequest);
125         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
126         mockSession.setReturnValue(null);
127         nextFilter.messageSent(session, message);
128 
129         /*
130          * Replay.
131          */
132         mockNextFilter.replay();
133         mockSession.replay();
134 
135         filter.filterWrite(nextFilter, session, writeRequest);
136         filter.messageSent(nextFilter, session, message);
137 
138         /*
139          * Verify.
140          */
141         mockNextFilter.verify();
142         mockSession.verify();
143     }
144 
145     /**
146      * Tests when the contents of the stream fits into one write buffer.
147      */
148     public void testWriteSingleBufferStream() throws Exception {
149         StreamWriteFilter filter = new StreamWriteFilter();
150 
151         byte[] data = new byte[] { 1, 2, 3, 4 };
152 
153         InputStream stream = new ByteArrayInputStream(data);
154         WriteRequest writeRequest = new WriteRequest(stream,
155                 new DummyWriteFuture());
156 
157         /*
158          * Record expectations
159          */
160         session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
161         mockSession.setReturnValue(null);
162         session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
163                 writeRequest.getFuture());
164         mockSession.setReturnValue(null);
165         nextFilter
166                 .filterWrite(session, new WriteRequest(ByteBuffer.wrap(data)));
167         mockNextFilter.setMatcher(new WriteRequestMatcher());
168 
169         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
170         mockSession.setReturnValue(stream);
171         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
172         mockSession.setReturnValue(stream);
173         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
174         mockSession.setReturnValue(writeRequest.getFuture());
175         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
176         mockSession.setReturnValue(null);
177         nextFilter.messageSent(session, stream);
178 
179         /*
180          * Replay.
181          */
182         mockNextFilter.replay();
183         mockSession.replay();
184 
185         filter.filterWrite(nextFilter, session, writeRequest);
186         filter.messageSent(nextFilter, session, data);
187 
188         /*
189          * Verify.
190          */
191         mockNextFilter.verify();
192         mockSession.verify();
193 
194         assertTrue(writeRequest.getFuture().isWritten());
195     }
196 
197     /**
198      * Tests when the contents of the stream doesn't fit into one write buffer.
199      */
200     public void testWriteSeveralBuffersStream() throws Exception {
201         StreamWriteFilter filter = new StreamWriteFilter();
202         filter.setWriteBufferSize(4);
203 
204         byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
205         byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
206         byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
207         byte[] chunk3 = new byte[] { 9, 10 };
208 
209         InputStream stream = new ByteArrayInputStream(data);
210         WriteRequest writeRequest = new WriteRequest(stream,
211                 new DummyWriteFuture());
212 
213         /*
214          * Record expectations
215          */
216         session.setAttribute(StreamWriteFilter.CURRENT_STREAM, stream);
217         mockSession.setReturnValue(null);
218         session.setAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE,
219                 writeRequest.getFuture());
220         mockSession.setReturnValue(null);
221         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
222                 .wrap(chunk1)));
223         mockNextFilter.setMatcher(new WriteRequestMatcher());
224 
225         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
226         mockSession.setReturnValue(stream);
227         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
228                 .wrap(chunk2)));
229 
230         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
231         mockSession.setReturnValue(stream);
232         nextFilter.filterWrite(session, new WriteRequest(ByteBuffer
233                 .wrap(chunk3)));
234 
235         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
236         mockSession.setReturnValue(stream);
237         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
238         mockSession.setReturnValue(stream);
239         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
240         mockSession.setReturnValue(writeRequest.getFuture());
241         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
242         mockSession.setReturnValue(null);
243         nextFilter.messageSent(session, stream);
244 
245         /*
246          * Replay.
247          */
248         mockNextFilter.replay();
249         mockSession.replay();
250 
251         filter.filterWrite(nextFilter, session, writeRequest);
252         filter.messageSent(nextFilter, session, chunk1);
253         filter.messageSent(nextFilter, session, chunk2);
254         filter.messageSent(nextFilter, session, chunk3);
255 
256         /*
257          * Verify.
258          */
259         mockNextFilter.verify();
260         mockSession.verify();
261 
262         assertTrue(writeRequest.getFuture().isWritten());
263     }
264 
265     public void testWriteWhileWriteInProgress() throws Exception {
266         StreamWriteFilter filter = new StreamWriteFilter();
267 
268         Queue queue = new Queue();
269         InputStream stream = new ByteArrayInputStream(new byte[5]);
270 
271         /*
272          * Record expectations
273          */
274         mockSession.reset();
275         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
276         mockSession.setReturnValue(stream);
277         session.getAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
278         mockSession.setReturnValue(queue);
279 
280         /*
281          * Replay.
282          */
283         mockNextFilter.replay();
284         mockSession.replay();
285 
286         WriteRequest wr = new WriteRequest(new Object(), new DummyWriteFuture());
287         filter.filterWrite(nextFilter, session, wr);
288         assertEquals(1, queue.size());
289         assertSame(wr, queue.pop());
290 
291         /*
292          * Verify.
293          */
294         mockNextFilter.verify();
295         mockSession.verify();
296     }
297 
298     public void testWritesWriteRequestQueueWhenFinished() throws Exception {
299         StreamWriteFilter filter = new StreamWriteFilter();
300 
301         WriteRequest wrs[] = new WriteRequest[] {
302                 new WriteRequest(new Object(), new DummyWriteFuture()),
303                 new WriteRequest(new Object(), new DummyWriteFuture()),
304                 new WriteRequest(new Object(), new DummyWriteFuture()) };
305         Queue queue = new Queue();
306         queue.push(wrs[0]);
307         queue.push(wrs[1]);
308         queue.push(wrs[2]);
309         InputStream stream = new ByteArrayInputStream(new byte[0]);
310 
311         /*
312          * Record expectations
313          */
314         mockSession.reset();
315 
316         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
317         mockSession.setReturnValue(stream);
318         session.removeAttribute(StreamWriteFilter.CURRENT_STREAM);
319         mockSession.setReturnValue(stream);
320         session.removeAttribute(StreamWriteFilter.INITIAL_WRITE_FUTURE);
321         mockSession.setReturnValue(new DefaultWriteFuture(session));
322         session.removeAttribute(StreamWriteFilter.WRITE_REQUEST_QUEUE);
323         mockSession.setReturnValue(queue);
324 
325         nextFilter.filterWrite(session, wrs[0]);
326         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
327         mockSession.setReturnValue(null);
328         nextFilter.filterWrite(session, wrs[1]);
329         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
330         mockSession.setReturnValue(null);
331         nextFilter.filterWrite(session, wrs[2]);
332         session.getAttribute(StreamWriteFilter.CURRENT_STREAM);
333         mockSession.setReturnValue(null);
334 
335         nextFilter.messageSent(session, stream);
336 
337         /*
338          * Replay.
339          */
340         mockNextFilter.replay();
341         mockSession.replay();
342 
343         filter.messageSent(nextFilter, session, new Object());
344         assertEquals(0, queue.size());
345 
346         /*
347          * Verify.
348          */
349         mockNextFilter.verify();
350         mockSession.verify();
351     }
352 
353     /**
354      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
355      * specified size.
356      */
357     public void testSetWriteBufferSize() throws Exception {
358         StreamWriteFilter filter = new StreamWriteFilter();
359 
360         try {
361             filter.setWriteBufferSize(0);
362             fail("0 writeBuferSize specified. IllegalArgumentException expected.");
363         } catch (IllegalArgumentException iae) {
364         }
365 
366         try {
367             filter.setWriteBufferSize(-100);
368             fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
369         } catch (IllegalArgumentException iae) {
370         }
371 
372         filter.setWriteBufferSize(1);
373         assertEquals(1, filter.getWriteBufferSize());
374         filter.setWriteBufferSize(1024);
375         assertEquals(1024, filter.getWriteBufferSize());
376     }
377 
378     public void testWriteUsingSocketTransport() throws Exception {
379         IoAcceptor acceptor = new SocketAcceptor();
380         ((SocketAcceptorConfig) acceptor.getDefaultConfig())
381                 .setReuseAddress(true);
382         SocketAddress address = new InetSocketAddress("localhost",
383                 AvailablePortFinder.getNextAvailable());
384 
385         IoConnector connector = new SocketConnector();
386 
387         FixedRandomInputStream stream = new FixedRandomInputStream(
388                 4 * 1024 * 1024);
389 
390         SenderHandler sender = new SenderHandler(stream);
391         ReceiverHandler receiver = new ReceiverHandler(stream.size);
392 
393         acceptor.bind(address, sender);
394 
395         synchronized (sender.lock) {
396             synchronized (receiver.lock) {
397                 connector.connect(address, receiver);
398 
399                 sender.lock.wait();
400                 receiver.lock.wait();
401             }
402         }
403 
404         acceptor.unbind(address);
405 
406         assertEquals(stream.bytesRead, receiver.bytesRead);
407         assertEquals(stream.size, receiver.bytesRead);
408         byte[] expectedMd5 = stream.digest.digest();
409         byte[] actualMd5 = receiver.digest.digest();
410         assertEquals(expectedMd5.length, actualMd5.length);
411         for (int i = 0; i < expectedMd5.length; i++) {
412             assertEquals(expectedMd5[i], actualMd5[i]);
413         }
414     }
415 
416     private static class FixedRandomInputStream extends InputStream {
417         long size;
418 
419         long bytesRead = 0;
420 
421         Random random = new Random();
422 
423         MessageDigest digest;
424 
425         public FixedRandomInputStream(long size) throws Exception {
426             this.size = size;
427             digest = MessageDigest.getInstance("MD5");
428         }
429 
430         public int read() throws IOException {
431             if (isAllWritten())
432                 return -1;
433             bytesRead++;
434             byte b = (byte) random.nextInt(255);
435             digest.update(b);
436             return b;
437         }
438 
439         public long getBytesRead() {
440             return bytesRead;
441         }
442 
443         public long getSize() {
444             return size;
445         }
446 
447         public boolean isAllWritten() {
448             return bytesRead >= size;
449         }
450     }
451 
452     private static class SenderHandler extends IoHandlerAdapter {
453         Object lock = new Object();
454 
455         InputStream inputStream;
456 
457         StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
458 
459         public SenderHandler(InputStream inputStream) {
460             this.inputStream = inputStream;
461         }
462 
463         public void sessionCreated(IoSession session) throws Exception {
464             super.sessionCreated(session);
465             session.getFilterChain().addLast("codec", streamWriteFilter);
466         }
467 
468         public void sessionOpened(IoSession session) throws Exception {
469             session.write(inputStream);
470         }
471 
472         public void exceptionCaught(IoSession session, Throwable cause)
473                 throws Exception {
474             synchronized (lock) {
475                 lock.notifyAll();
476             }
477         }
478 
479         public void sessionClosed(IoSession session) throws Exception {
480             synchronized (lock) {
481                 lock.notifyAll();
482             }
483         }
484 
485         public void sessionIdle(IoSession session, IdleStatus status)
486                 throws Exception {
487             synchronized (lock) {
488                 lock.notifyAll();
489             }
490         }
491 
492         public void messageSent(IoSession session, Object message)
493                 throws Exception {
494             if (message == inputStream) {
495                 synchronized (lock) {
496                     lock.notifyAll();
497                 }
498             }
499         }
500     }
501 
502     private static class ReceiverHandler extends IoHandlerAdapter {
503         Object lock = new Object();
504 
505         long bytesRead = 0;
506 
507         long size = 0;
508 
509         MessageDigest digest;
510 
511         public ReceiverHandler(long size) throws Exception {
512             this.size = size;
513             digest = MessageDigest.getInstance("MD5");
514         }
515 
516         public void sessionCreated(IoSession session) throws Exception {
517             super.sessionCreated(session);
518 
519             session.setIdleTime(IdleStatus.READER_IDLE, 5);
520         }
521 
522         public void sessionIdle(IoSession session, IdleStatus status)
523                 throws Exception {
524             session.close();
525         }
526 
527         public void exceptionCaught(IoSession session, Throwable cause)
528                 throws Exception {
529             synchronized (lock) {
530                 lock.notifyAll();
531             }
532         }
533 
534         public void sessionClosed(IoSession session) throws Exception {
535             synchronized (lock) {
536                 lock.notifyAll();
537             }
538         }
539 
540         public void messageReceived(IoSession session, Object message)
541                 throws Exception {
542             ByteBuffer buf = (ByteBuffer) message;
543             while (buf.hasRemaining()) {
544                 digest.update(buf.get());
545                 bytesRead++;
546             }
547             if (bytesRead >= size) {
548                 session.close();
549             }
550         }
551     }
552 
553     public static class WriteRequestMatcher extends AbstractMatcher {
554         protected boolean argumentMatches(Object expected, Object actual) {
555             if (expected instanceof WriteRequest
556                     && expected instanceof WriteRequest) {
557                 WriteRequest w1 = (WriteRequest) expected;
558                 WriteRequest w2 = (WriteRequest) actual;
559 
560                 return w1.getMessage().equals(w2.getMessage())
561                         && w1.getFuture().isWritten() == w2.getFuture()
562                                 .isWritten();
563             }
564             return super.argumentMatches(expected, actual);
565         }
566     }
567 
568     private static class DummyWriteFuture implements WriteFuture {
569         private boolean written;
570 
571         public boolean isWritten() {
572             return written;
573         }
574 
575         public void setWritten(boolean written) {
576             this.written = written;
577         }
578 
579         public IoSession getSession() {
580             return null;
581         }
582 
583         public Object getLock() {
584             return this;
585         }
586 
587         public void join() {
588         }
589 
590         public boolean join(long timeoutInMillis) {
591             return true;
592         }
593 
594         public boolean isReady() {
595             return true;
596         }
597 
598         public void addListener(IoFutureListener listener) {
599         }
600 
601         public void removeListener(IoFutureListener listener) {
602         }
603     }
604 }