View Javadoc

1   /*
2    *   @(#) $Id: AbstractIoFilterChain.java 330415 2005-11-03 02:19:03Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.handler;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  
24  import org.apache.mina.common.ByteBuffer;
25  import org.apache.mina.io.IoHandler;
26  import org.apache.mina.io.IoSession;
27  
28  /***
29   * An {@link InputStream} that buffers data read from
30   * {@link IoHandler#dataRead(IoSession, ByteBuffer)} events.
31   *
32   * @author The Apache Directory Project (dev@directory.apache.org)
33   * @version $Rev$, $Date$
34   *
35   */
36  class IoSessionInputStream extends InputStream
37  {
38      private final ByteBuffer buf;
39      private boolean closed;
40      private boolean released;
41      private IOException exception;
42      private int waiters;
43      
44      IoSessionInputStream()
45      {
46          buf = ByteBuffer.allocate( 16 );
47          buf.setAutoExpand( true );
48          buf.limit( 0 );
49      }
50  
51      public synchronized int available()
52      {
53          if( released )
54          {
55              return 0;
56          }
57          else
58          {
59              return buf.remaining();
60          }
61      }
62  
63      public synchronized void close()
64      {
65          if( closed )
66          {
67              return;
68          }
69  
70          closed = true;
71          releaseBuffer();
72          
73          if( waiters != 0 )
74          {
75              this.notifyAll();
76          }
77      }
78  
79      public void mark( int readlimit )
80      {
81      }
82  
83      public boolean markSupported()
84      {
85          return false;
86      }
87  
88      public synchronized int read() throws IOException
89      {
90          waitForData();
91          if( released )
92          {
93              return -1;
94          }
95  
96          int ret = buf.get() & 0xff;
97          return ret;
98      }
99  
100     public synchronized int read( byte[] b, int off, int len ) throws IOException
101     {
102         waitForData();
103         if( released )
104         {
105             return -1;
106         }
107 
108         int readBytes;
109         if( len > buf.remaining() )
110         {
111             readBytes = buf.remaining();
112         }
113         else
114         {
115             readBytes = len;
116         }
117         buf.get( b, off, readBytes );
118         
119         return readBytes;
120     }
121 
122     public synchronized void reset() throws IOException
123     {
124         throw new IOException( "Mark is not supported." );
125     }
126 
127     private void waitForData() throws IOException
128     {
129         if( released )
130         {
131             throw new IOException( "Stream is closed." );
132         }
133 
134         waiters ++;
135         while( !released && buf.remaining() == 0 && exception == null )
136         {
137             try
138             {
139                 this.wait();
140             }
141             catch( InterruptedException e )
142             {
143             }
144         }
145         waiters --;
146         
147         if( exception != null )
148         {
149             releaseBuffer();
150             throw exception;
151         }
152         
153         if( closed && buf.remaining() == 0 )
154         {
155             releaseBuffer();
156         }
157     }
158     
159     private void releaseBuffer()
160     {
161         if( released )
162         {
163             return;
164         }
165 
166         released = true;
167         buf.release();
168     }
169 
170     synchronized void write( ByteBuffer src )
171     {
172         if( closed )
173         {
174             return;
175         }
176         
177         if( buf.hasRemaining() )
178         {
179             this.buf.compact();
180             this.buf.put( src );
181             this.buf.flip();
182         }
183         else
184         {
185             this.buf.clear();
186             this.buf.put( src );
187             this.buf.flip();
188             this.notify();
189         }
190     }
191     
192     synchronized void throwException( IOException e )
193     {
194         if( exception == null )
195         {
196             exception = e;
197             
198             if( waiters != 0 )
199             {
200                 this.notifyAll();
201             }
202         }
203     }
204 }