View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import org.apache.mina.common.AttributeKey;
23  import org.apache.mina.common.IoBuffer;
24  import org.apache.mina.common.IoSession;
25  import org.apache.mina.common.TransportMetadata;
26  import org.apache.mina.common.UnderivableBuffer;
27  
28  /**
29   * A {@link ProtocolDecoder} that cumulates the content of received
30   * buffers to a <em>cumulative buffer</em> to help users implement decoders.
31   * <p>
32   * If the received {@link IoBuffer} is only a part of a message.
33   * decoders should cumulate received buffers to make a message complete or
34   * to postpone decoding until more buffers arrive.
35   * <p>
36   * Here is an example decoder that decodes CRLF terminated lines into
37   * <code>Command</code> objects:
38   * <pre>
39   * public class CrLfTerminatedCommandLineDecoder
40   *         extends CumulativeProtocolDecoder {
41   *
42   *     private Command parseCommand(IoBuffer in) {
43   *         // Convert the bytes in the specified buffer to a
44   *         // Command object.
45   *         ...
46   *     }
47   *
48   *     protected boolean doDecode(
49   *             IoSession session, IoBuffer in, ProtocolDecoderOutput out)
50   *             throws Exception {
51   *
52   *         // Remember the initial position.
53   *         int start = in.position();
54   *
55   *         // Now find the first CRLF in the buffer.
56   *         byte previous = 0;
57   *         while (in.hasRemaining()) {
58   *             byte current = in.get();
59   *
60   *             if (previous == '\r' && current == '\n') {
61   *                 // Remember the current position and limit.
62   *                 int position = in.position();
63   *                 int limit = in.limit();
64   *                 try {
65   *                     in.position(start);
66   *                     in.limit(position);
67   *                     // The bytes between in.position() and in.limit()
68   *                     // now contain a full CRLF terminated line.
69   *                     out.write(parseCommand(in.slice()));
70   *                 } finally {
71   *                     // Set the position to point right after the
72   *                     // detected line and set the limit to the old
73   *                     // one.
74   *                     in.position(position);
75   *                     in.limit(limit);
76   *                 }
77   *                 // Decoded one line; CumulativeProtocolDecoder will
78   *                 // call me again until I return false. So just
79   *                 // return true until there are no more lines in the
80   *                 // buffer.
81   *                 return true;
82   *             }
83   *
84   *             previous = current;
85   *         }
86   *
87   *         // Could not find CRLF in the buffer. Reset the initial
88   *         // position to the one we recorded above.
89   *         in.position(start);
90   *
91   *         return false;
92   *     }
93   * }
94   * </pre>
95   * <p>
96   * Please note that this decoder simply forward the call to
97   * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the
98   * underlying transport doesn't have a packet fragmentation.  Whether the
99   * transport has fragmentation or not is determined by querying 
100  * {@link TransportMetadata}.
101  * 
102  * @author The Apache MINA Project (dev@mina.apache.org)
103  * @version $Rev: 604802 $, $Date: 2007-12-17 02:22:05 -0700 (Mon, 17 Dec 2007) $
104  */
105 public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
106 
107     private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
108 
109     /**
110      * Creates a new instance.
111      */
112     protected CumulativeProtocolDecoder() {
113     }
114 
115     /**
116      * Cumulates content of <tt>in</tt> into internal buffer and forwards
117      * decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
118      * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
119      * and the cumulative buffer is compacted after decoding ends.
120      *
121      * @throws IllegalStateException if your <tt>doDecode()</tt> returned
122      *                               <tt>true</tt> not consuming the cumulative buffer.
123      */
124     public void decode(IoSession session, IoBuffer in,
125             ProtocolDecoderOutput out) throws Exception {
126         if (!session.getTransportMetadata().hasFragmentation()) {
127             doDecode(session, in, out);
128             return;
129         }
130 
131         boolean usingSessionBuffer = true;
132         IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
133         // If we have a session buffer, append data to that; otherwise
134         // use the buffer read from the network directly.
135         if (buf != null) {
136             buf.put(in);
137             buf.flip();
138         } else {
139             buf = in;
140             usingSessionBuffer = false;
141         }
142 
143         for (;;) {
144             int oldPos = buf.position();
145             boolean decoded = doDecode(session, buf, out);
146             if (decoded) {
147                 if (buf.position() == oldPos) {
148                     throw new IllegalStateException(
149                             "doDecode() can't return true when buffer is not consumed.");
150                 }
151 
152                 if (!buf.hasRemaining()) {
153                     break;
154                 }
155             } else {
156                 break;
157             }
158         }
159 
160         // if there is any data left that cannot be decoded, we store
161         // it in a buffer in the session and next time this decoder is
162         // invoked the session buffer gets appended to
163         if (buf.hasRemaining()) {
164             if (usingSessionBuffer) {
165                 buf.compact();
166             } else {
167                 storeRemainingInSession(buf, session);
168             }
169         } else {
170             if (usingSessionBuffer) {
171                 removeSessionBuffer(session);
172             }
173         }
174     }
175 
176     /**
177      * Implement this method to consume the specified cumulative buffer and
178      * decode its content into message(s).
179      *
180      * @param in the cumulative buffer
181      * @return <tt>true</tt> if and only if there's more to decode in the buffer
182      *         and you want to have <tt>doDecode</tt> method invoked again.
183      *         Return <tt>false</tt> if remaining data is not enough to decode,
184      *         then this method will be invoked again when more data is cumulated.
185      * @throws Exception if cannot decode <tt>in</tt>.
186      */
187     protected abstract boolean doDecode(IoSession session, IoBuffer in,
188             ProtocolDecoderOutput out) throws Exception;
189 
190     /**
191      * Releases the cumulative buffer used by the specified <tt>session</tt>.
192      * Please don't forget to call <tt>super.dispose( session )</tt> when
193      * you override this method.
194      */
195     @Override
196     public void dispose(IoSession session) throws Exception {
197         removeSessionBuffer(session);
198     }
199 
200     private void removeSessionBuffer(IoSession session) {
201         session.removeAttribute(BUFFER);
202     }
203 
204     private void storeRemainingInSession(IoBuffer buf, IoSession session) {
205         final IoBuffer remainingBuf = new UnderivableBuffer(
206                 IoBuffer.allocate(buf.capacity()).setAutoExpand(true));
207         
208         remainingBuf.order(buf.order());
209         remainingBuf.put(buf);
210         
211         session.setAttribute(BUFFER, remainingBuf);
212     }
213 }