View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.traffic;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.ScheduledExecutorService;
26  import java.util.concurrent.ScheduledFuture;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.apache.mina.common.AttributeKey;
31  import org.apache.mina.common.IoBuffer;
32  import org.apache.mina.common.IoFilter;
33  import org.apache.mina.common.IoFilterAdapter;
34  import org.apache.mina.common.IoFilterChain;
35  import org.apache.mina.common.IoService;
36  import org.apache.mina.common.IoSession;
37  import org.apache.mina.common.TrafficMask;
38  import org.apache.mina.filter.executor.ExecutorFilter;
39  import org.apache.mina.util.CopyOnWriteMap;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  /**
44   * An {@link IoFilter} that throttles incoming traffic to
45   * prevent a unwanted {@link OutOfMemoryError} under heavy load.
46   * <p>
47   * This filter will automatically disable reads on an {@link IoSession} once
48   * the amount of the read data batched for that session in the {@link ExecutorFilter}
49   * reaches a defined threshold. It accomplishes this by adding one filter before the
50   * {@link ExecutorFilter}.
51   * <p>
52   * The size of the received data is calculated by {@link MessageSizeEstimator}.
53   * If you are using a transport whose envelope is not an {@link IoBuffer},
54   * you could write your own {@link MessageSizeEstimator} for better traffic
55   * calculation.  However, the {@link DefaultMessageSizeEstimator} will suffice
56   * in most cases.
57   * <p>
58   * It is recommended to add this filter at the end of your filter chain
59   * configuration because it is possible to subvert the behavior of the added
60   * filters by adding a filter immediately before/after the {@link ExecutorFilter}
61   * after inserting this builder, consequently leading to a unexpected behavior.
62   *
63   * @author The Apache MINA Project (dev@mina.apache.org)
64   * @version $Rev: 616100 $, $Date: 2008-01-28 15:58:32 -0700 (Mon, 28 Jan 2008) $
65   */
66  public class ReadThrottleFilter extends IoFilterAdapter {
67      
68      private static final AtomicInteger globalBufferSize = new AtomicInteger();
69      private static final Map<IoService, AtomicInteger> serviceBufferSizes =
70          new CopyOnWriteMap<IoService, AtomicInteger>();
71      
72      private static final Object globalResumeLock = new Object();
73      private static long lastGlobalResumeTime = 0;
74      private final Logger logger = LoggerFactory.getLogger(getClass());
75  
76      /**
77       * Returns the current amount of data in the buffer of the {@link ExecutorFilter}
78       * for all {@link IoSession} whose {@link IoFilterChain} has been configured by
79       * this builder.
80       */
81      public static int getGlobalBufferSize() {
82          return globalBufferSize.get();
83      }
84      
85      public static int getServiceBufferSize(IoService service) {
86          AtomicInteger answer = serviceBufferSizes.get(service);
87          if (answer == null) {
88              return 0;
89          } else {
90              return answer.get();
91          }
92      }
93      
94      private static int increaseServiceBufferSize(IoService service, int increment) {
95          AtomicInteger serviceBufferSize = serviceBufferSizes.get(service);
96          if (serviceBufferSize == null) {
97              synchronized (serviceBufferSizes) {
98                  serviceBufferSize = serviceBufferSizes.get(service);
99                  if (serviceBufferSize == null) {
100                     serviceBufferSize = new AtomicInteger(increment);
101                     serviceBufferSizes.put(service, serviceBufferSize);
102                     return increment;
103                 }
104             }
105         }
106         return serviceBufferSize.addAndGet(increment);
107     }
108     
109     private final AttributeKey STATE =
110         new AttributeKey(ReadThrottleFilter.class, "state");
111 
112     private volatile ReadThrottlePolicy policy;
113     private final MessageSizeEstimator messageSizeEstimator;
114     
115     private volatile int maxSessionBufferSize;
116     private volatile int maxServiceBufferSize;
117     private volatile int maxGlobalBufferSize;
118     
119     private final IoFilter enterFilter = new EnterFilter();
120     
121     private final ScheduledExecutorService executor;
122     private ScheduledFuture<?> resumeOthersFuture;
123     private final AtomicInteger sessionCount = new AtomicInteger();
124     private final Runnable resumeOthersTask = new Runnable() {
125         public void run() {
126             resumeOthers();
127         }
128     };
129 
130     /**
131      * Creates a new instance with 64KB <tt>maxSessionBufferSize</tt>,
132      * 128MB <tt>maxGlobalBufferSize</tt> and a new {@link DefaultMessageSizeEstimator}.
133      */
134     public ReadThrottleFilter(ScheduledExecutorService executor) {
135         this(executor, ReadThrottlePolicy.LOG);
136     }
137     
138     public ReadThrottleFilter(
139             ScheduledExecutorService executor, ReadThrottlePolicy policy) {
140         this(executor, policy, null);
141     }
142     
143     public ReadThrottleFilter(
144             ScheduledExecutorService executor,
145             ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator) {
146         // 64KB, 64MB, 128MB.
147         this(executor, policy, messageSizeEstimator, 65536, 1048576 * 64, 1048576 * 128);
148     }
149     
150     /**
151      * Creates a new instance with the specified <tt>maxSessionBufferSize</tt>,
152      * <tt>maxGlobalBufferSize</tt> and a new {@link DefaultMessageSizeEstimator}.
153      */
154     public ReadThrottleFilter(
155             ScheduledExecutorService executor,
156             int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
157         this(executor, ReadThrottlePolicy.LOG, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
158     }
159 
160     public ReadThrottleFilter(
161             ScheduledExecutorService executor, ReadThrottlePolicy policy,
162             int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
163         this(executor, policy, null, maxSessionBufferSize, maxServiceBufferSize, maxGlobalBufferSize);
164     }
165 
166     /**
167      * Creates a new instance with the specified <tt>maxSessionBufferSize</tt>,
168      * <tt>maxGlobalBufferSize</tt> and {@link MessageSizeEstimator}.
169      * 
170      * @param maxSessionBufferSize the maximum amount of data in the buffer of
171      *                           the {@link ExecutorFilter} per {@link IoSession}.
172      *                           Specify {@code 0} or a smaller value to disable.
173      * @param maxGlobalBufferSize the maximum amount of data in the buffer of
174      *                            the {@link ExecutorFilter} for all {@link IoSession}
175      *                            whose {@link IoFilterChain} has been configured by
176      *                            this builder.
177      *                            Specify {@code 0} or a smaller value to disable.
178      * @param messageSizeEstimator the message size estimator. If {@code null},
179      *                             a new {@link DefaultMessageSizeEstimator} is created.
180      */
181     public ReadThrottleFilter(
182             ScheduledExecutorService executor, 
183             ReadThrottlePolicy policy, MessageSizeEstimator messageSizeEstimator,
184             int maxSessionBufferSize, int maxServiceBufferSize, int maxGlobalBufferSize) {
185         if (messageSizeEstimator == null) {
186             messageSizeEstimator = new DefaultMessageSizeEstimator();
187         }
188         this.executor = executor;
189         this.messageSizeEstimator = messageSizeEstimator;
190         setPolicy(policy);
191         setMaxSessionBufferSize(maxSessionBufferSize);
192         setMaxServiceBufferSize(maxServiceBufferSize);
193         setMaxGlobalBufferSize(maxGlobalBufferSize);
194     }
195 
196     public ReadThrottlePolicy getPolicy() {
197         return policy;
198     }
199 
200     public void setPolicy(ReadThrottlePolicy policy) {
201         if (policy == null) {
202             throw new NullPointerException("policy");
203         }
204         
205         this.policy = policy;
206     }
207 
208     /**
209      * Returns the maximum amount of data in the buffer of the {@link ExecutorFilter}
210      * per {@link IoSession}.  {@code 0} means 'disabled'.
211      */
212     public int getMaxSessionBufferSize() {
213         return maxSessionBufferSize;
214     }
215     
216     public int getMaxServiceBufferSize() {
217         return maxServiceBufferSize;
218     }
219     
220     /**
221      * Returns the maximum amount of data in the buffer of the {@link ExecutorFilter}
222      * for all {@link IoSession} whose {@link IoFilterChain} has been configured by
223      * this builder. {@code 0} means 'disabled'.
224      */
225     public int getMaxGlobalBufferSize() {
226         return maxGlobalBufferSize;
227     }
228     
229     /**
230      * Sets the maximum amount of data in the buffer of the {@link ExecutorFilter}
231      * per {@link IoSession}.  Specify {@code 0} or a smaller value to disable.
232      */
233     public void setMaxSessionBufferSize(int maxSessionBufferSize) {
234         if (maxSessionBufferSize < 0) {
235             maxSessionBufferSize = 0;
236         }
237         this.maxSessionBufferSize = maxSessionBufferSize;
238     }
239 
240     public void setMaxServiceBufferSize(int maxServiceBufferSize) {
241         if (maxServiceBufferSize < 0) {
242             maxServiceBufferSize = 0;
243         }
244         this.maxServiceBufferSize = maxServiceBufferSize;
245     }
246 
247     /**
248      * Sets the maximum amount of data in the buffer of the {@link ExecutorFilter}
249      * for all {@link IoSession} whose {@link IoFilterChain} has been configured by
250      * this builder. Specify {@code 0} or a smaller value to disable.
251      */
252     public void setMaxGlobalBufferSize(int maxGlobalBufferSize) {
253         if (maxGlobalBufferSize < 0) {
254             maxGlobalBufferSize = 0;
255         }
256         this.maxGlobalBufferSize = maxGlobalBufferSize;
257     }
258     
259     /**
260      * Returns the size estimator currently in use.
261      */
262     public MessageSizeEstimator getMessageSizeEstimator() {
263         return messageSizeEstimator;
264     }
265     
266     /**
267      * Returns the current amount of data in the buffer of the {@link ExecutorFilter}
268      * for the specified {@link IoSession}.
269      */
270     public int getSessionBufferSize(IoSession session) {
271         State state = (State) session.getAttribute(STATE);
272         if (state == null) {
273             return 0;
274         }
275         
276         synchronized (state) {
277             return state.sessionBufferSize;
278         }
279     }
280     
281     @Override
282     public void onPreAdd(
283             IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
284         if (!parent.contains(ExecutorFilter.class)) {
285             throw new IllegalStateException(
286                     "At least one " + ExecutorFilter.class.getName() + " must exist in the chain.");
287         }
288         if (parent.contains(this)) {
289             throw new IllegalArgumentException(
290                     "You can't add the same filter instance more than once.  Create another instance and add it.");
291         }
292     }
293     
294     @Override
295     public void onPostAdd(
296             IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
297         
298         // My previous filter must be an ExecutorFilter.
299         IoFilter lastFilter = null;
300         for (IoFilterChain.Entry e: parent.getAll()) {
301             IoFilter currentFilter = e.getFilter();
302             if (currentFilter == this) {
303                 if (lastFilter instanceof ExecutorFilter) {
304                     // Good!
305                     break;
306                 } else {
307                     throw new IllegalStateException(
308                             ReadThrottleFilter.class.getName() + " must be placed after " +
309                             "an " + ExecutorFilter.class.getName() + " in the chain");
310                 }
311             }
312             
313             lastFilter = currentFilter;
314         }
315         
316         // Add an entering filter before the ExecutorFilter.
317         parent.getEntry(lastFilter).addBefore(name + ".preprocessor", enterFilter);
318         
319         int previousSessionCount = sessionCount.getAndIncrement();
320         if (previousSessionCount == 0) {
321             synchronized (resumeOthersTask) {
322                 resumeOthersFuture = executor.scheduleWithFixedDelay(
323                         resumeOthersTask, 3000, 3000, TimeUnit.MILLISECONDS);
324             }
325         }
326     }
327 
328     @Override
329     public void onPostRemove(
330             IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
331         // Remove the enter filter together.
332         try {
333             parent.remove(enterFilter);
334         } catch (Exception e) {
335             // Ignore.
336         }
337         
338         int currentSessionCount = sessionCount.decrementAndGet();
339         if (currentSessionCount == 0) {
340             synchronized (resumeOthersTask) {
341                 resumeOthersFuture.cancel(false);
342                 resumeOthersFuture = null;
343             }
344         }
345     }
346 
347     @Override
348     public void messageReceived(
349             NextFilter nextFilter, IoSession session, Object message) throws Exception {
350         exit(session, estimateSize(message));
351         nextFilter.messageReceived(session, message);
352     }
353 
354     @Override
355     public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
356             TrafficMask trafficMask) throws Exception {
357         
358         if (trafficMask.isReadable()) {
359             State state = getState(session);
360             boolean suspendedRead;
361             synchronized (state) {
362                 suspendedRead = state.suspendedRead;
363             }
364             
365             // Suppress resumeRead() if read is suspended by this filter.
366             if (suspendedRead) {
367                 trafficMask = trafficMask.and(TrafficMask.WRITE);
368             }
369         }
370         
371         nextFilter.filterSetTrafficMask(session, trafficMask);
372     }
373 
374     private class EnterFilter extends IoFilterAdapter {
375         @Override
376         public void onPreRemove(
377                 IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
378             // Remove the exit filter together.
379             try {
380                 parent.remove(ReadThrottleFilter.this);
381             } catch (Exception e) {
382                 // Ignore.
383             }
384         }
385         
386         @Override
387         public void onPostRemove(
388                 IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
389             parent.getSession().removeAttribute(STATE);
390         }
391     
392         @Override
393         public void messageReceived(
394                 NextFilter nextFilter, IoSession session, Object message) throws Exception {
395             enter(session, estimateSize(message));
396             nextFilter.messageReceived(session, message);
397         }
398     }
399     
400     private int estimateSize(Object message) {
401         int size = messageSizeEstimator.estimateSize(message);
402         if (size < 0) {
403             throw new IllegalStateException(
404                     MessageSizeEstimator.class.getSimpleName() + " returned " +
405                     "a negative value (" + size + "): " + message);
406         }
407         return size;
408     }
409 
410     private void enter(IoSession session, int size) {
411         State state = getState(session);
412 
413         int globalBufferSize = ReadThrottleFilter.globalBufferSize.addAndGet(size);
414         int serviceBufferSize = increaseServiceBufferSize(session.getService(), size);
415 
416         int maxGlobalBufferSize = this.maxGlobalBufferSize;
417         int maxServiceBufferSize = this.maxServiceBufferSize;
418         int maxSessionBufferSize = this.maxSessionBufferSize;
419         
420         ReadThrottlePolicy policy = getPolicy();
421         
422         boolean enforcePolicy = false;
423         int sessionBufferSize;
424         synchronized (state) {
425             sessionBufferSize = (state.sessionBufferSize += size);
426             if ((maxSessionBufferSize != 0 && sessionBufferSize >= maxSessionBufferSize) ||
427                 (maxServiceBufferSize != 0 && serviceBufferSize >= maxServiceBufferSize) ||
428                 (maxGlobalBufferSize  != 0 && globalBufferSize  >= maxGlobalBufferSize)) {
429                 enforcePolicy = true;
430                 switch (policy) {
431                 case EXCEPTION:
432                 case BLOCK:
433                     state.suspendedRead = true;
434                 }
435             }
436         }
437 
438         if (logger.isDebugEnabled()) {
439             logger.debug(getMessage(session, "  Entered - "));
440         }
441         
442         if (enforcePolicy) {
443             switch (policy) {
444             case CLOSE:
445                 log(session, state);
446                 session.close();
447                 raiseException(session);
448                 break;
449             case EXCEPTION:
450                 suspend(session, state, logger);
451                 raiseException(session);
452                 break;
453             case BLOCK:
454                 suspend(session, state, logger);
455                 break;
456             case LOG:
457                 log(session, state);
458                 break;
459             }
460         }
461     }
462 
463     private void suspend(IoSession session, State state, Logger logger) {
464         log(session, state);
465         session.suspendRead();
466         if (logger.isDebugEnabled()) {
467             logger.debug(getMessage(session, "Suspended - "));
468         }
469     }
470     
471     private void exit(IoSession session, int size) {
472         State state = getState(session);
473 
474         int globalBufferSize = ReadThrottleFilter.globalBufferSize.addAndGet(-size);
475         if (globalBufferSize < 0) {
476             throw new IllegalStateException("globalBufferSize: " + globalBufferSize);
477         }
478         
479         int serviceBufferSize = increaseServiceBufferSize(session.getService(), -size);
480         if (serviceBufferSize < 0) {
481             throw new IllegalStateException("serviceBufferSize: " + serviceBufferSize);
482         }
483 
484         int maxGlobalBufferSize = this.maxGlobalBufferSize;
485         int maxServiceBufferSize = this.maxServiceBufferSize;
486         int maxSessionBufferSize = this.maxSessionBufferSize;
487         
488         int sessionBufferSize;
489         
490         boolean enforcePolicy = false;
491         synchronized (state) {
492             sessionBufferSize = (state.sessionBufferSize -= size);
493             if (sessionBufferSize < 0) {
494                 throw new IllegalStateException("sessionBufferSize: " + sessionBufferSize);
495             }
496             if ((maxGlobalBufferSize == 0 || globalBufferSize < maxGlobalBufferSize) &&
497                 (maxServiceBufferSize == 0 || serviceBufferSize < maxServiceBufferSize) &&
498                 (maxSessionBufferSize == 0 || sessionBufferSize < maxSessionBufferSize)) {
499                 state.suspendedRead = false;
500                 enforcePolicy = true;
501             }
502         }
503         
504         if (logger.isDebugEnabled()) {
505             logger.debug(getMessage(session, "   Exited - "));
506         }
507         
508         if (enforcePolicy) {
509             session.resumeRead();
510             if (logger.isDebugEnabled()) {
511                 logger.debug(getMessage(session, "  Resumed - "));
512             }
513         }
514         
515         resumeOthers();
516     }
517     
518     private void resumeOthers() {
519         long currentTime = System.currentTimeMillis();
520         
521         // Try to resume other sessions every other second.
522         boolean resumeOthers;
523         synchronized (globalResumeLock) {
524             if (currentTime - lastGlobalResumeTime > 1000) {
525                 lastGlobalResumeTime = currentTime;
526                 resumeOthers = true;
527             } else {
528                 resumeOthers = false;
529             }
530         }
531         
532         if (resumeOthers) {
533             int maxGlobalBufferSize = this.maxGlobalBufferSize;
534             if (maxGlobalBufferSize == 0 || globalBufferSize.get() < maxGlobalBufferSize) {
535                 List<IoService> inactiveServices = null;
536                 for (IoService service: serviceBufferSizes.keySet()) {
537                     resumeService(service);
538                     
539                     if (!service.isActive()) {
540                         if (inactiveServices == null) {
541                             inactiveServices = new ArrayList<IoService>();
542                         }
543                         inactiveServices.add(service);
544                     }
545                     
546                     // Remove inactive services from the map.
547                     if (inactiveServices != null) {
548                         for (IoService s: inactiveServices) {
549                             serviceBufferSizes.remove(s);
550                         }
551                     }
552 
553                     synchronized (globalResumeLock) {
554                         lastGlobalResumeTime = System.currentTimeMillis();
555                     }
556                 }
557             }
558         }
559     }
560     
561     private void resumeService(IoService service) {
562         int maxServiceBufferSize = this.maxServiceBufferSize;
563         if (maxServiceBufferSize == 0 || getServiceBufferSize(service) < maxServiceBufferSize) {
564             for (IoSession session: service.getManagedSessions()) {
565                 resume(session);
566             }
567         }
568     }
569     
570     private void resume(IoSession session) {
571         State state = (State) session.getAttribute(STATE);
572         if (state == null) {
573             return;
574         }
575         
576         int maxSessionBufferSize = this.maxSessionBufferSize;
577         boolean resume = false;
578         synchronized (state) {
579             if ((maxSessionBufferSize == 0 || state.sessionBufferSize < maxSessionBufferSize)) {
580                 state.suspendedRead = false;
581                 resume = true;
582             }
583         }
584 
585         if (resume) {
586             session.resumeRead();
587             if (logger.isDebugEnabled()) {
588                 logger.debug(getMessage(session, "  Resumed - "));
589             }
590         }
591     }
592 
593     private void log(IoSession session, State state) {
594         long currentTime = System.currentTimeMillis();
595         
596         // Prevent log flood by logging every 3 seconds.
597         boolean log;
598         synchronized (state.logLock) {
599             if (currentTime - state.lastLogTime > 3000) {
600                 state.lastLogTime = currentTime;
601                 log = true;
602             } else {
603                 log = false;
604             }
605         }
606         
607         if (log) {
608             logger.warn(getMessage(session));
609         }
610     }
611     
612     private void raiseException(IoSession session) {
613         throw new ReadFloodException(getMessage(session));
614     }
615     
616     private String getMessage(IoSession session) {
617         return getMessage(session, "Read buffer flooded - ");
618     }
619     
620     private String getMessage(IoSession session, String prefix) {
621         int  sessionLimit = maxSessionBufferSize;
622         int  serviceLimit = maxServiceBufferSize;
623         int  globalLimit  = maxGlobalBufferSize;
624 
625         StringBuilder buf = new StringBuilder(512);
626         buf.append(prefix);
627         buf.append("session: ");
628         if (sessionLimit != 0) {
629             buf.append(getSessionBufferSize(session));
630             buf.append(" / ");
631             buf.append(sessionLimit);
632             buf.append(" bytes, ");
633         } else {
634             buf.append(getSessionBufferSize(session));
635             buf.append(" / unlimited bytes, ");
636         }
637         
638         buf.append("service: ");
639         if (serviceLimit != 0) {
640             buf.append(getServiceBufferSize(session.getService()));
641             buf.append(" / ");
642             buf.append(serviceLimit);
643             buf.append(" bytes, ");
644         } else {
645             buf.append(getServiceBufferSize(session.getService()));
646             buf.append(" / unlimited bytes, ");
647         }
648         
649         buf.append("global: ");
650         if (globalLimit != 0) {
651             buf.append(getGlobalBufferSize());
652             buf.append(" / ");
653             buf.append(globalLimit);
654             buf.append(" bytes.");
655         } else {
656             buf.append(getGlobalBufferSize());
657             buf.append(" / unlimited bytes.");
658         }
659         
660         return buf.toString();
661     }
662     
663     private State getState(IoSession session) {
664         State state = (State) session.getAttribute(STATE);
665         if (state == null) {
666             state = new State();
667             State oldState = (State) session.setAttributeIfAbsent(STATE, state);
668             if (oldState != null) {
669                 state = oldState;
670             }
671         }
672         return state;
673     }
674     
675     @Override
676     public String toString() {
677         return String.valueOf(getGlobalBufferSize()) + '/' + getMaxGlobalBufferSize();
678     }
679 
680     private static class State {
681         private int sessionBufferSize;
682         private boolean suspendedRead;
683 
684         private final Object logLock = new Object();
685         private long lastLogTime = 0;
686     }
687 }