View Javadoc

1   package org.apache.mina.io.handler;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.OutputStream;
6   import java.io.PipedOutputStream;
7   import java.net.SocketTimeoutException;
8   
9   import org.apache.mina.common.ByteBuffer;
10  import org.apache.mina.common.IdleStatus;
11  import org.apache.mina.io.IoHandler;
12  import org.apache.mina.io.IoHandlerAdapter;
13  import org.apache.mina.io.IoSession;
14  
15  /***
16   * A {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
17   * <p>
18   * Please extend this class and implement
19   * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
20   * execute your stream I/O logic; <b>please note that you must forward
21   * the process request to other thread or thread pool.</b>
22   * 
23   * @author The Apache Directory Project (dev@directory.apache.org)
24   * @version $Rev: 327113 $, $Date: 2005-10-21 15:59:15 +0900 $
25   */
26  public abstract class StreamIoHandler extends IoHandlerAdapter
27  {
28      private static final String KEY_IN = "BlockingIoHandler.in";
29      private static final String KEY_OUT = "BlockingIoHandler.out";
30      private static final String KEY_STARTED = "BlockingIoHandler.started";
31      
32      private int readTimeout;
33      
34      private int writeTimeout;
35      
36      protected StreamIoHandler()
37      {
38      }
39      
40      /***
41       * Implement this method to execute your stream I/O logic;
42       * <b>please note that you must forward the process request to other
43       * thread or thread pool.</b>
44       */
45      protected abstract void processStreamIo( IoSession session,
46                                               InputStream in, OutputStream out );
47      
48      /***
49       * Returns read timeout in seconds.
50       * The default value is <tt>0</tt> (disabled).
51       */
52      public int getReadTimeout()
53      {
54          return readTimeout;
55      }
56      
57      /***
58       * Sets read timeout in seconds.
59       * The default value is <tt>0</tt> (disabled).
60       */
61      public void setReadTimeout( int readTimeout )
62      {
63          this.readTimeout = readTimeout;
64      }
65      
66      /***
67       * Returns write timeout in seconds.
68       * The default value is <tt>0</tt> (disabled).
69       */
70      public int getWriteTimeout()
71      {
72          return writeTimeout;
73      }
74      
75      /***
76       * Sets write timeout in seconds.
77       * The default value is <tt>0</tt> (disabled).
78       */
79      public void setWriteTimeout( int writeTimeout )
80      {
81          this.writeTimeout = writeTimeout;
82      }
83  
84      /***
85       * Initializes streams and timeout settings.
86       */
87      public void sessionOpened( IoSession session )
88      {
89          // Set timeouts
90          session.getConfig().setWriteTimeout( writeTimeout );
91          session.getConfig().setIdleTime( IdleStatus.READER_IDLE, readTimeout );
92  
93          // Create streams
94          PipedOutputStream out = new PipedOutputStream();
95          session.setAttribute( KEY_OUT, out );
96          try
97          {
98              session.setAttribute( KEY_IN, new PipedInputStream( out ) );
99          }
100         catch( IOException e )
101         {
102             throw new StreamIoException( e );
103         }
104     }
105     
106     /***
107      * Closes input stream.
108      */
109     public void sessionClosed( IoSession session )
110     {
111         final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT );
112         try {
113             out.close();
114         }
115         catch( IOException e )
116         {
117             throw new StreamIoException( e );
118         }
119     }
120 
121     /***
122      * Forwards read data to input stream.
123      */
124     public void dataRead( IoSession session, ByteBuffer buf )
125     {
126         final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN );
127         final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT );
128         
129         java.nio.ByteBuffer nioBuf = buf.buf();
130         int offset = nioBuf.position();
131         int length = nioBuf.limit() - offset;
132         if( !nioBuf.hasArray() )
133         {
134             ByteBuffer heapBuf = ByteBuffer.allocate( length, false );
135             heapBuf.put( buf );
136             heapBuf.flip();
137             nioBuf = heapBuf.buf();
138             offset = 0;
139         }
140         
141         try
142         {
143             out.write( nioBuf.array(), offset, length );
144         }
145         catch( IOException e )
146         {
147             throw new StreamIoException( e );
148         }
149         finally
150         {
151             beginService( session, in );
152         }
153     }
154 
155     /***
156      * Forwards caught exceptions to input stream.
157      */
158     public void exceptionCaught( IoSession session, Throwable cause )
159     {
160         final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN );
161         
162         IOException e = null;
163         if( cause instanceof StreamIoException )
164         {
165             e = ( IOException ) cause.getCause();
166         }
167         else if( cause instanceof IOException )
168         {
169             e = ( IOException ) cause;
170         }
171         
172         if( e != null && in != null )
173         {
174             in.setException( e );
175             beginService( session, in );
176         }
177         else
178         {
179             cause.printStackTrace();
180             session.close();
181         }
182     }
183 
184     /***
185      * Handles read timeout.
186      */
187     public void sessionIdle( IoSession session, IdleStatus status )
188     {
189         if( status == IdleStatus.READER_IDLE )
190         {
191             throw new StreamIoException(
192                     new SocketTimeoutException( "Read timeout" ) );
193         }
194     }
195 
196     private void beginService( IoSession session, PipedInputStream in )
197     {
198         if( session.getAttribute( KEY_STARTED ) == null )
199         {
200             session.setAttribute( KEY_STARTED, Boolean.TRUE );
201             processStreamIo( session, in, new ServiceOutputStream( session ) );
202         }
203     }
204 
205     private static class PipedInputStream extends java.io.PipedInputStream
206     {
207         private IOException exception;
208 
209         public PipedInputStream(PipedOutputStream src) throws IOException
210         {
211             super( src );
212         }
213         
214         public void setException( IOException e )
215         {
216             this.exception = e;
217         }
218 
219         public synchronized int read() throws IOException
220         {
221             throwException();
222             return super.read();
223         }
224 
225         public synchronized int read( byte[] b, int off, int len ) throws IOException
226         {
227             throwException();
228             return super.read( b, off, len );
229         }
230         
231         private void throwException() throws IOException
232         {
233             if( exception != null )
234             {
235                 throw exception;
236             }
237         }
238     }
239 
240     private static class ServiceOutputStream extends OutputStream
241     {
242         private final IoSession session;
243         
244         public ServiceOutputStream( IoSession session )
245         {
246             this.session = session;
247         }
248 
249         public void close()
250         {
251             session.close( true );
252         }
253 
254         public void flush()
255         {
256         }
257 
258         public void write( byte[] b, int off, int len )
259         {
260             ByteBuffer buf = ByteBuffer.wrap( b, off, len );
261             buf.acquire(); // prevent from being pooled.
262             session.write( buf, null );
263         }
264 
265         public void write( byte[] b )
266         {
267             ByteBuffer buf = ByteBuffer.wrap( b );
268             buf.acquire(); // prevent from being pooled.
269             session.write( buf, null );
270         }
271 
272         public void write( int b )
273         {
274             ByteBuffer buf = ByteBuffer.allocate( 1 );
275             buf.put( ( byte ) b );
276             buf.flip();
277             session.write( buf, null );
278         }
279     }
280     
281     private static class StreamIoException extends RuntimeException
282     {
283         private static final long serialVersionUID = 3976736960742503222L;
284 
285         public StreamIoException( IOException cause )
286         {
287             super(cause);
288         }
289     }
290 }