1 | package org.apache.mina.io.handler; |
2 | |
3 | import java.io.IOException; |
4 | import java.io.InputStream; |
5 | import java.io.OutputStream; |
6 | import java.io.PipedOutputStream; |
7 | import java.net.SocketTimeoutException; |
8 | |
9 | import org.apache.mina.common.ByteBuffer; |
10 | import org.apache.mina.common.IdleStatus; |
11 | import org.apache.mina.io.IoHandler; |
12 | import org.apache.mina.io.IoHandlerAdapter; |
13 | import org.apache.mina.io.IoSession; |
14 | |
15 | /** |
16 | * A {@link IoHandler} that adapts asynchronous MINA events to stream I/O. |
17 | * <p> |
18 | * Please extend this class and implement |
19 | * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to |
20 | * execute your stream I/O logic; <b>please note that you must forward |
21 | * the process request to other thread or thread pool.</b> |
22 | * |
23 | * @author The Apache Directory Project (dev@directory.apache.org) |
24 | * @version $Rev: 327113 $, $Date: 2005-10-21 15:59:15 +0900 $ |
25 | */ |
26 | public abstract class StreamIoHandler extends IoHandlerAdapter |
27 | { |
28 | private static final String KEY_IN = "BlockingIoHandler.in"; |
29 | private static final String KEY_OUT = "BlockingIoHandler.out"; |
30 | private static final String KEY_STARTED = "BlockingIoHandler.started"; |
31 | |
32 | private int readTimeout; |
33 | |
34 | private int writeTimeout; |
35 | |
36 | protected StreamIoHandler() |
37 | { |
38 | } |
39 | |
40 | /** |
41 | * Implement this method to execute your stream I/O logic; |
42 | * <b>please note that you must forward the process request to other |
43 | * thread or thread pool.</b> |
44 | */ |
45 | protected abstract void processStreamIo( IoSession session, |
46 | InputStream in, OutputStream out ); |
47 | |
48 | /** |
49 | * Returns read timeout in seconds. |
50 | * The default value is <tt>0</tt> (disabled). |
51 | */ |
52 | public int getReadTimeout() |
53 | { |
54 | return readTimeout; |
55 | } |
56 | |
57 | /** |
58 | * Sets read timeout in seconds. |
59 | * The default value is <tt>0</tt> (disabled). |
60 | */ |
61 | public void setReadTimeout( int readTimeout ) |
62 | { |
63 | this.readTimeout = readTimeout; |
64 | } |
65 | |
66 | /** |
67 | * Returns write timeout in seconds. |
68 | * The default value is <tt>0</tt> (disabled). |
69 | */ |
70 | public int getWriteTimeout() |
71 | { |
72 | return writeTimeout; |
73 | } |
74 | |
75 | /** |
76 | * Sets write timeout in seconds. |
77 | * The default value is <tt>0</tt> (disabled). |
78 | */ |
79 | public void setWriteTimeout( int writeTimeout ) |
80 | { |
81 | this.writeTimeout = writeTimeout; |
82 | } |
83 | |
84 | /** |
85 | * Initializes streams and timeout settings. |
86 | */ |
87 | public void sessionOpened( IoSession session ) |
88 | { |
89 | // Set timeouts |
90 | session.getConfig().setWriteTimeout( writeTimeout ); |
91 | session.getConfig().setIdleTime( IdleStatus.READER_IDLE, readTimeout ); |
92 | |
93 | // Create streams |
94 | PipedOutputStream out = new PipedOutputStream(); |
95 | session.setAttribute( KEY_OUT, out ); |
96 | try |
97 | { |
98 | session.setAttribute( KEY_IN, new PipedInputStream( out ) ); |
99 | } |
100 | catch( IOException e ) |
101 | { |
102 | throw new StreamIoException( e ); |
103 | } |
104 | } |
105 | |
106 | /** |
107 | * Closes input stream. |
108 | */ |
109 | public void sessionClosed( IoSession session ) |
110 | { |
111 | final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT ); |
112 | try { |
113 | out.close(); |
114 | } |
115 | catch( IOException e ) |
116 | { |
117 | throw new StreamIoException( e ); |
118 | } |
119 | } |
120 | |
121 | /** |
122 | * Forwards read data to input stream. |
123 | */ |
124 | public void dataRead( IoSession session, ByteBuffer buf ) |
125 | { |
126 | final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN ); |
127 | final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT ); |
128 | |
129 | java.nio.ByteBuffer nioBuf = buf.buf(); |
130 | int offset = nioBuf.position(); |
131 | int length = nioBuf.limit() - offset; |
132 | if( !nioBuf.hasArray() ) |
133 | { |
134 | ByteBuffer heapBuf = ByteBuffer.allocate( length, false ); |
135 | heapBuf.put( buf ); |
136 | heapBuf.flip(); |
137 | nioBuf = heapBuf.buf(); |
138 | offset = 0; |
139 | } |
140 | |
141 | try |
142 | { |
143 | out.write( nioBuf.array(), offset, length ); |
144 | } |
145 | catch( IOException e ) |
146 | { |
147 | throw new StreamIoException( e ); |
148 | } |
149 | finally |
150 | { |
151 | beginService( session, in ); |
152 | } |
153 | } |
154 | |
155 | /** |
156 | * Forwards caught exceptions to input stream. |
157 | */ |
158 | public void exceptionCaught( IoSession session, Throwable cause ) |
159 | { |
160 | final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN ); |
161 | |
162 | IOException e = null; |
163 | if( cause instanceof StreamIoException ) |
164 | { |
165 | e = ( IOException ) cause.getCause(); |
166 | } |
167 | else if( cause instanceof IOException ) |
168 | { |
169 | e = ( IOException ) cause; |
170 | } |
171 | |
172 | if( e != null && in != null ) |
173 | { |
174 | in.setException( e ); |
175 | beginService( session, in ); |
176 | } |
177 | else |
178 | { |
179 | cause.printStackTrace(); |
180 | session.close(); |
181 | } |
182 | } |
183 | |
184 | /** |
185 | * Handles read timeout. |
186 | */ |
187 | public void sessionIdle( IoSession session, IdleStatus status ) |
188 | { |
189 | if( status == IdleStatus.READER_IDLE ) |
190 | { |
191 | throw new StreamIoException( |
192 | new SocketTimeoutException( "Read timeout" ) ); |
193 | } |
194 | } |
195 | |
196 | private void beginService( IoSession session, PipedInputStream in ) |
197 | { |
198 | if( session.getAttribute( KEY_STARTED ) == null ) |
199 | { |
200 | session.setAttribute( KEY_STARTED, Boolean.TRUE ); |
201 | processStreamIo( session, in, new ServiceOutputStream( session ) ); |
202 | } |
203 | } |
204 | |
205 | private static class PipedInputStream extends java.io.PipedInputStream |
206 | { |
207 | private IOException exception; |
208 | |
209 | public PipedInputStream(PipedOutputStream src) throws IOException |
210 | { |
211 | super( src ); |
212 | } |
213 | |
214 | public void setException( IOException e ) |
215 | { |
216 | this.exception = e; |
217 | } |
218 | |
219 | public synchronized int read() throws IOException |
220 | { |
221 | throwException(); |
222 | return super.read(); |
223 | } |
224 | |
225 | public synchronized int read( byte[] b, int off, int len ) throws IOException |
226 | { |
227 | throwException(); |
228 | return super.read( b, off, len ); |
229 | } |
230 | |
231 | private void throwException() throws IOException |
232 | { |
233 | if( exception != null ) |
234 | { |
235 | throw exception; |
236 | } |
237 | } |
238 | } |
239 | |
240 | private static class ServiceOutputStream extends OutputStream |
241 | { |
242 | private final IoSession session; |
243 | |
244 | public ServiceOutputStream( IoSession session ) |
245 | { |
246 | this.session = session; |
247 | } |
248 | |
249 | public void close() |
250 | { |
251 | session.close( true ); |
252 | } |
253 | |
254 | public void flush() |
255 | { |
256 | } |
257 | |
258 | public void write( byte[] b, int off, int len ) |
259 | { |
260 | ByteBuffer buf = ByteBuffer.wrap( b, off, len ); |
261 | buf.acquire(); // prevent from being pooled. |
262 | session.write( buf, null ); |
263 | } |
264 | |
265 | public void write( byte[] b ) |
266 | { |
267 | ByteBuffer buf = ByteBuffer.wrap( b ); |
268 | buf.acquire(); // prevent from being pooled. |
269 | session.write( buf, null ); |
270 | } |
271 | |
272 | public void write( int b ) |
273 | { |
274 | ByteBuffer buf = ByteBuffer.allocate( 1 ); |
275 | buf.put( ( byte ) b ); |
276 | buf.flip(); |
277 | session.write( buf, null ); |
278 | } |
279 | } |
280 | |
281 | private static class StreamIoException extends RuntimeException |
282 | { |
283 | private static final long serialVersionUID = 3976736960742503222L; |
284 | |
285 | public StreamIoException( IOException cause ) |
286 | { |
287 | super(cause); |
288 | } |
289 | } |
290 | } |