View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.keepalive;
21  
22  import org.apache.mina.common.AttributeKey;
23  import org.apache.mina.common.DefaultWriteRequest;
24  import org.apache.mina.common.IdleStatus;
25  import org.apache.mina.common.IoFilter;
26  import org.apache.mina.common.IoFilterAdapter;
27  import org.apache.mina.common.IoFilterChain;
28  import org.apache.mina.common.IoSession;
29  import org.apache.mina.common.IoSessionConfig;
30  import org.apache.mina.common.WriteRequest;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  /**
35   * An {@link IoFilter} that sends a keep-alive request on
36   * <tt>sessionIdle</tt> event with {@link IdleStatus#READER_IDLE} and sends
37   * back the response for the keep-alive request.
38   * 
39   * <h2>Interference with {@link IoSessionConfig#setIdleTime(IdleStatus, int)}</h2>
40   * 
41   * This filter adjusts <tt>idleTime</tt> property for
42   * {@link IdleStatus#READER_IDLE} automatically.  Changing the <tt>idleTime</tt>
43   * property for {@link IdleStatus#READER_IDLE} will lead this filter to
44   * unexpected behavior.
45   * 
46   * <h2>Implementing {@link KeepAliveMessageFactory}</h2>
47   * 
48   * To use this filter, you have to provide an implementation of
49   * {@link KeepAliveMessageFactory}, which determines whether a received or
50   * sent message is a keep-alive message and creates a new keep-alive
51   * message:
52   * 
53   * <table border="1">
54   * <tr>
55   * <th>Name</th><th>Description</th><th>Implementation</th>
56   * </tr>
57   * <tr valign="top">
58   * <td>Active</td>
59   * <td>You want a keep-alive request to be sent when the reader is idle.
60   * Once the request is sent, the response for the request should be
61   * received within <tt>keepAliveRequestTimeout</tt> seconds.  Otherwise,
62   * the specified {@link KeepAlivePolicy} will be enforced.  If a keep-alive
63   * request is received, its response also should be sent back.</td>
64   * <td>Both {@link KeepAliveMessageFactory#getRequest(IoSession)} and
65   * {@link KeepAliveMessageFactory#getResponse(IoSession, Object)} must
66   * return a non-<tt>null</tt>.</td>
67   * </tr>
68   * <tr valign="top">
69   * <td>Semi-active</td>
70   * <td>You want a keep-alive request to be sent when the reader is idle.
71   * However, you don't really care if the response is received or not.
72   * If a keep-alive request is received, its response should
73   * also be sent back.
74   * </td>
75   * <td>Both {@link KeepAliveMessageFactory#getRequest(IoSession)} and
76   * {@link KeepAliveMessageFactory#getResponse(IoSession, Object)} must
77   * return a non-<tt>null</tt>, and the <tt>policy</tt> property
78   * should be set to {@link KeepAlivePolicy#OFF} or {@link KeepAlivePolicy#LOG}.
79   * </td>
80   * </tr>
81   * <tr valign="top">
82   * <td>Passive</td>
83   * <td>You don't want to send a keep-alive request by yourself, but the
84   * response should be sent back if a keep-alive request is received.</td>
85   * <td>{@link KeepAliveMessageFactory#getRequest(IoSession)} must return
86   * <tt>null</tt> and {@link KeepAliveMessageFactory#getResponse(IoSession, Object)}
87   * must return a non-<tt>null</tt>.</td>
88   * </tr>
89   * <tr valign="top">
90   * <td>Deaf Speaker</td>
91   * <td>You want a keep-alive request to be sent when the reader is idle, but
92   * you don't want to send any response back.</td> 
93   * <td>{@link KeepAliveMessageFactory#getRequest(IoSession)} must return
94   * a non-<tt>null</tt> and
95   * {@link KeepAliveMessageFactory#getResponse(IoSession, Object)} must
96   * return <tt>null</tt>.</td>
97   * </tr>
98   * <tr valign="top">
99   * <td>Silent Listener</td>
100  * <td>You don't want to send a keep-alive request by yourself nor send any
101  * response back.</td> 
102  * <td>Both {@link KeepAliveMessageFactory#getRequest(IoSession)} and
103  * {@link KeepAliveMessageFactory#getResponse(IoSession, Object)} must
104  * return <tt>null</tt>.</td>
105  * </tr>
106  * </table>
107  * Please note that you must implement
108  * {@link KeepAliveMessageFactory#isRequest(IoSession, Object)} and
109  * {@link KeepAliveMessageFactory#isResponse(IoSession, Object)} properly
110  * whichever case you choose.
111  * 
112  * <h2>Enforcing a policy</h2>
113  * 
114  * You can enforce a predefined policy by specifying a {@link KeepAlivePolicy}.
115  * The default policy is {@link KeepAlivePolicy#CLOSE}.  Setting the policy
116  * to {@link KeepAlivePolicy#OFF} stops this filter from waiting for response
117  * messages and therefore disables <tt>keepAliveRequestTimeout</tt> property.
118  *  
119  * @author The Apache MINA Project (dev@mina.apache.org)
120  * @version $Rev: 616100 $, $Date: 2008-01-28 15:58:32 -0700 (Mon, 28 Jan 2008) $
121  */
122 public class KeepAliveFilter extends IoFilterAdapter {
123     
124     private final AttributeKey WAITING_FOR_RESPONSE = new AttributeKey(
125             getClass(), "waitingForResponse");
126 
127     private final KeepAliveMessageFactory messageFactory;
128     private volatile KeepAlivePolicy policy;
129     private volatile int keepAliveRequestInterval;
130     private volatile int keepAliveRequestTimeout;
131 
132     private final Logger logger = LoggerFactory.getLogger(getClass());
133     
134     /**
135      * Creates a new instance with the default {@link KeepAlivePolicy} and
136      * the default timeout values (<tt>policy</tt> =
137      * {@link KeepAlivePolicy#CLOSE}, <tt>keepAliveRequestInterval = 60</tt>
138      * and <tt>keepAliveRequestTimeout = 30</tt>).
139      */
140     public KeepAliveFilter(KeepAliveMessageFactory messageFactory) {
141         this(messageFactory, KeepAlivePolicy.CLOSE);
142     }
143     
144     /**
145      * Creates a new instance with the default timeout values
146      * (<tt>keepAliveRequestInterval = 60</tt> and
147      * <tt>keepAliveRequestTimeout = 30</tt>).
148      */
149     public KeepAliveFilter(
150             KeepAliveMessageFactory messageFactory, KeepAlivePolicy policy) {
151         this(messageFactory, policy, 60, 30);
152     }
153 
154     /**
155      * Creates a new instance.
156      */
157     public KeepAliveFilter(
158             KeepAliveMessageFactory messageFactory, KeepAlivePolicy policy,
159             int keepAliveRequestInterval, int keepAliveRequestTimeout) {
160         if (messageFactory == null) {
161             throw new NullPointerException("messageFactory");
162         }
163         if (policy == null) {
164             throw new NullPointerException("policy");
165         }
166         
167         this.messageFactory = messageFactory;
168         this.policy = policy;
169         
170         setKeepAliveRequestInterval(keepAliveRequestInterval);
171         setKeepAliveRequestTimeout(keepAliveRequestTimeout);
172     }
173 
174     public KeepAlivePolicy getPolicy() {
175         return policy;
176     }
177 
178     public void setPolicy(KeepAlivePolicy policy) {
179         if (policy == null) {
180             throw new NullPointerException("policy");
181         }
182         this.policy = policy;
183     }
184 
185     public int getKeepAliveRequestInterval() {
186         return keepAliveRequestInterval;
187     }
188 
189     public void setKeepAliveRequestInterval(int keepAliveRequestInterval) {
190         if (keepAliveRequestInterval <= 0) {
191             throw new IllegalArgumentException(
192                     "keepAliveRequestInterval must be a positive integer: " +
193                     keepAliveRequestInterval);
194         }
195         this.keepAliveRequestInterval = keepAliveRequestInterval;
196     }
197 
198     public int getKeepAliveRequestTimeout() {
199         return keepAliveRequestTimeout;
200     }
201 
202     public void setKeepAliveRequestTimeout(int keepAliveRequestTimeout) {
203         if (keepAliveRequestTimeout <= 0) {
204             throw new IllegalArgumentException(
205                     "keepAliveRequestTimeout must be a positive integer: " +
206                     keepAliveRequestTimeout);
207         }
208         this.keepAliveRequestTimeout = keepAliveRequestTimeout;
209     }
210 
211     public KeepAliveMessageFactory getMessageFactory() {
212         return messageFactory;
213     }
214 
215     @Override
216     public void onPreAdd(IoFilterChain parent, String name,
217             NextFilter nextFilter) throws Exception {
218         if (parent.contains(this)) {
219             throw new IllegalArgumentException(
220                     "You can't add the same filter instance more than once. " +
221                     "Create another instance and add it.");
222         }
223     }
224 
225     @Override
226     public void onPostAdd(
227             IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
228         resetStatus(parent.getSession());
229     }
230 
231     @Override
232     public void onPostRemove(
233             IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
234         resetStatus(parent.getSession());
235     }
236 
237     @Override
238     public void messageReceived(
239             NextFilter nextFilter, IoSession session, Object message) throws Exception {
240         try {
241             if (messageFactory.isRequest(session, message)) {
242                 Object pongMessage =
243                     messageFactory.getResponse(session, message);
244                 
245                 if (pongMessage != null) {
246                     nextFilter.filterWrite(
247                             session, new DefaultWriteRequest(pongMessage));
248                 }
249             }
250             
251             if (messageFactory.isResponse(session, message)) {
252                 resetStatus(session);
253             }
254         } finally {
255             if (!isKeepAliveMessage(session, message)) {
256                 nextFilter.messageReceived(session, message);
257             }
258         }
259     }
260 
261     @Override
262     public void messageSent(
263             NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
264         Object message = writeRequest.getMessage();
265         if (!isKeepAliveMessage(session, message)) {
266             nextFilter.messageSent(session, writeRequest);
267         }
268     }
269 
270     @Override
271     public void sessionIdle(
272             NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
273         try {
274             if (status == IdleStatus.READER_IDLE) {
275                 if (!session.containsAttribute(WAITING_FOR_RESPONSE)) {
276                     Object pingMessage = messageFactory.getRequest(session);
277                     if (pingMessage != null) {
278                         nextFilter.filterWrite(
279                                 session,
280                                 new DefaultWriteRequest(pingMessage));
281                         
282                         // If policy is OFF, there's no need to wait for
283                         // the response.
284                         if (getPolicy() != KeepAlivePolicy.OFF) {
285                             markStatus(session);
286                         } else {
287                             resetStatus(session);
288                         }
289                     }
290                 } else {
291                     resetStatus(session);
292                     switch (getPolicy()) {
293                     case OFF:
294                         break;
295                     case LOG:
296                         logTimeout(session);
297                         break;
298                     case EXCEPTION:
299                         throw new KeepAliveTimeoutException(
300                                 getTimeoutMessage());
301                     case CLOSE:
302                         logTimeout(session);
303                         session.close();
304                         break;
305                     default:
306                         throw new InternalError();
307                     }
308                 }
309             }
310         } finally {
311             nextFilter.sessionIdle(session, status);
312         }
313     }
314 
315     private void logTimeout(IoSession session) {
316         if (logger.isWarnEnabled()) {
317             logger.warn(getTimeoutMessage());
318         }
319     }
320 
321     private String getTimeoutMessage() {
322         return "Keep-alive response message was not received within " + 
323                getKeepAliveRequestTimeout() + " second(s).";
324     }
325 
326     private void markStatus(IoSession session) {
327         session.getConfig().setIdleTime(
328                 IdleStatus.READER_IDLE, getKeepAliveRequestTimeout());
329         session.setAttribute(WAITING_FOR_RESPONSE);
330     }
331 
332     private void resetStatus(IoSession session) {
333         session.getConfig().setIdleTime(
334                 IdleStatus.READER_IDLE, getKeepAliveRequestInterval());
335         session.removeAttribute(WAITING_FOR_RESPONSE);
336     }
337     
338     private boolean isKeepAliveMessage(IoSession session, Object message) {
339         return messageFactory.isRequest(session, message) ||
340                messageFactory.isResponse(session, message);
341     }
342 }