View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import org.apache.mina.common.ByteBuffer;
23  import org.apache.mina.common.ByteBufferProxy;
24  import org.apache.mina.common.IoFilter;
25  import org.apache.mina.common.IoFilterAdapter;
26  import org.apache.mina.common.IoFilterChain;
27  import org.apache.mina.common.IoSession;
28  import org.apache.mina.common.WriteFuture;
29  import org.apache.mina.common.support.DefaultWriteFuture;
30  import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
31  import org.apache.mina.filter.codec.support.SimpleProtocolEncoderOutput;
32  import org.apache.mina.util.SessionLog;
33  
34  /**
35   * An {@link IoFilter} which translates binary or protocol specific data into
36   * message object and vice versa using {@link ProtocolCodecFactory},
37   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
38   *
39   * @author The Apache Directory Project (mina-dev@directory.apache.org)
40   * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13  7월 2007) $
41   */
42  public class ProtocolCodecFilter extends IoFilterAdapter {
43      public static final String ENCODER = ProtocolCodecFilter.class.getName()
44              + ".encoder";
45  
46      public static final String DECODER = ProtocolCodecFilter.class.getName()
47              + ".decoder";
48  
49      private static final String DECODER_LOCK = ProtocolCodecFilter.class
50              .getName()
51              + ".decoderLock";
52  
53      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54  
55      private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new byte[0]);
56  
57      private final ProtocolCodecFactory factory;
58  
59      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
60          if (factory == null) {
61              throw new NullPointerException("factory");
62          }
63          this.factory = factory;
64      }
65  
66      public ProtocolCodecFilter(final ProtocolEncoder encoder,
67              final ProtocolDecoder decoder) {
68          if (encoder == null) {
69              throw new NullPointerException("encoder");
70          }
71          if (decoder == null) {
72              throw new NullPointerException("decoder");
73          }
74  
75          this.factory = new ProtocolCodecFactory() {
76              public ProtocolEncoder getEncoder() {
77                  return encoder;
78              }
79  
80              public ProtocolDecoder getDecoder() {
81                  return decoder;
82              }
83          };
84      }
85  
86      public ProtocolCodecFilter(
87              final Class<? extends ProtocolEncoder> encoderClass,
88              final Class<? extends ProtocolDecoder> decoderClass) {
89          if (encoderClass == null) {
90              throw new NullPointerException("encoderClass");
91          }
92          if (decoderClass == null) {
93              throw new NullPointerException("decoderClass");
94          }
95          if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
96              throw new IllegalArgumentException("encoderClass: "
97                      + encoderClass.getName());
98          }
99          if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
100             throw new IllegalArgumentException("decoderClass: "
101                     + decoderClass.getName());
102         }
103         try {
104             encoderClass.getConstructor(EMPTY_PARAMS);
105         } catch (NoSuchMethodException e) {
106             throw new IllegalArgumentException(
107                     "encoderClass doesn't have a public default constructor.");
108         }
109         try {
110             decoderClass.getConstructor(EMPTY_PARAMS);
111         } catch (NoSuchMethodException e) {
112             throw new IllegalArgumentException(
113                     "decoderClass doesn't have a public default constructor.");
114         }
115 
116         this.factory = new ProtocolCodecFactory() {
117             public ProtocolEncoder getEncoder() throws Exception {
118                 return encoderClass.newInstance();
119             }
120 
121             public ProtocolDecoder getDecoder() throws Exception {
122                 return decoderClass.newInstance();
123             }
124         };
125     }
126 
127     @Override
128     public void onPreAdd(IoFilterChain parent, String name,
129             NextFilter nextFilter) throws Exception {
130         if (parent.contains(ProtocolCodecFilter.class)) {
131             throw new IllegalStateException(
132                     "A filter chain cannot contain more than one ProtocolCodecFilter.");
133         }
134     }
135 
136     public void onPostRemove(IoFilterChain parent, String name,
137             NextFilter nextFilter) throws Exception {
138         disposeEncoder(parent.getSession());
139         disposeDecoder(parent.getSession());
140     }
141 
142     @Override
143     public void messageReceived(NextFilter nextFilter, IoSession session,
144             Object message) throws Exception {
145         if (!(message instanceof ByteBuffer)) {
146             nextFilter.messageReceived(session, message);
147             return;
148         }
149 
150         ByteBuffer in = (ByteBuffer) message;
151         ProtocolDecoder decoder = getDecoder(session);
152         Object decoderLock = getDecoderLock(session);
153         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
154 
155         try {
156             synchronized (decoderLock) {
157                 decoder.decode(session, in, decoderOut);
158             }
159         } catch (Throwable t) {
160             ProtocolDecoderException pde;
161             if (t instanceof ProtocolDecoderException) {
162                 pde = (ProtocolDecoderException) t;
163             } else {
164                 pde = new ProtocolDecoderException(t);
165             }
166             pde.setHexdump(in.getHexDump());
167             throw pde;
168         } finally {
169             // Dispose the decoder if this session is connectionless.
170             if (session.getTransportType().isConnectionless()) {
171                 disposeDecoder(session);
172             }
173 
174             // Release the read buffer.
175             in.release();
176 
177             decoderOut.flush();
178         }
179     }
180 
181     @Override
182     public void messageSent(NextFilter nextFilter, IoSession session,
183             Object message) throws Exception {
184         if (message instanceof HiddenByteBuffer) {
185             return;
186         }
187 
188         if (!(message instanceof MessageByteBuffer)) {
189             nextFilter.messageSent(session, message);
190             return;
191         }
192 
193         nextFilter.messageSent(session, ((MessageByteBuffer) message).message);
194     }
195 
196     @Override
197     public void filterWrite(NextFilter nextFilter, IoSession session,
198             WriteRequest writeRequest) throws Exception {
199         Object message = writeRequest.getMessage();
200         if (message instanceof ByteBuffer) {
201             nextFilter.filterWrite(session, writeRequest);
202             return;
203         }
204 
205         ProtocolEncoder encoder = getEncoder(session);
206         ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
207                 nextFilter, writeRequest);
208 
209         try {
210             encoder.encode(session, message, encoderOut);
211             encoderOut.flush();
212             nextFilter.filterWrite(session, new WriteRequest(
213                     new MessageByteBuffer(writeRequest.getMessage()),
214                     writeRequest.getFuture(), writeRequest.getDestination()));
215         } catch (Throwable t) {
216             ProtocolEncoderException pee;
217             if (t instanceof ProtocolEncoderException) {
218                 pee = (ProtocolEncoderException) t;
219             } else {
220                 pee = new ProtocolEncoderException(t);
221             }
222             throw pee;
223         } finally {
224             // Dispose the encoder if this session is connectionless.
225             if (session.getTransportType().isConnectionless()) {
226                 disposeEncoder(session);
227             }
228         }
229     }
230 
231     @Override
232     public void sessionClosed(NextFilter nextFilter, IoSession session)
233             throws Exception {
234         // Call finishDecode() first when a connection is closed.
235         ProtocolDecoder decoder = getDecoder(session);
236         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
237         try {
238             decoder.finishDecode(session, decoderOut);
239         } catch (Throwable t) {
240             ProtocolDecoderException pde;
241             if (t instanceof ProtocolDecoderException) {
242                 pde = (ProtocolDecoderException) t;
243             } else {
244                 pde = new ProtocolDecoderException(t);
245             }
246             throw pde;
247         } finally {
248             // Dispose all.
249             disposeEncoder(session);
250             disposeDecoder(session);
251 
252             decoderOut.flush();
253         }
254 
255         nextFilter.sessionClosed(session);
256     }
257 
258     private ProtocolEncoder getEncoder(IoSession session) throws Exception {
259         ProtocolEncoder encoder = (ProtocolEncoder) session
260                 .getAttribute(ENCODER);
261         if (encoder == null) {
262             encoder = factory.getEncoder();
263             session.setAttribute(ENCODER, encoder);
264         }
265         return encoder;
266     }
267 
268     private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
269             NextFilter nextFilter, WriteRequest writeRequest) {
270         return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
271     }
272 
273     private ProtocolDecoder getDecoder(IoSession session) throws Exception {
274         ProtocolDecoder decoder = (ProtocolDecoder) session
275                 .getAttribute(DECODER);
276         if (decoder == null) {
277             decoder = factory.getDecoder();
278             session.setAttribute(DECODER, decoder);
279         }
280         return decoder;
281     }
282 
283     private Object getDecoderLock(IoSession session) {
284         Object lock = session.getAttribute(DECODER_LOCK);
285         if (lock == null) {
286             lock = new Object();
287             session.setAttribute(DECODER_LOCK, lock);
288         }
289 
290         return lock;
291     }
292 
293     private ProtocolDecoderOutput getDecoderOut(IoSession session,
294             NextFilter nextFilter) {
295         return new SimpleProtocolDecoderOutput(session, nextFilter);
296     }
297 
298     private void disposeEncoder(IoSession session) {
299         ProtocolEncoder encoder = (ProtocolEncoder) session
300                 .removeAttribute(ENCODER);
301         if (encoder == null) {
302             return;
303         }
304 
305         try {
306             encoder.dispose(session);
307         } catch (Throwable t) {
308             SessionLog.warn(session, "Failed to dispose: "
309                     + encoder.getClass().getName() + " (" + encoder + ')');
310         }
311     }
312 
313     private void disposeDecoder(IoSession session) {
314         ProtocolDecoder decoder = (ProtocolDecoder) session
315                 .removeAttribute(DECODER);
316         if (decoder == null) {
317             return;
318         }
319 
320         try {
321             decoder.dispose(session);
322         } catch (Throwable t) {
323             SessionLog.warn(session, "Falied to dispose: "
324                     + decoder.getClass().getName() + " (" + decoder + ')');
325         }
326     }
327 
328     private static class HiddenByteBuffer extends ByteBufferProxy {
329         private HiddenByteBuffer(ByteBuffer buf) {
330             super(buf);
331         }
332     }
333 
334     private static class MessageByteBuffer extends ByteBufferProxy {
335         private final Object message;
336 
337         private MessageByteBuffer(Object message) {
338             super(EMPTY_BUFFER);
339             this.message = message;
340         }
341 
342         @Override
343         public void acquire() {
344             // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
345         }
346 
347         @Override
348         public void release() {
349             // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
350         }
351     }
352 
353     private static class ProtocolEncoderOutputImpl extends
354             SimpleProtocolEncoderOutput {
355         private final IoSession session;
356 
357         private final NextFilter nextFilter;
358 
359         private final WriteRequest writeRequest;
360 
361         ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter,
362                 WriteRequest writeRequest) {
363             this.session = session;
364             this.nextFilter = nextFilter;
365             this.writeRequest = writeRequest;
366         }
367 
368         @Override
369         protected WriteFuture doFlush(ByteBuffer buf) {
370             WriteFuture future = new DefaultWriteFuture(session);
371             nextFilter.filterWrite(session, new WriteRequest(
372                     new HiddenByteBuffer(buf), future, writeRequest
373                             .getDestination()));
374             return future;
375         }
376     }
377 }