1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.traffic;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Set;
25
26 import org.apache.mina.common.IoFilter;
27 import org.apache.mina.common.IoFilterAdapter;
28 import org.apache.mina.common.IoFilterChain;
29 import org.apache.mina.common.IoService;
30 import org.apache.mina.common.IoSession;
31 import org.apache.mina.common.WriteException;
32 import org.apache.mina.common.WriteRequest;
33 import org.apache.mina.util.CopyOnWriteMap;
34 import org.apache.mina.util.MapBackedSet;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class WriteThrottleFilter extends IoFilterAdapter {
55
// Per-instance logger named after the runtime class (getClass()),
// so a subclass logs under its own class name.
private final Logger logger = LoggerFactory.getLogger(getClass());

// Services whose scheduled writes are summed by the global counters; static, hence
// shared by every filter instance. Entries are added by the private
// getGlobalScheduledWrite*(IoService) overloads and removed by the public
// getGlobalScheduledWrite*() methods once a service reports isActive() == false.
private static final Set<IoService> activeServices =
        new MapBackedSet<IoService>(new CopyOnWriteMap<IoService, Boolean>());
60
61 public static int getGlobalScheduledWriteMessages() {
62 int answer = 0;
63 List<IoService> inactiveServices = null;
64 for (IoService s: activeServices) {
65 if (s.isActive()) {
66 answer += s.getScheduledWriteMessages();
67 } else {
68 if (inactiveServices == null) {
69 inactiveServices = new ArrayList<IoService>();
70 }
71 inactiveServices.add(s);
72 }
73 }
74
75 if (inactiveServices != null) {
76 activeServices.removeAll(inactiveServices);
77 }
78
79 return answer;
80 }
81
82 public static long getGlobalScheduledWriteBytes() {
83 long answer = 0;
84 List<IoService> inactiveServices = null;
85 for (IoService s: activeServices) {
86 if (s.isActive()) {
87 answer += s.getScheduledWriteBytes();
88 } else {
89 if (inactiveServices == null) {
90 inactiveServices = new ArrayList<IoService>();
91 }
92 inactiveServices.add(s);
93 }
94 }
95
96 if (inactiveServices != null) {
97 activeServices.removeAll(inactiveServices);
98 }
99
100 return answer;
101 }
102
103 private static int getGlobalScheduledWriteMessages(IoService service) {
104 if (!activeServices.contains(service)) {
105 activeServices.add(service);
106 }
107 return getGlobalScheduledWriteMessages();
108 }
109
110 private static long getGlobalScheduledWriteBytes(IoService service) {
111 if (!activeServices.contains(service)) {
112 activeServices.add(service);
113 }
114 return getGlobalScheduledWriteBytes();
115 }
116
// Monitor guarding lastLogTime (used by log()).
private final Object logLock = new Object();
// Monitor guarding blockWaiters; block() waits on it and notifyWaitingWriters() signals it.
private final Object blockLock = new Object();

// System.currentTimeMillis() value of the last emitted warning; accessed under logLock.
private long lastLogTime = 0;
// Number of threads currently inside block(); accessed under blockLock.
private int blockWaiters = 0;

// Action taken when a limit is reached; setPolicy() rejects null.
private volatile WriteThrottlePolicy policy;

// Limits evaluated by readyToWrite(); a value of 0 disables the corresponding check.
// The setters store negative values as 0.
private volatile int maxSessionScheduledWriteMessages;
private volatile long maxSessionScheduledWriteBytes;
private volatile int maxServiceScheduledWriteMessages;
private volatile long maxServiceScheduledWriteBytes;
private volatile int maxGlobalScheduledWriteMessages;
private volatile long maxGlobalScheduledWriteBytes;
131
132
133
134
135
/**
 * Creates a filter that uses {@link WriteThrottlePolicy#LOG} together with
 * the default limits chosen by the single-argument constructor.
 */
public WriteThrottleFilter() {
    // Delegate so the default limits are defined in exactly one place.
    this(WriteThrottlePolicy.LOG);
}
139
140
141
142
143
/**
 * Creates a filter with the given policy and the default limits:
 * <ul>
 *   <li>session: 4096 messages / 64 KiB</li>
 *   <li>service: 128 Ki messages / 64 MiB</li>
 *   <li>global: 256 Ki messages / 128 MiB</li>
 * </ul>
 *
 * @param policy the action to take when a limit is reached
 * @throws NullPointerException if {@code policy} is {@code null}
 */
public WriteThrottleFilter(WriteThrottlePolicy policy) {
    // The global byte limit was previously written as 1028576 * 128, a typo
    // for 1048576 (1 MiB) that made the default slightly less than 128 MiB.
    // Byte limits are computed in long arithmetic to match the parameter type.
    this(policy, 4096, 65536, 1024 * 128, 1048576L * 64, 1024 * 256, 1048576L * 128);
}
148
149
150
151
152
/**
 * Creates a filter that uses {@link WriteThrottlePolicy#LOG} with the given
 * limits. For every limit, {@code 0} disables the check and negative values
 * are stored as {@code 0}.
 *
 * @param maxSessionScheduledWriteMessages per-session limit on scheduled messages
 * @param maxSessionScheduledWriteBytes    per-session limit on scheduled bytes
 * @param maxServiceScheduledWriteMessages per-service limit on scheduled messages
 * @param maxServiceScheduledWriteBytes    per-service limit on scheduled bytes
 * @param maxGlobalScheduledWriteMessages  global limit on scheduled messages
 * @param maxGlobalScheduledWriteBytes     global limit on scheduled bytes
 */
public WriteThrottleFilter(
        int maxSessionScheduledWriteMessages,
        long maxSessionScheduledWriteBytes,
        int maxServiceScheduledWriteMessages,
        long maxServiceScheduledWriteBytes,
        int maxGlobalScheduledWriteMessages,
        long maxGlobalScheduledWriteBytes) {
    this(
            WriteThrottlePolicy.LOG,
            maxSessionScheduledWriteMessages,
            maxSessionScheduledWriteBytes,
            maxServiceScheduledWriteMessages,
            maxServiceScheduledWriteBytes,
            maxGlobalScheduledWriteMessages,
            maxGlobalScheduledWriteBytes);
}
162
163
164
165
166
/**
 * Creates a filter with the given policy and limits. Every value is applied
 * through its setter, so {@code policy} must not be {@code null} and negative
 * limits are stored as {@code 0} (which disables the check).
 *
 * @param policy                           the action to take when a limit is reached
 * @param maxSessionScheduledWriteMessages per-session limit on scheduled messages
 * @param maxSessionScheduledWriteBytes    per-session limit on scheduled bytes
 * @param maxServiceScheduledWriteMessages per-service limit on scheduled messages
 * @param maxServiceScheduledWriteBytes    per-service limit on scheduled bytes
 * @param maxGlobalScheduledWriteMessages  global limit on scheduled messages
 * @param maxGlobalScheduledWriteBytes     global limit on scheduled bytes
 * @throws NullPointerException if {@code policy} is {@code null}
 */
public WriteThrottleFilter(
        WriteThrottlePolicy policy,
        int maxSessionScheduledWriteMessages,
        long maxSessionScheduledWriteBytes,
        int maxServiceScheduledWriteMessages,
        long maxServiceScheduledWriteBytes,
        int maxGlobalScheduledWriteMessages,
        long maxGlobalScheduledWriteBytes) {

    // Policy first: a null policy aborts construction before any limit is set.
    setPolicy(policy);

    // Session scope.
    setMaxSessionScheduledWriteMessages(maxSessionScheduledWriteMessages);
    setMaxSessionScheduledWriteBytes(maxSessionScheduledWriteBytes);

    // Service scope.
    setMaxServiceScheduledWriteMessages(maxServiceScheduledWriteMessages);
    setMaxServiceScheduledWriteBytes(maxServiceScheduledWriteBytes);

    // Global scope.
    setMaxGlobalScheduledWriteMessages(maxGlobalScheduledWriteMessages);
    setMaxGlobalScheduledWriteBytes(maxGlobalScheduledWriteBytes);
}
181
182 public WriteThrottlePolicy getPolicy() {
183 return policy;
184 }
185
186 public void setPolicy(WriteThrottlePolicy policy) {
187 if (policy == null) {
188 throw new NullPointerException("policy");
189 }
190 this.policy = policy;
191 }
192
193 public int getMaxSessionScheduledWriteMessages() {
194 return maxSessionScheduledWriteMessages;
195 }
196
197 public void setMaxSessionScheduledWriteMessages(int maxSessionScheduledWriteMessages) {
198 if (maxSessionScheduledWriteMessages < 0) {
199 maxSessionScheduledWriteMessages = 0;
200 }
201 this.maxSessionScheduledWriteMessages = maxSessionScheduledWriteMessages;
202 }
203
204 public long getMaxSessionScheduledWriteBytes() {
205 return maxSessionScheduledWriteBytes;
206 }
207
208 public void setMaxSessionScheduledWriteBytes(long maxSessionScheduledWriteBytes) {
209 if (maxSessionScheduledWriteBytes < 0) {
210 maxSessionScheduledWriteBytes = 0;
211 }
212 this.maxSessionScheduledWriteBytes = maxSessionScheduledWriteBytes;
213 }
214
215 public int getMaxServiceScheduledWriteMessages() {
216 return maxServiceScheduledWriteMessages;
217 }
218
219 public void setMaxServiceScheduledWriteMessages(int maxServiceScheduledWriteMessages) {
220 if (maxServiceScheduledWriteMessages < 0) {
221 maxServiceScheduledWriteMessages = 0;
222 }
223 this.maxServiceScheduledWriteMessages = maxServiceScheduledWriteMessages;
224 }
225
226 public long getMaxServiceScheduledWriteBytes() {
227 return maxServiceScheduledWriteBytes;
228 }
229
230 public void setMaxServiceScheduledWriteBytes(long maxServiceScheduledWriteBytes) {
231 if (maxServiceScheduledWriteBytes < 0) {
232 maxServiceScheduledWriteBytes = 0;
233 }
234 this.maxServiceScheduledWriteBytes = maxServiceScheduledWriteBytes;
235 }
236
237 public int getMaxGlobalScheduledWriteMessages() {
238 return maxGlobalScheduledWriteMessages;
239 }
240
241 public void setMaxGlobalScheduledWriteMessages(int maxGlobalScheduledWriteMessages) {
242 if (maxGlobalScheduledWriteMessages < 0) {
243 maxGlobalScheduledWriteMessages = 0;
244 }
245 this.maxGlobalScheduledWriteMessages = maxGlobalScheduledWriteMessages;
246 }
247
248 public long getMaxGlobalScheduledWriteBytes() {
249 return maxGlobalScheduledWriteBytes;
250 }
251
252 public void setMaxGlobalScheduledWriteBytes(long maxGlobalScheduledWriteBytes) {
253 if (maxGlobalScheduledWriteBytes < 0) {
254 maxGlobalScheduledWriteBytes = 0;
255 }
256 this.maxGlobalScheduledWriteBytes = maxGlobalScheduledWriteBytes;
257 }
258
259 @Override
260 public void onPreAdd(
261 IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
262 if (parent.contains(WriteThrottleFilter.class)) {
263 throw new IllegalStateException(
264 "Only one " + WriteThrottleFilter.class.getName() + " is allowed per chain.");
265 }
266 }
267
268 @Override
269 public void filterWrite(NextFilter nextFilter, IoSession session,
270 WriteRequest writeRequest) throws Exception {
271
272 WriteThrottlePolicy policy = getPolicy();
273 if (policy != WriteThrottlePolicy.OFF) {
274 if (!readyToWrite(session)) {
275 switch (policy) {
276 case FAIL:
277 log(session);
278 fail(session, writeRequest);
279 break;
280 case BLOCK:
281 log(session);
282 block(session);
283 break;
284 case LOG:
285 log(session);
286 break;
287 }
288 }
289 }
290
291 nextFilter.filterWrite(session, writeRequest);
292 }
293
294 private boolean readyToWrite(IoSession session) {
295 if (session.isClosing()) {
296 return true;
297 }
298
299 int mSession = maxSessionScheduledWriteMessages;
300 long bSession = maxSessionScheduledWriteBytes;
301 int mService = maxServiceScheduledWriteMessages;
302 long bService = maxServiceScheduledWriteBytes;
303 int mGlobal = maxGlobalScheduledWriteMessages;
304 long bGlobal = maxGlobalScheduledWriteBytes;
305
306 return (mSession == 0 || session.getScheduledWriteMessages() < mSession) &&
307 (bSession == 0 || session.getScheduledWriteBytes() < bSession) &&
308 (mService == 0 || session.getService().getScheduledWriteMessages() < mService) &&
309 (bService == 0 || session.getService().getScheduledWriteBytes() < bService) &&
310 (mGlobal == 0 || getGlobalScheduledWriteMessages(session.getService()) < mGlobal) &&
311 (bGlobal == 0 || getGlobalScheduledWriteBytes(session.getService()) < bGlobal);
312 }
313
314 private void log(IoSession session) {
315 long currentTime = System.currentTimeMillis();
316
317
318 boolean log;
319 synchronized (logLock) {
320 if (currentTime - lastLogTime > 3000) {
321 lastLogTime = currentTime;
322 log = true;
323 } else {
324 log = false;
325 }
326 }
327
328 if (log) {
329 logger.warn(getMessage(session));
330 }
331 }
332
333 @Override
334 public void messageSent(
335 NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
336 notifyWaitingWriters();
337 nextFilter.messageSent(session, writeRequest);
338 }
339
340 @Override
341 public void exceptionCaught(NextFilter nextFilter, IoSession session,
342 Throwable cause) throws Exception {
343 try {
344 nextFilter.exceptionCaught(session, cause);
345 } finally {
346 notifyWaitingWriters();
347 }
348 }
349
350 @Override
351 public void sessionClosed(NextFilter nextFilter, IoSession session)
352 throws Exception {
353 notifyWaitingWriters();
354 nextFilter.sessionClosed(session);
355 }
356
357 private void block(IoSession session) {
358 synchronized (blockLock) {
359 blockWaiters ++;
360 while (!readyToWrite(session)) {
361 try {
362 blockLock.wait();
363 } catch (InterruptedException e) {
364
365 }
366 }
367 blockWaiters --;
368 }
369 }
370
371 private void notifyWaitingWriters() {
372 synchronized (blockLock) {
373 if (blockWaiters != 0) {
374 blockLock.notifyAll();
375 }
376 }
377 }
378
379 private void fail(IoSession session, WriteRequest writeRequest) throws WriteException {
380 throw new WriteFloodException(writeRequest, getMessage(session));
381 }
382
383 private String getMessage(IoSession session) {
384 int mSession = maxSessionScheduledWriteMessages;
385 long bSession = maxSessionScheduledWriteBytes;
386 int mService = maxServiceScheduledWriteMessages;
387 long bService = maxServiceScheduledWriteBytes;
388 int mGlobal = maxGlobalScheduledWriteMessages;
389 long bGlobal = maxGlobalScheduledWriteBytes;
390
391 StringBuilder buf = new StringBuilder(512);
392 buf.append("Write requests flooded - session: ");
393 if (mSession != 0) {
394 buf.append(session.getScheduledWriteMessages());
395 buf.append(" / ");
396 buf.append(mSession);
397 buf.append(" msgs, ");
398 } else {
399 buf.append(session.getScheduledWriteMessages());
400 buf.append(" / unlimited msgs, ");
401 }
402
403 if (bSession != 0) {
404 buf.append(session.getScheduledWriteBytes());
405 buf.append(" / ");
406 buf.append(bSession);
407 buf.append(" bytes, ");
408 } else {
409 buf.append(session.getScheduledWriteBytes());
410 buf.append(" / unlimited bytes, ");
411 }
412
413 buf.append("service: ");
414 if (mService != 0) {
415 buf.append(session.getService().getScheduledWriteMessages());
416 buf.append(" / ");
417 buf.append(mService);
418 buf.append(" msgs, ");
419 } else {
420 buf.append(session.getService().getScheduledWriteMessages());
421 buf.append(" / unlimited msgs, ");
422 }
423
424 if (bService != 0) {
425 buf.append(session.getService().getScheduledWriteBytes());
426 buf.append(" / ");
427 buf.append(bService);
428 buf.append(" bytes, ");
429 } else {
430 buf.append(session.getService().getScheduledWriteBytes());
431 buf.append(" / unlimited bytes, ");
432 }
433
434 buf.append("global: ");
435 if (mGlobal != 0) {
436 buf.append(getGlobalScheduledWriteMessages());
437 buf.append(" / ");
438 buf.append(mGlobal);
439 buf.append(" msgs, ");
440 } else {
441 buf.append(getGlobalScheduledWriteMessages());
442 buf.append(" / unlimited msgs, ");
443 }
444
445 if (bGlobal != 0) {
446 buf.append(getGlobalScheduledWriteBytes());
447 buf.append(" / ");
448 buf.append(bGlobal);
449 buf.append(" bytes.");
450 } else {
451 buf.append(getGlobalScheduledWriteBytes());
452 buf.append(" / unlimited bytes.");
453 }
454
455 return buf.toString();
456 }
457 }