1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.mina.core.session.IoEvent;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28
29
30
31
32
33
34 public class IoEventQueueThrottle implements IoEventQueueHandler {
35
36 private final Logger logger = LoggerFactory.getLogger(getClass());
37
38
39 private final IoEventSizeEstimator eventSizeEstimator;
40
41 private volatile int threshold;
42
43 private final Object lock = new Object();
44 private final AtomicInteger counter = new AtomicInteger();
45 private int waiters;
46
47 public IoEventQueueThrottle() {
48 this(new DefaultIoEventSizeEstimator(), 65536);
49 }
50
51 public IoEventQueueThrottle(int threshold) {
52 this(new DefaultIoEventSizeEstimator(), threshold);
53 }
54
55 public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {
56 if (eventSizeEstimator == null) {
57 throw new NullPointerException("eventSizeEstimator");
58 }
59 this.eventSizeEstimator = eventSizeEstimator;
60
61 setThreshold(threshold);
62 }
63
64 public IoEventSizeEstimator getEventSizeEstimator() {
65 return eventSizeEstimator;
66 }
67
68 public int getThreshold() {
69 return threshold;
70 }
71
72 public int getCounter() {
73 return counter.get();
74 }
75
76 public void setThreshold(int threshold) {
77 if (threshold <= 0) {
78 throw new IllegalArgumentException("threshold: " + threshold);
79 }
80 this.threshold = threshold;
81 }
82
83 public boolean accept(Object source, IoEvent event) {
84 return true;
85 }
86
87 public void offered(Object source, IoEvent event) {
88 int eventSize = estimateSize(event);
89 int currentCounter = counter.addAndGet(eventSize);
90 logState();
91
92 if (currentCounter >= threshold) {
93 block();
94 }
95 }
96
97 public void polled(Object source, IoEvent event) {
98 int eventSize = estimateSize(event);
99 int currentCounter = counter.addAndGet(-eventSize);
100
101 logState();
102
103 if (currentCounter < threshold) {
104 unblock();
105 }
106 }
107
108 private int estimateSize(IoEvent event) {
109 int size = getEventSizeEstimator().estimateSize(event);
110 if (size < 0) {
111 throw new IllegalStateException(
112 IoEventSizeEstimator.class.getSimpleName() + " returned " +
113 "a negative value (" + size + "): " + event);
114 }
115 return size;
116 }
117
118 private void logState() {
119 if (logger.isDebugEnabled()) {
120 logger.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());
121 }
122 }
123
124 protected void block() {
125 if (logger.isDebugEnabled()) {
126 logger.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);
127 }
128
129 synchronized (lock) {
130 while (counter.get() >= threshold) {
131 waiters ++;
132 try {
133 lock.wait();
134 } catch (InterruptedException e) {
135
136 } finally {
137 waiters --;
138 }
139 }
140 }
141
142 if (logger.isDebugEnabled()) {
143 logger.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);
144 }
145 }
146
147 protected void unblock() {
148 synchronized (lock) {
149 if (waiters > 0) {
150 lock.notify();
151 }
152 }
153 }
154 }