View Javadoc

1   /*
2    *   @(#) $Id: IoAdapter.java 165586 2005-05-02 06:27:27Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.protocol.io;
20  
21  
22  import org.apache.mina.common.ByteBuffer;
23  import org.apache.mina.common.IdleStatus;
24  import org.apache.mina.io.IoHandler;
25  import org.apache.mina.io.IoSession;
26  import org.apache.mina.protocol.ProtocolCodecFactory;
27  import org.apache.mina.protocol.ProtocolDecoder;
28  import org.apache.mina.protocol.ProtocolEncoder;
29  import org.apache.mina.protocol.ProtocolHandler;
30  import org.apache.mina.protocol.ProtocolFilterChain;
31  import org.apache.mina.protocol.ProtocolProvider;
32  import org.apache.mina.protocol.ProtocolSession;
33  import org.apache.mina.protocol.ProtocolViolationException;
34  import org.apache.mina.util.Queue;
35  
36  /***
37   * Adapts the specified {@link ProtocolProvider} to {@link IoHandler}.
38   * This class is used by {@link IoProtocolAcceptor} and {@link IoProtocolConnector}
39   * internally.
40   * <p>
41   * It is a bridge between I/O layer and Protocol layer.  Protocol layer itself
42   * cannot do any real I/O, but it translates I/O events to more higher level
43   * ones and vice versa.
44   * 
45   * @author Trustin Lee (trustin@apache.org)
46   * @version $Rev: 165586 $, $Date: 2005-05-02 15:27:27 +0900 (?, 02  5? 2005) $
47   */
48  class IoAdapter
49  {
50      private static final String KEY = "IoAdapter.ProtocolSession";
51  
52      private final IoProtocolSessionManagerFilterChain managerFilterChain;
53  
54      IoAdapter( IoProtocolSessionManagerFilterChain filters )
55      {
56          this.managerFilterChain = filters;
57      }
58      
59      public ProtocolFilterChain getFilterChain()
60      {
61          return managerFilterChain;
62      }
63  
64      /***
65       * Converts the specified <code>protocolProvider</code> to {@link IoAdapter}
66       * to use for actual I/O.
67       * 
68       * @return a new I/O handler for the specified <code>protocolProvider</code>
69       */
70      public IoHandler adapt( ProtocolProvider protocolProvider )
71      {
72          return new SessionHandlerAdapter( protocolProvider );
73      }
74  
75      /***
76       * Returns {@link ProtocolSession} of the specified {@link IoSession}.
77       */
78      public ProtocolSession toProtocolSession( IoSession session )
79      {
80          IoHandler handler = session.getHandler();
81          if( handler instanceof SessionHandlerAdapter )
82          {
83              SessionHandlerAdapter sha = ( SessionHandlerAdapter ) handler;
84              return sha.getProtocolSession( session );
85          }
86          else
87          {
88              throw new IllegalArgumentException( "Not adapted from IoAdapter." );
89          }
90      }
91  
92      class SessionHandlerAdapter implements IoHandler
93      {
94          final ProtocolCodecFactory codecFactory;
95          final ProtocolHandler handler;
96  
97          public SessionHandlerAdapter( ProtocolProvider protocolProvider )
98          {
99              codecFactory = protocolProvider.getCodecFactory();
100             this.handler = protocolProvider.getHandler();
101         }
102         
103         public void sessionCreated( IoSession session )
104         {
105         }
106 
107         public void sessionOpened( IoSession session )
108         {
109             managerFilterChain.sessionOpened( getProtocolSession( session ) );
110         }
111 
112         public void sessionClosed( IoSession session )
113         {
114             managerFilterChain.sessionClosed( getProtocolSession( session ) );
115         }
116 
117         public void sessionIdle( IoSession session, IdleStatus status )
118         {
119             managerFilterChain.sessionIdle( getProtocolSession( session ), status );
120         }
121 
122         public void exceptionCaught( IoSession session, Throwable cause )
123         {
124             managerFilterChain.exceptionCaught( getProtocolSession( session ), cause );
125         }
126 
127         public void dataRead( IoSession session, ByteBuffer in )
128         {
129             IoProtocolSession psession = getProtocolSession( session );
130             ProtocolDecoder decoder = psession.decoder;
131             try
132             {
133                 synchronized( decoder )
134                 {
135                     decoder.decode( psession, in, psession.decOut );
136                 }
137 
138                 Queue queue = psession.decOut.getMessageQueue();
139                 synchronized( queue )
140                 {
141                     if( !queue.isEmpty() )
142                     {
143                         do
144                         {
145                             managerFilterChain.messageReceived( psession, queue.pop() );
146                         }
147                         while( !queue.isEmpty() );
148                     }
149                 }
150             }
151             catch( ProtocolViolationException pve )
152             {
153                 pve.setBuffer( in );
154                 managerFilterChain.exceptionCaught( psession, pve );
155             }
156             catch( Throwable t )
157             {
158                 managerFilterChain.exceptionCaught( psession, t );
159             }
160         }
161 
162         public void dataWritten( IoSession session, Object marker )
163         {
164             if( marker == null )
165                 return;
166             managerFilterChain.messageSent( getProtocolSession( session ),
167                                  marker );
168         }
169 
170         void doWrite( IoSession session )
171         {
172             IoProtocolSession psession = getProtocolSession( session );
173             ProtocolEncoder encoder = psession.encoder;
174             Queue writeQueue = psession.writeQueue;
175 
176             if( writeQueue.isEmpty() )
177             {
178                 return;
179             }
180 
181             try
182             {
183                 while( !writeQueue.isEmpty() )
184                 {
185                     synchronized( writeQueue )
186                     {
187                         Object message = writeQueue.pop();
188                         if( message == null )
189                             break;
190 
191                         Queue queue = psession.encOut.getBufferQueue();
192                         encoder.encode( psession, message, psession.encOut );
193                         for( ;; )
194                         {
195                             ByteBuffer buf = ( ByteBuffer ) queue.pop();
196                             if( buf == null )
197                                 break;
198                             // use marker only if it is the last ByteBuffer
199                             Object marker = queue.isEmpty() ? message : null;
200                             session.write( buf, marker );
201                         }
202                     }
203                 }
204             }
205             catch( Throwable t )
206             {
207                 managerFilterChain.exceptionCaught( psession, t );
208             }
209         }
210 
211         private IoProtocolSession getProtocolSession( IoSession session )
212         {
213             IoProtocolSession psession =
214                 ( IoProtocolSession ) session.getAttribute( KEY );
215             if( psession == null )
216             {
217                 synchronized( session )
218                 {
219                     psession =
220                         ( IoProtocolSession ) session.getAttribute( KEY );
221                     if( psession == null )
222                     {
223                         psession = new IoProtocolSession(
224                                 IoAdapter.this.managerFilterChain, session, this );
225                         session.setAttribute( KEY, psession );
226                     }
227                 }
228             }
229 
230             return psession;
231         }
232     }
233 }