1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.handler;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23
24 import org.apache.mina.common.ByteBuffer;
25 import org.apache.mina.io.IoHandler;
26 import org.apache.mina.io.IoSession;
27
28 /***
29 * An {@link InputStream} that buffers data read from
30 * {@link IoHandler#dataRead(IoSession, ByteBuffer)} events.
31 *
32 * @author The Apache Directory Project (dev@directory.apache.org)
33 * @version $Rev$, $Date$
34 *
35 */
36 class IoSessionInputStream extends InputStream
37 {
38 private final ByteBuffer buf;
39 private boolean closed;
40 private boolean released;
41 private IOException exception;
42 private int waiters;
43
44 IoSessionInputStream()
45 {
46 buf = ByteBuffer.allocate( 16 );
47 buf.setAutoExpand( true );
48 buf.limit( 0 );
49 }
50
51 public synchronized int available()
52 {
53 if( released )
54 {
55 return 0;
56 }
57 else
58 {
59 return buf.remaining();
60 }
61 }
62
63 public synchronized void close()
64 {
65 if( closed )
66 {
67 return;
68 }
69
70 closed = true;
71 releaseBuffer();
72
73 if( waiters != 0 )
74 {
75 this.notifyAll();
76 }
77 }
78
79 public void mark( int readlimit )
80 {
81 }
82
83 public boolean markSupported()
84 {
85 return false;
86 }
87
88 public synchronized int read() throws IOException
89 {
90 waitForData();
91 if( released )
92 {
93 return -1;
94 }
95
96 int ret = buf.get() & 0xff;
97 return ret;
98 }
99
100 public synchronized int read( byte[] b, int off, int len ) throws IOException
101 {
102 waitForData();
103 if( released )
104 {
105 return -1;
106 }
107
108 int readBytes;
109 if( len > buf.remaining() )
110 {
111 readBytes = buf.remaining();
112 }
113 else
114 {
115 readBytes = len;
116 }
117 buf.get( b, off, readBytes );
118
119 return readBytes;
120 }
121
122 public synchronized void reset() throws IOException
123 {
124 throw new IOException( "Mark is not supported." );
125 }
126
127 private void waitForData() throws IOException
128 {
129 if( released )
130 {
131 throw new IOException( "Stream is closed." );
132 }
133
134 waiters ++;
135 while( !released && buf.remaining() == 0 && exception == null )
136 {
137 try
138 {
139 this.wait();
140 }
141 catch( InterruptedException e )
142 {
143 }
144 }
145 waiters --;
146
147 if( exception != null )
148 {
149 releaseBuffer();
150 throw exception;
151 }
152
153 if( closed && buf.remaining() == 0 )
154 {
155 releaseBuffer();
156 }
157 }
158
159 private void releaseBuffer()
160 {
161 if( released )
162 {
163 return;
164 }
165
166 released = true;
167 buf.release();
168 }
169
170 synchronized void write( ByteBuffer src )
171 {
172 if( closed )
173 {
174 return;
175 }
176
177 if( buf.hasRemaining() )
178 {
179 this.buf.compact();
180 this.buf.put( src );
181 this.buf.flip();
182 }
183 else
184 {
185 this.buf.clear();
186 this.buf.put( src );
187 this.buf.flip();
188 this.notify();
189 }
190 }
191
192 synchronized void throwException( IOException e )
193 {
194 if( exception == null )
195 {
196 exception = e;
197
198 if( waiters != 0 )
199 {
200 this.notifyAll();
201 }
202 }
203 }
204 }