1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec;
21
22 import org.apache.mina.common.ByteBuffer;
23 import org.apache.mina.common.ByteBufferProxy;
24 import org.apache.mina.common.IoFilter;
25 import org.apache.mina.common.IoFilterAdapter;
26 import org.apache.mina.common.IoFilterChain;
27 import org.apache.mina.common.IoSession;
28 import org.apache.mina.common.WriteFuture;
29 import org.apache.mina.common.support.DefaultWriteFuture;
30 import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
31 import org.apache.mina.filter.codec.support.SimpleProtocolEncoderOutput;
32 import org.apache.mina.util.SessionLog;
33
34
35
36
37
38
39
40
41
42 public class ProtocolCodecFilter extends IoFilterAdapter {
43 public static final String ENCODER = ProtocolCodecFilter.class.getName()
44 + ".encoder";
45
46 public static final String DECODER = ProtocolCodecFilter.class.getName()
47 + ".decoder";
48
49 private static final String DECODER_OUT = ProtocolCodecFilter.class.getName()
50 + ".decoderOut";
51
52 private static final Class<?>[] EMPTY_PARAMS = new Class[0];
53
54 private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new byte[0]);
55
56 private final ProtocolCodecFactory factory;
57
58 public ProtocolCodecFilter(ProtocolCodecFactory factory) {
59 if (factory == null) {
60 throw new NullPointerException("factory");
61 }
62 this.factory = factory;
63 }
64
65 public ProtocolCodecFilter(final ProtocolEncoder encoder,
66 final ProtocolDecoder decoder) {
67 if (encoder == null) {
68 throw new NullPointerException("encoder");
69 }
70 if (decoder == null) {
71 throw new NullPointerException("decoder");
72 }
73
74 this.factory = new ProtocolCodecFactory() {
75 public ProtocolEncoder getEncoder() {
76 return encoder;
77 }
78
79 public ProtocolDecoder getDecoder() {
80 return decoder;
81 }
82 };
83 }
84
85 public ProtocolCodecFilter(
86 final Class<? extends ProtocolEncoder> encoderClass,
87 final Class<? extends ProtocolDecoder> decoderClass) {
88 if (encoderClass == null) {
89 throw new NullPointerException("encoderClass");
90 }
91 if (decoderClass == null) {
92 throw new NullPointerException("decoderClass");
93 }
94 if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
95 throw new IllegalArgumentException("encoderClass: "
96 + encoderClass.getName());
97 }
98 if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
99 throw new IllegalArgumentException("decoderClass: "
100 + decoderClass.getName());
101 }
102 try {
103 encoderClass.getConstructor(EMPTY_PARAMS);
104 } catch (NoSuchMethodException e) {
105 throw new IllegalArgumentException(
106 "encoderClass doesn't have a public default constructor.");
107 }
108 try {
109 decoderClass.getConstructor(EMPTY_PARAMS);
110 } catch (NoSuchMethodException e) {
111 throw new IllegalArgumentException(
112 "decoderClass doesn't have a public default constructor.");
113 }
114
115 this.factory = new ProtocolCodecFactory() {
116 public ProtocolEncoder getEncoder() throws Exception {
117 return encoderClass.newInstance();
118 }
119
120 public ProtocolDecoder getDecoder() throws Exception {
121 return decoderClass.newInstance();
122 }
123 };
124 }
125
126 @Override
127 public void onPreAdd(IoFilterChain parent, String name,
128 NextFilter nextFilter) throws Exception {
129 if (parent.contains(ProtocolCodecFilter.class)) {
130 throw new IllegalStateException(
131 "A filter chain cannot contain more than one ProtocolCodecFilter.");
132 }
133 }
134
135 @Override
136 public void onPostRemove(IoFilterChain parent, String name,
137 NextFilter nextFilter) throws Exception {
138 disposeEncoder(parent.getSession());
139 disposeDecoder(parent.getSession());
140 disposeDecoderOut(parent.getSession());
141 }
142
143 @Override
144 public void messageReceived(NextFilter nextFilter, IoSession session,
145 Object message) throws Exception {
146 if (!(message instanceof ByteBuffer)) {
147 nextFilter.messageReceived(session, message);
148 return;
149 }
150
151 ByteBuffer in = (ByteBuffer) message;
152 ProtocolDecoder decoder = getDecoder(session);
153 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
154
155 try {
156 synchronized (decoderOut) {
157 decoder.decode(session, in, decoderOut);
158 }
159 } catch (Throwable t) {
160 ProtocolDecoderException pde;
161 if (t instanceof ProtocolDecoderException) {
162 pde = (ProtocolDecoderException) t;
163 } else {
164 pde = new ProtocolDecoderException(t);
165 }
166 pde.setHexdump(in.getHexDump());
167 throw pde;
168 } finally {
169 try {
170
171 in.release();
172 } finally {
173 decoderOut.flush();
174 }
175 }
176 }
177
178 @Override
179 public void messageSent(NextFilter nextFilter, IoSession session,
180 Object message) throws Exception {
181 if (message instanceof HiddenByteBuffer) {
182 return;
183 }
184
185 if (!(message instanceof MessageByteBuffer)) {
186 nextFilter.messageSent(session, message);
187 return;
188 }
189
190 nextFilter.messageSent(session, ((MessageByteBuffer) message).message);
191 }
192
193 @Override
194 public void filterWrite(NextFilter nextFilter, IoSession session,
195 WriteRequest writeRequest) throws Exception {
196 Object message = writeRequest.getMessage();
197 if (message instanceof ByteBuffer) {
198 nextFilter.filterWrite(session, writeRequest);
199 return;
200 }
201
202 ProtocolEncoder encoder = getEncoder(session);
203 ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
204 nextFilter, writeRequest);
205
206 try {
207 encoder.encode(session, message, encoderOut);
208 encoderOut.flush();
209 nextFilter.filterWrite(session, new WriteRequest(
210 new MessageByteBuffer(writeRequest.getMessage()),
211 writeRequest.getFuture(), writeRequest.getDestination()));
212 } catch (Throwable t) {
213 ProtocolEncoderException pee;
214 if (t instanceof ProtocolEncoderException) {
215 pee = (ProtocolEncoderException) t;
216 } else {
217 pee = new ProtocolEncoderException(t);
218 }
219 throw pee;
220 }
221 }
222
223 @Override
224 public void sessionClosed(NextFilter nextFilter, IoSession session)
225 throws Exception {
226
227 ProtocolDecoder decoder = getDecoder(session);
228 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
229 try {
230 decoder.finishDecode(session, decoderOut);
231 } catch (Throwable t) {
232 ProtocolDecoderException pde;
233 if (t instanceof ProtocolDecoderException) {
234 pde = (ProtocolDecoderException) t;
235 } else {
236 pde = new ProtocolDecoderException(t);
237 }
238 throw pde;
239 } finally {
240
241 disposeEncoder(session);
242 disposeDecoder(session);
243 disposeDecoderOut(session);
244 decoderOut.flush();
245 }
246
247 nextFilter.sessionClosed(session);
248 }
249
250 private ProtocolEncoder getEncoder(IoSession session) throws Exception {
251 ProtocolEncoder encoder = (ProtocolEncoder) session
252 .getAttribute(ENCODER);
253 if (encoder == null) {
254 encoder = factory.getEncoder();
255 session.setAttribute(ENCODER, encoder);
256 }
257 return encoder;
258 }
259
260 private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
261 NextFilter nextFilter, WriteRequest writeRequest) {
262 return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
263 }
264
265 private ProtocolDecoder getDecoder(IoSession session) throws Exception {
266 ProtocolDecoder decoder = (ProtocolDecoder) session
267 .getAttribute(DECODER);
268 if (decoder == null) {
269 decoder = factory.getDecoder();
270 session.setAttribute(DECODER, decoder);
271 }
272 return decoder;
273 }
274
275 private ProtocolDecoderOutput getDecoderOut(IoSession session,
276 NextFilter nextFilter) {
277 ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
278 if (out == null) {
279 out = new SimpleProtocolDecoderOutput(session, nextFilter);
280 session.setAttribute(DECODER_OUT, out);
281 }
282 return out;
283 }
284
285 private void disposeEncoder(IoSession session) {
286 ProtocolEncoder encoder = (ProtocolEncoder) session
287 .removeAttribute(ENCODER);
288 if (encoder == null) {
289 return;
290 }
291
292 try {
293 encoder.dispose(session);
294 } catch (Throwable t) {
295 SessionLog.warn(session, "Failed to dispose: "
296 + encoder.getClass().getName() + " (" + encoder + ')');
297 }
298 }
299
300 private void disposeDecoder(IoSession session) {
301 ProtocolDecoder decoder = (ProtocolDecoder) session
302 .removeAttribute(DECODER);
303 if (decoder == null) {
304 return;
305 }
306
307 try {
308 decoder.dispose(session);
309 } catch (Throwable t) {
310 SessionLog.warn(session, "Falied to dispose: "
311 + decoder.getClass().getName() + " (" + decoder + ')');
312 }
313 }
314
315 private void disposeDecoderOut(IoSession session) {
316 session.removeAttribute(DECODER_OUT);
317 }
318
319 private static class HiddenByteBuffer extends ByteBufferProxy {
320 private HiddenByteBuffer(ByteBuffer buf) {
321 super(buf);
322 }
323 }
324
325 private static class MessageByteBuffer extends ByteBufferProxy {
326 private final Object message;
327
328 private MessageByteBuffer(Object message) {
329 super(EMPTY_BUFFER);
330 this.message = message;
331 }
332
333 @Override
334 public void acquire() {
335
336 }
337
338 @Override
339 public void release() {
340
341 }
342 }
343
344 private static class ProtocolEncoderOutputImpl extends
345 SimpleProtocolEncoderOutput {
346 private final IoSession session;
347
348 private final NextFilter nextFilter;
349
350 private final WriteRequest writeRequest;
351
352 ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter,
353 WriteRequest writeRequest) {
354 this.session = session;
355 this.nextFilter = nextFilter;
356 this.writeRequest = writeRequest;
357 }
358
359 @Override
360 protected WriteFuture doFlush(ByteBuffer buf) {
361 WriteFuture future = new DefaultWriteFuture(session);
362 nextFilter.filterWrite(session, new WriteRequest(
363 new HiddenByteBuffer(buf), future, writeRequest
364 .getDestination()));
365 return future;
366 }
367 }
368 }