1 | /* |
2 | * @(#) $Id: IoAdapter.java 332218 2005-11-10 03:52:42Z trustin $ |
3 | * |
4 | * Copyright 2004 The Apache Software Foundation |
5 | * |
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
7 | * you may not use this file except in compliance with the License. |
8 | * You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | * |
18 | */ |
19 | package org.apache.mina.protocol.io; |
20 | |
21 | |
22 | import org.apache.mina.common.ByteBuffer; |
23 | import org.apache.mina.common.IdleStatus; |
24 | import org.apache.mina.io.IoHandler; |
25 | import org.apache.mina.io.IoSession; |
26 | import org.apache.mina.protocol.ProtocolCodecFactory; |
27 | import org.apache.mina.protocol.ProtocolDecoder; |
28 | import org.apache.mina.protocol.ProtocolEncoder; |
29 | import org.apache.mina.protocol.ProtocolHandler; |
30 | import org.apache.mina.protocol.ProtocolFilterChain; |
31 | import org.apache.mina.protocol.ProtocolProvider; |
32 | import org.apache.mina.protocol.ProtocolSession; |
33 | import org.apache.mina.protocol.ProtocolViolationException; |
34 | import org.apache.mina.util.Queue; |
35 | |
36 | /** |
37 | * Adapts the specified {@link ProtocolProvider} to {@link IoHandler}. |
38 | * This class is used by {@link IoProtocolAcceptor} and {@link IoProtocolConnector} |
39 | * internally. |
40 | * <p> |
41 | * It is a bridge between I/O layer and Protocol layer. Protocol layer itself |
42 | * cannot do any real I/O, but it translates I/O events to more higher level |
43 | * ones and vice versa. |
44 | * |
45 | * @author The Apache Directory Project (dev@directory.apache.org) |
46 | * @version $Rev: 332218 $, $Date: 2005-11-10 12:52:42 +0900 $ |
47 | */ |
48 | class IoAdapter |
49 | { |
50 | private static final String KEY = "IoAdapter.ProtocolSession"; |
51 | |
52 | private final IoProtocolSessionManagerFilterChain managerFilterChain; |
53 | |
54 | IoAdapter( IoProtocolSessionManagerFilterChain filters ) |
55 | { |
56 | this.managerFilterChain = filters; |
57 | } |
58 | |
59 | public ProtocolFilterChain getFilterChain() |
60 | { |
61 | return managerFilterChain; |
62 | } |
63 | |
64 | /** |
65 | * Converts the specified <code>protocolProvider</code> to {@link IoAdapter} |
66 | * to use for actual I/O. |
67 | * |
68 | * @return a new I/O handler for the specified <code>protocolProvider</code> |
69 | */ |
70 | public IoHandler adapt( ProtocolProvider protocolProvider ) |
71 | { |
72 | return new SessionHandlerAdapter( protocolProvider ); |
73 | } |
74 | |
75 | /** |
76 | * Returns {@link ProtocolSession} of the specified {@link IoSession}. |
77 | */ |
78 | public ProtocolSession toProtocolSession( IoSession session ) |
79 | { |
80 | IoHandler handler = session.getHandler(); |
81 | if( handler instanceof SessionHandlerAdapter ) |
82 | { |
83 | SessionHandlerAdapter sha = ( SessionHandlerAdapter ) handler; |
84 | return sha.getProtocolSession( session ); |
85 | } |
86 | else |
87 | { |
88 | throw new IllegalArgumentException( "Not adapted from IoAdapter." ); |
89 | } |
90 | } |
91 | |
92 | class SessionHandlerAdapter implements IoHandler |
93 | { |
94 | final ProtocolCodecFactory codecFactory; |
95 | final ProtocolHandler handler; |
96 | |
97 | public SessionHandlerAdapter( ProtocolProvider protocolProvider ) |
98 | { |
99 | codecFactory = protocolProvider.getCodecFactory(); |
100 | this.handler = protocolProvider.getHandler(); |
101 | } |
102 | |
103 | public void sessionCreated( IoSession session ) throws Exception |
104 | { |
105 | handler.sessionCreated( getProtocolSession( session ) ); |
106 | } |
107 | |
108 | public void sessionOpened( IoSession session ) |
109 | { |
110 | managerFilterChain.sessionOpened( getProtocolSession( session ) ); |
111 | } |
112 | |
113 | public void sessionClosed( IoSession session ) |
114 | { |
115 | managerFilterChain.sessionClosed( getProtocolSession( session ) ); |
116 | } |
117 | |
118 | public void sessionIdle( IoSession session, IdleStatus status ) |
119 | { |
120 | managerFilterChain.sessionIdle( getProtocolSession( session ), status ); |
121 | } |
122 | |
123 | public void exceptionCaught( IoSession session, Throwable cause ) |
124 | { |
125 | managerFilterChain.exceptionCaught( getProtocolSession( session ), cause ); |
126 | } |
127 | |
128 | public void dataRead( IoSession session, ByteBuffer in ) |
129 | { |
130 | IoProtocolSession psession = getProtocolSession( session ); |
131 | ProtocolDecoder decoder = psession.decoder; |
132 | try |
133 | { |
134 | synchronized( decoder ) |
135 | { |
136 | decoder.decode( psession, in, psession.decOut ); |
137 | } |
138 | |
139 | Queue queue = psession.decOut.getMessageQueue(); |
140 | synchronized( queue ) |
141 | { |
142 | if( !queue.isEmpty() ) |
143 | { |
144 | do |
145 | { |
146 | managerFilterChain.messageReceived( psession, queue.pop() ); |
147 | } |
148 | while( !queue.isEmpty() ); |
149 | } |
150 | } |
151 | } |
152 | catch( ProtocolViolationException pve ) |
153 | { |
154 | pve.setHexdump( in.getHexDump() ); |
155 | managerFilterChain.exceptionCaught( psession, pve ); |
156 | } |
157 | catch( Throwable t ) |
158 | { |
159 | managerFilterChain.exceptionCaught( psession, t ); |
160 | } |
161 | } |
162 | |
163 | public void dataWritten( IoSession session, Object marker ) |
164 | { |
165 | if( marker == null ) |
166 | return; |
167 | managerFilterChain.messageSent( getProtocolSession( session ), |
168 | marker ); |
169 | } |
170 | |
171 | void doWrite( IoSession session ) |
172 | { |
173 | IoProtocolSession psession = getProtocolSession( session ); |
174 | ProtocolEncoder encoder = psession.encoder; |
175 | Queue writeQueue = psession.writeQueue; |
176 | |
177 | if( writeQueue.isEmpty() ) |
178 | { |
179 | return; |
180 | } |
181 | |
182 | try |
183 | { |
184 | while( !writeQueue.isEmpty() ) |
185 | { |
186 | synchronized( writeQueue ) |
187 | { |
188 | Object message = writeQueue.pop(); |
189 | if( message == null ) |
190 | break; |
191 | |
192 | Queue queue = psession.encOut.getBufferQueue(); |
193 | encoder.encode( psession, message, psession.encOut ); |
194 | for( ;; ) |
195 | { |
196 | ByteBuffer buf = ( ByteBuffer ) queue.pop(); |
197 | if( buf == null ) |
198 | break; |
199 | // use marker only if it is the last ByteBuffer |
200 | Object marker = queue.isEmpty() ? message : null; |
201 | session.write( buf, marker ); |
202 | } |
203 | } |
204 | } |
205 | } |
206 | catch( Throwable t ) |
207 | { |
208 | managerFilterChain.exceptionCaught( psession, t ); |
209 | } |
210 | } |
211 | |
212 | private IoProtocolSession getProtocolSession( IoSession session ) |
213 | { |
214 | IoProtocolSession psession = |
215 | ( IoProtocolSession ) session.getAttribute( KEY ); |
216 | if( psession == null ) |
217 | { |
218 | synchronized( session ) |
219 | { |
220 | psession = |
221 | ( IoProtocolSession ) session.getAttribute( KEY ); |
222 | if( psession == null ) |
223 | { |
224 | psession = new IoProtocolSession( |
225 | IoAdapter.this.managerFilterChain, session, this ); |
226 | session.setAttribute( KEY, psession ); |
227 | } |
228 | } |
229 | } |
230 | |
231 | return psession; |
232 | } |
233 | } |
234 | } |