1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec;
21
22 import java.net.SocketAddress;
23 import java.util.Queue;
24 import java.util.concurrent.Semaphore;
25
26 import org.apache.mina.core.buffer.IoBuffer;
27 import org.apache.mina.core.file.FileRegion;
28 import org.apache.mina.core.filterchain.IoFilter;
29 import org.apache.mina.core.filterchain.IoFilterAdapter;
30 import org.apache.mina.core.filterchain.IoFilterChain;
31 import org.apache.mina.core.future.DefaultWriteFuture;
32 import org.apache.mina.core.future.WriteFuture;
33 import org.apache.mina.core.session.AttributeKey;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.DefaultWriteRequest;
36 import org.apache.mina.core.write.NothingWrittenException;
37 import org.apache.mina.core.write.WriteRequest;
38 import org.apache.mina.core.write.WriteRequestWrapper;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42
43
44
45
46
47
48
49
50 public class ProtocolCodecFilter extends IoFilterAdapter {
51
52 private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
53
54 private static final Class<?>[] EMPTY_PARAMS = new Class[0];
55
56 private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
57
58 private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
59
60 private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
61
62 private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
63
64 private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
65
66
67 private final ProtocolCodecFactory factory;
68
69 private final Semaphore lock = new Semaphore(1, true);
70
71
72
73
74
75
76
/**
 * Creates a filter that obtains its encoder and decoder from the given
 * factory.
 *
 * @param factory the codec factory to delegate to
 * @throws IllegalArgumentException if {@code factory} is {@code null}
 */
public ProtocolCodecFilter(ProtocolCodecFactory factory) {
    if (factory != null) {
        this.factory = factory;
    } else {
        throw new IllegalArgumentException("factory");
    }
}
84
85
86
87
88
89
90
91
92
/**
 * Creates a filter around one encoder and one decoder instance. The same
 * two instances are handed out for every session.
 *
 * @param encoder the encoder returned for every session
 * @param decoder the decoder returned for every session
 * @throws IllegalArgumentException if either argument is {@code null}
 */
public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
    if (encoder == null) {
        throw new IllegalArgumentException("encoder");
    }

    if (decoder == null) {
        throw new IllegalArgumentException("decoder");
    }

    // Adapt the fixed pair to the factory interface the rest of the filter uses.
    ProtocolCodecFactory fixedPairFactory = new ProtocolCodecFactory() {
        public ProtocolEncoder getEncoder(IoSession session) {
            return encoder;
        }

        public ProtocolDecoder getDecoder(IoSession session) {
            return decoder;
        }
    };

    this.factory = fixedPairFactory;
}
112
113
114
115
116
117
118
119
120
121
/**
 * Creates a filter from an encoder class and a decoder class. Each class is
 * instantiated once, through its public no-argument constructor, and the
 * resulting instances are handed out for every session.
 *
 * @param encoderClass the encoder implementation to instantiate
 * @param decoderClass the decoder implementation to instantiate
 * @throws IllegalArgumentException if either class is {@code null}, is not
 *         of the expected type, lacks a public no-argument constructor, or
 *         cannot be instantiated; the underlying failure, when there is
 *         one, is attached as the cause
 */
public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
        final Class<? extends ProtocolDecoder> decoderClass) {
    if (encoderClass == null) {
        throw new IllegalArgumentException("encoderClass");
    }
    if (decoderClass == null) {
        throw new IllegalArgumentException("decoderClass");
    }

    // The generic bounds can be bypassed with raw types, so check at runtime too.
    if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
        throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
    }
    if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
        throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
    }

    try {
        encoderClass.getConstructor(EMPTY_PARAMS);
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.", e);
    }
    try {
        decoderClass.getConstructor(EMPTY_PARAMS);
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.", e);
    }

    final ProtocolEncoder encoder;

    try {
        encoder = encoderClass.newInstance();
    } catch (Exception e) {
        // Keep the cause: it is the only hint about why construction failed.
        throw new IllegalArgumentException("encoderClass cannot be initialized", e);
    }

    final ProtocolDecoder decoder;

    try {
        decoder = decoderClass.newInstance();
    } catch (Exception e) {
        // Keep the cause: it is the only hint about why construction failed.
        throw new IllegalArgumentException("decoderClass cannot be initialized", e);
    }

    // Expose the two shared instances through the factory interface.
    this.factory = new ProtocolCodecFactory() {
        public ProtocolEncoder getEncoder(IoSession session) throws Exception {
            return encoder;
        }

        public ProtocolDecoder getDecoder(IoSession session) throws Exception {
            return decoder;
        }
    };
}
174
175
176
177
178
179
180
181 public ProtocolEncoder getEncoder(IoSession session) {
182 return (ProtocolEncoder) session.getAttribute(ENCODER);
183 }
184
185 @Override
186 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
187 if (parent.contains(this)) {
188 throw new IllegalArgumentException(
189 "You can't add the same filter instance more than once. Create another instance and add it.");
190 }
191 }
192
193 @Override
194 public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
195
196 disposeCodec(parent.getSession());
197 }
198
199
200
201
202
203
204
205
206
207
208
209
210
211 @Override
212 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
213 LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
214
215 if (!(message instanceof IoBuffer)) {
216 nextFilter.messageReceived(session, message);
217 return;
218 }
219
220 IoBuffer in = (IoBuffer) message;
221 ProtocolDecoder decoder = factory.getDecoder(session);
222 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
223
224
225
226
227
228 while (in.hasRemaining()) {
229 int oldPos = in.position();
230 try {
231 lock.acquire();
232
233 decoder.decode(session, in, decoderOut);
234
235 decoderOut.flush(nextFilter, session);
236 } catch (Exception e) {
237 ProtocolDecoderException pde;
238 if (e instanceof ProtocolDecoderException) {
239 pde = (ProtocolDecoderException) e;
240 } else {
241 pde = new ProtocolDecoderException(e);
242 }
243 if (pde.getHexdump() == null) {
244
245 int curPos = in.position();
246 in.position(oldPos);
247 pde.setHexdump(in.getHexDump());
248 in.position(curPos);
249 }
250
251 decoderOut.flush(nextFilter, session);
252 nextFilter.exceptionCaught(session, pde);
253
254
255
256
257 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
258 break;
259 }
260 } finally {
261 lock.release();
262 }
263 }
264 }
265
266 @Override
267 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
268 if (writeRequest instanceof EncodedWriteRequest) {
269 return;
270 }
271
272 if (writeRequest instanceof MessageWriteRequest) {
273 MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
274 nextFilter.messageSent(session, wrappedRequest.getParentRequest());
275 } else {
276 nextFilter.messageSent(session, writeRequest);
277 }
278 }
279
280 @Override
281 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
282 Object message = writeRequest.getMessage();
283
284
285
286 if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
287 nextFilter.filterWrite(session, writeRequest);
288 return;
289 }
290
291
292 ProtocolEncoder encoder = factory.getEncoder(session);
293
294 ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
295
296 if (encoder == null) {
297 throw new ProtocolEncoderException("The encoder is null for the session " + session);
298 }
299
300 if (encoderOut == null) {
301 throw new ProtocolEncoderException("The encoderOut is null for the session " + session);
302 }
303
304 try {
305
306 encoder.encode(session, message, encoderOut);
307
308
309 Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
310
311
312 while (!bufferQueue.isEmpty()) {
313 Object encodedMessage = bufferQueue.poll();
314
315 if (encodedMessage == null) {
316 break;
317 }
318
319
320 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
321 SocketAddress destination = writeRequest.getDestination();
322 WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
323
324 nextFilter.filterWrite(session, encodedWriteRequest);
325 }
326 }
327
328
329 nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
330 } catch (Exception e) {
331 ProtocolEncoderException pee;
332
333
334 if (e instanceof ProtocolEncoderException) {
335 pee = (ProtocolEncoderException) e;
336 } else {
337 pee = new ProtocolEncoderException(e);
338 }
339
340 throw pee;
341 }
342 }
343
344 @Override
345 public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
346
347 ProtocolDecoder decoder = factory.getDecoder(session);
348 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
349
350 try {
351 decoder.finishDecode(session, decoderOut);
352 } catch (Exception e) {
353 ProtocolDecoderException pde;
354 if (e instanceof ProtocolDecoderException) {
355 pde = (ProtocolDecoderException) e;
356 } else {
357 pde = new ProtocolDecoderException(e);
358 }
359 throw pde;
360 } finally {
361
362 disposeCodec(session);
363 decoderOut.flush(nextFilter, session);
364 }
365
366
367 nextFilter.sessionClosed(session);
368 }
369
370 private static class EncodedWriteRequest extends DefaultWriteRequest {
371 public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
372 super(encodedMessage, future, destination);
373 }
374
375 public boolean isEncoded() {
376 return true;
377 }
378 }
379
380 private static class MessageWriteRequest extends WriteRequestWrapper {
381 public MessageWriteRequest(WriteRequest writeRequest) {
382 super(writeRequest);
383 }
384
385 @Override
386 public Object getMessage() {
387 return EMPTY_BUFFER;
388 }
389
390 @Override
391 public String toString() {
392 return "MessageWriteRequest, parent : " + super.toString();
393 }
394 }
395
396 private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
397 public ProtocolDecoderOutputImpl() {
398
399 }
400
401 public void flush(NextFilter nextFilter, IoSession session) {
402 Queue<Object> messageQueue = getMessageQueue();
403
404 while (!messageQueue.isEmpty()) {
405 nextFilter.messageReceived(session, messageQueue.poll());
406 }
407 }
408 }
409
410 private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
411 private final IoSession session;
412
413 private final NextFilter nextFilter;
414
415
416 private final SocketAddress destination;
417
418 public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
419 this.session = session;
420 this.nextFilter = nextFilter;
421
422
423 destination = writeRequest.getDestination();
424 }
425
426 public WriteFuture flush() {
427 Queue<Object> bufferQueue = getMessageQueue();
428 WriteFuture future = null;
429
430 while (!bufferQueue.isEmpty()) {
431 Object encodedMessage = bufferQueue.poll();
432
433 if (encodedMessage == null) {
434 break;
435 }
436
437
438 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
439 future = new DefaultWriteFuture(session);
440 nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
441 }
442 }
443
444 if (future == null) {
445
446 WriteRequest writeRequest = new DefaultWriteRequest(
447 DefaultWriteRequest.EMPTY_MESSAGE, null, destination);
448 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(writeRequest));
449 }
450
451 return future;
452 }
453 }
454
455
456
457
458
459
460 private void disposeCodec(IoSession session) {
461
462
463 disposeEncoder(session);
464 disposeDecoder(session);
465
466
467 disposeDecoderOut(session);
468 }
469
470
471
472
473
474
475 private void disposeEncoder(IoSession session) {
476 ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
477 if (encoder == null) {
478 return;
479 }
480
481 try {
482 encoder.dispose(session);
483 } catch (Exception e) {
484 LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
485 }
486 }
487
488
489
490
491
492
493 private void disposeDecoder(IoSession session) {
494 ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
495 if (decoder == null) {
496 return;
497 }
498
499 try {
500 decoder.dispose(session);
501 } catch (Exception e) {
502 LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
503 }
504 }
505
506
507
508
509
510 private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
511 ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
512
513 if (out == null) {
514
515 out = new ProtocolDecoderOutputImpl();
516 session.setAttribute(DECODER_OUT, out);
517 }
518
519 return out;
520 }
521
522 private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
523 ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
524
525 if (out == null) {
526
527 out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
528 session.setAttribute(ENCODER_OUT, out);
529 }
530
531 return out;
532 }
533
534
535
536
537 private void disposeDecoderOut(IoSession session) {
538 session.removeAttribute(DECODER_OUT);
539 }
540 }