1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.Random;
26  import junit.framework.TestCase;
27  
28  import org.apache.commons.io.output.ByteArrayOutputStream;
29  import org.apache.commons.io.output.DemuxOutputStream;
30  import org.apache.commons.io.input.DemuxInputStream;
31  
32  /**
33   * Basic unit tests for the multiplexing streams.
34   *
35   * @author <a href="mailto:peter@apache.org">Peter Donald</a>
36   */
37  public class DemuxTestCase
38      extends TestCase
39  {
40      private static final String T1 = "Thread1";
41      private static final String T2 = "Thread2";
42      private static final String T3 = "Thread3";
43      private static final String T4 = "Thread4";
44  
45      private static final String DATA1 = "Data for thread1";
46      private static final String DATA2 = "Data for thread2";
47      private static final String DATA3 = "Data for thread3";
48      private static final String DATA4 = "Data for thread4";
49  
50      private static Random c_random = new Random();
51      private HashMap m_outputMap = new HashMap();
52      private HashMap m_threadMap = new HashMap();
53  
54      public DemuxTestCase( String name )
55      {
56          super( name );
57      }
58  
59      private String getOutput( String threadName )
60          throws IOException
61      {
62          ByteArrayOutputStream output =
63              (ByteArrayOutputStream)m_outputMap.get( threadName );
64          assertNotNull( "getOutput()", output );
65  
66          return output.toString();
67      }
68  
69      private String getInput( String threadName )
70          throws IOException
71      {
72          ReaderThread thread = (ReaderThread)m_threadMap.get( threadName );
73          assertNotNull( "getInput()", thread );
74  
75          return thread.getData();
76      }
77  
78      private void doStart()
79          throws Exception
80      {
81          Iterator iterator = m_threadMap.keySet().iterator();
82          while( iterator.hasNext() )
83          {
84              String name = (String)iterator.next();
85              Thread thread = (Thread)m_threadMap.get( name );
86              thread.start();
87          }
88      }
89  
90      private void doJoin()
91          throws Exception
92      {
93          Iterator iterator = m_threadMap.keySet().iterator();
94          while( iterator.hasNext() )
95          {
96              String name = (String)iterator.next();
97              Thread thread = (Thread)m_threadMap.get( name );
98              thread.join();
99          }
100     }
101 
102     private void startWriter( String name,
103                               String data,
104                               DemuxOutputStream demux )
105         throws Exception
106     {
107         ByteArrayOutputStream output = new ByteArrayOutputStream();
108         m_outputMap.put( name, output );
109         WriterThread thread =
110             new WriterThread( name, data, output, demux );
111         m_threadMap.put( name, thread );
112     }
113 
114     private void startReader( String name,
115                               String data,
116                               DemuxInputStream demux )
117         throws Exception
118     {
119         ByteArrayInputStream input = new ByteArrayInputStream( data.getBytes() );
120         ReaderThread thread = new ReaderThread( name, input, demux );
121         m_threadMap.put( name, thread );
122     }
123 
124     public void testOutputStream()
125         throws Exception
126     {
127         DemuxOutputStream output = new DemuxOutputStream();
128         startWriter( T1, DATA1, output );
129         startWriter( T2, DATA2, output );
130         startWriter( T3, DATA3, output );
131         startWriter( T4, DATA4, output );
132 
133         doStart();
134         doJoin();
135 
136         assertEquals( "Data1", DATA1, getOutput( T1 ) );
137         assertEquals( "Data2", DATA2, getOutput( T2 ) );
138         assertEquals( "Data3", DATA3, getOutput( T3 ) );
139         assertEquals( "Data4", DATA4, getOutput( T4 ) );
140     }
141 
142     public void testInputStream()
143         throws Exception
144     {
145         DemuxInputStream input = new DemuxInputStream();
146         startReader( T1, DATA1, input );
147         startReader( T2, DATA2, input );
148         startReader( T3, DATA3, input );
149         startReader( T4, DATA4, input );
150 
151         doStart();
152         doJoin();
153 
154         assertEquals( "Data1", DATA1, getInput( T1 ) );
155         assertEquals( "Data2", DATA2, getInput( T2 ) );
156         assertEquals( "Data3", DATA3, getInput( T3 ) );
157         assertEquals( "Data4", DATA4, getInput( T4 ) );
158     }
159 
160     private static class ReaderThread
161         extends Thread
162     {
163         private StringBuffer m_buffer = new StringBuffer();
164         private InputStream m_input;
165         private DemuxInputStream m_demux;
166 
167         ReaderThread( String name,
168                       InputStream input,
169                       DemuxInputStream demux )
170         {
171             super( name );
172             m_input = input;
173             m_demux = demux;
174         }
175 
176         public String getData()
177         {
178             return m_buffer.toString();
179         }
180 
181         public void run()
182         {
183             m_demux.bindStream( m_input );
184 
185             try
186             {
187                 int ch = m_demux.read();
188                 while( -1 != ch )
189                 {
190                     //System.out.println( "Reading: " + (char)ch );
191                     m_buffer.append( (char)ch );
192 
193                     int sleepTime = Math.abs( c_random.nextInt() % 10 );
194                     Thread.sleep( sleepTime );
195                     ch = m_demux.read();
196                 }
197             }
198             catch( Exception e )
199             {
200                 e.printStackTrace();
201             }
202         }
203     }
204 
205     private static class WriterThread
206         extends Thread
207     {
208         private byte[] m_data;
209         private OutputStream m_output;
210         private DemuxOutputStream m_demux;
211 
212         WriterThread( String name,
213                       String data,
214                       OutputStream output,
215                       DemuxOutputStream demux )
216         {
217             super( name );
218             m_output = output;
219             m_demux = demux;
220             m_data = data.getBytes();
221         }
222 
223         public void run()
224         {
225             m_demux.bindStream( m_output );
226             for( int i = 0; i < m_data.length; i++ )
227             {
228                 try
229                 {
230                     //System.out.println( "Writing: " + (char)m_data[ i ] );
231                     m_demux.write( m_data[ i ] );
232                     int sleepTime = Math.abs( c_random.nextInt() % 10 );
233                     Thread.sleep( sleepTime );
234                 }
235                 catch( Exception e )
236                 {
237                     e.printStackTrace();
238                 }
239             }
240         }
241     }
242 }
243