View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import org.apache.mina.common.ByteBuffer;
23  import org.apache.mina.common.ByteBufferProxy;
24  import org.apache.mina.common.IoFilter;
25  import org.apache.mina.common.IoFilterAdapter;
26  import org.apache.mina.common.IoFilterChain;
27  import org.apache.mina.common.IoSession;
28  import org.apache.mina.common.WriteFuture;
29  import org.apache.mina.common.support.DefaultWriteFuture;
30  import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
31  import org.apache.mina.filter.codec.support.SimpleProtocolEncoderOutput;
32  import org.apache.mina.util.SessionLog;
33  
34  /**
35   * An {@link IoFilter} which translates binary or protocol specific data into
36   * message object and vice versa using {@link ProtocolCodecFactory},
37   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
38   *
39   * @author The Apache Directory Project (mina-dev@directory.apache.org)
40   * @version $Rev: 585074 $, $Date: 2007-10-16 17:13:01 +0900 (화, 16 10월 2007) $
41   */
42  public class ProtocolCodecFilter extends IoFilterAdapter {
43      public static final String ENCODER = ProtocolCodecFilter.class.getName()
44              + ".encoder";
45  
46      public static final String DECODER = ProtocolCodecFilter.class.getName()
47              + ".decoder";
48  
49      private static final String DECODER_OUT = ProtocolCodecFilter.class.getName()
50              + ".decoderOut";
51  
52      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
53  
54      private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new byte[0]);
55  
56      private final ProtocolCodecFactory factory;
57  
58      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
59          if (factory == null) {
60              throw new NullPointerException("factory");
61          }
62          this.factory = factory;
63      }
64  
65      public ProtocolCodecFilter(final ProtocolEncoder encoder,
66              final ProtocolDecoder decoder) {
67          if (encoder == null) {
68              throw new NullPointerException("encoder");
69          }
70          if (decoder == null) {
71              throw new NullPointerException("decoder");
72          }
73  
74          this.factory = new ProtocolCodecFactory() {
75              public ProtocolEncoder getEncoder() {
76                  return encoder;
77              }
78  
79              public ProtocolDecoder getDecoder() {
80                  return decoder;
81              }
82          };
83      }
84  
85      public ProtocolCodecFilter(
86              final Class<? extends ProtocolEncoder> encoderClass,
87              final Class<? extends ProtocolDecoder> decoderClass) {
88          if (encoderClass == null) {
89              throw new NullPointerException("encoderClass");
90          }
91          if (decoderClass == null) {
92              throw new NullPointerException("decoderClass");
93          }
94          if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
95              throw new IllegalArgumentException("encoderClass: "
96                      + encoderClass.getName());
97          }
98          if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
99              throw new IllegalArgumentException("decoderClass: "
100                     + decoderClass.getName());
101         }
102         try {
103             encoderClass.getConstructor(EMPTY_PARAMS);
104         } catch (NoSuchMethodException e) {
105             throw new IllegalArgumentException(
106                     "encoderClass doesn't have a public default constructor.");
107         }
108         try {
109             decoderClass.getConstructor(EMPTY_PARAMS);
110         } catch (NoSuchMethodException e) {
111             throw new IllegalArgumentException(
112                     "decoderClass doesn't have a public default constructor.");
113         }
114 
115         this.factory = new ProtocolCodecFactory() {
116             public ProtocolEncoder getEncoder() throws Exception {
117                 return encoderClass.newInstance();
118             }
119 
120             public ProtocolDecoder getDecoder() throws Exception {
121                 return decoderClass.newInstance();
122             }
123         };
124     }
125 
126     @Override
127     public void onPreAdd(IoFilterChain parent, String name,
128             NextFilter nextFilter) throws Exception {
129         if (parent.contains(ProtocolCodecFilter.class)) {
130             throw new IllegalStateException(
131                     "A filter chain cannot contain more than one ProtocolCodecFilter.");
132         }
133     }
134 
135     public void onPostRemove(IoFilterChain parent, String name,
136             NextFilter nextFilter) throws Exception {
137         disposeEncoder(parent.getSession());
138         disposeDecoder(parent.getSession());
139         disposeDecoderOut(parent.getSession());
140     }
141 
142     @Override
143     public void messageReceived(NextFilter nextFilter, IoSession session,
144             Object message) throws Exception {
145         if (!(message instanceof ByteBuffer)) {
146             nextFilter.messageReceived(session, message);
147             return;
148         }
149 
150         ByteBuffer in = (ByteBuffer) message;
151         ProtocolDecoder decoder = getDecoder(session);
152         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
153 
154         try {
155             while (in.hasRemaining()) {
156                 int oldPos = in.position();
157                 try {
158                     synchronized (decoderOut) {
159                         decoder.decode(session, in, decoderOut);
160                     }
161                     // Finish decoding if no exception was thrown.
162                     decoderOut.flush();
163                     break;
164                 } catch (Throwable t) {
165                     ProtocolDecoderException pde;
166                     if (t instanceof ProtocolDecoderException) {
167                         pde = (ProtocolDecoderException) t;
168                     } else {
169                         pde = new ProtocolDecoderException(t);
170                     }
171                     pde.setHexdump(in.getHexDump());
172                     
173                     // Fire the exceptionCaught event.
174                     decoderOut.flush();
175                     nextFilter.exceptionCaught(session, pde);
176                     
177                     // Stop retrying if the buffer position didn't change
178                     // because retrying can cause an infinite loop.
179                     if (in.position() == oldPos) {
180                         break;
181                     }
182                 }
183             }
184         } finally {
185             // Release the read buffer.
186             in.release();
187         }
188     }
189 
190     @Override
191     public void messageSent(NextFilter nextFilter, IoSession session,
192             Object message) throws Exception {
193         if (message instanceof HiddenByteBuffer) {
194             return;
195         }
196 
197         if (!(message instanceof MessageByteBuffer)) {
198             nextFilter.messageSent(session, message);
199             return;
200         }
201 
202         nextFilter.messageSent(session, ((MessageByteBuffer) message).message);
203     }
204 
205     @Override
206     public void filterWrite(NextFilter nextFilter, IoSession session,
207             WriteRequest writeRequest) throws Exception {
208         Object message = writeRequest.getMessage();
209         if (message instanceof ByteBuffer) {
210             nextFilter.filterWrite(session, writeRequest);
211             return;
212         }
213 
214         ProtocolEncoder encoder = getEncoder(session);
215         ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
216                 nextFilter, writeRequest);
217 
218         try {
219             encoder.encode(session, message, encoderOut);
220             encoderOut.flush();
221             nextFilter.filterWrite(session, new WriteRequest(
222                     new MessageByteBuffer(writeRequest.getMessage()),
223                     writeRequest.getFuture(), writeRequest.getDestination()));
224         } catch (Throwable t) {
225             ProtocolEncoderException pee;
226             if (t instanceof ProtocolEncoderException) {
227                 pee = (ProtocolEncoderException) t;
228             } else {
229                 pee = new ProtocolEncoderException(t);
230             }
231             throw pee;
232         }
233     }
234 
235     @Override
236     public void sessionClosed(NextFilter nextFilter, IoSession session)
237             throws Exception {
238         // Call finishDecode() first when a connection is closed.
239         ProtocolDecoder decoder = getDecoder(session);
240         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
241         try {
242             decoder.finishDecode(session, decoderOut);
243         } catch (Throwable t) {
244             ProtocolDecoderException pde;
245             if (t instanceof ProtocolDecoderException) {
246                 pde = (ProtocolDecoderException) t;
247             } else {
248                 pde = new ProtocolDecoderException(t);
249             }
250             throw pde;
251         } finally {
252             // Dispose all.
253             disposeEncoder(session);
254             disposeDecoder(session);
255             disposeDecoderOut(session);
256             decoderOut.flush();
257         }
258 
259         nextFilter.sessionClosed(session);
260     }
261 
262     private ProtocolEncoder getEncoder(IoSession session) throws Exception {
263         ProtocolEncoder encoder = (ProtocolEncoder) session
264                 .getAttribute(ENCODER);
265         if (encoder == null) {
266             encoder = factory.getEncoder();
267             session.setAttribute(ENCODER, encoder);
268         }
269         return encoder;
270     }
271 
272     private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
273             NextFilter nextFilter, WriteRequest writeRequest) {
274         return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
275     }
276 
277     private ProtocolDecoder getDecoder(IoSession session) throws Exception {
278         ProtocolDecoder decoder = (ProtocolDecoder) session
279                 .getAttribute(DECODER);
280         if (decoder == null) {
281             decoder = factory.getDecoder();
282             session.setAttribute(DECODER, decoder);
283         }
284         return decoder;
285     }
286 
287     private ProtocolDecoderOutput getDecoderOut(IoSession session,
288             NextFilter nextFilter) {
289         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
290         if (out == null) {
291             out = new SimpleProtocolDecoderOutput(session, nextFilter);
292             session.setAttribute(DECODER_OUT, out);
293         }
294         return out;
295     }
296 
297     private void disposeEncoder(IoSession session) {
298         ProtocolEncoder encoder = (ProtocolEncoder) session
299                 .removeAttribute(ENCODER);
300         if (encoder == null) {
301             return;
302         }
303 
304         try {
305             encoder.dispose(session);
306         } catch (Throwable t) {
307             SessionLog.warn(session, "Failed to dispose: "
308                     + encoder.getClass().getName() + " (" + encoder + ')');
309         }
310     }
311 
312     private void disposeDecoder(IoSession session) {
313         ProtocolDecoder decoder = (ProtocolDecoder) session
314                 .removeAttribute(DECODER);
315         if (decoder == null) {
316             return;
317         }
318 
319         try {
320             decoder.dispose(session);
321         } catch (Throwable t) {
322             SessionLog.warn(session, "Falied to dispose: "
323                     + decoder.getClass().getName() + " (" + decoder + ')');
324         }
325     }
326     
327     private void disposeDecoderOut(IoSession session) {
328         session.removeAttribute(DECODER_OUT);
329     }
330     
331     private static class HiddenByteBuffer extends ByteBufferProxy {
332         private HiddenByteBuffer(ByteBuffer buf) {
333             super(buf);
334         }
335     }
336 
337     private static class MessageByteBuffer extends ByteBufferProxy {
338         private final Object message;
339 
340         private MessageByteBuffer(Object message) {
341             super(EMPTY_BUFFER);
342             this.message = message;
343         }
344 
345         @Override
346         public void acquire() {
347             // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
348         }
349 
350         @Override
351         public void release() {
352             // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
353         }
354     }
355 
356     private static class ProtocolEncoderOutputImpl extends
357             SimpleProtocolEncoderOutput {
358         private final IoSession session;
359 
360         private final NextFilter nextFilter;
361 
362         private final WriteRequest writeRequest;
363 
364         ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter,
365                 WriteRequest writeRequest) {
366             this.session = session;
367             this.nextFilter = nextFilter;
368             this.writeRequest = writeRequest;
369         }
370 
371         @Override
372         protected WriteFuture doFlush(ByteBuffer buf) {
373             WriteFuture future = new DefaultWriteFuture(session);
374             nextFilter.filterWrite(session, new WriteRequest(
375                     new HiddenByteBuffer(buf), future, writeRequest
376                             .getDestination()));
377             return future;
378         }
379     }
380 }