Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||||||
Throttler |
|
| 0.0;0 |
1 | /** |
|
2 | * Licensed to the Apache Software Foundation (ASF) under one or more |
|
3 | * contributor license agreements. See the NOTICE file distributed with |
|
4 | * this work for additional information regarding copyright ownership. |
|
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 |
|
6 | * (the "License"); you may not use this file except in compliance with |
|
7 | * the License. You may obtain a copy of the License at |
|
8 | * |
|
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
|
10 | * |
|
11 | * Unless required by applicable law or agreed to in writing, software |
|
12 | * distributed under the License is distributed on an "AS IS" BASIS, |
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 | * See the License for the specific language governing permissions and |
|
15 | * limitations under the License. |
|
16 | */ |
|
17 | package org.apache.camel.processor; |
|
18 | ||
19 | import org.apache.camel.Exchange; |
|
20 | import org.apache.camel.Processor; |
|
21 | ||
22 | /** |
|
23 | * A <a href="http://activemq.apache.org/camel/throttler.html">Throttler</a> |
|
24 | * will set a limit on the maximum number of message exchanges which can be sent |
|
25 | * to a processor within a specific time period. <p/> This pattern can be |
|
26 | * extremely useful if you have some external system which meters access; such |
|
27 | * as only allowing 100 requests per second; or if huge load can cause a |
|
28 | * particular system to malfunction or to reduce its throughput you might want |
|
29 | * to introduce some throttling. |
|
30 | * |
|
31 | * @version $Revision: $ |
|
32 | */ |
|
33 | public class Throttler extends DelayProcessorSupport { |
|
34 | private long maximumRequestsPerPeriod; |
|
35 | private long timePeriodMillis; |
|
36 | private long startTimeMillis; |
|
37 | private long requestCount; |
|
38 | ||
39 | public Throttler(Processor processor, long maximumRequestsPerPeriod) { |
|
40 | 0 | this(processor, maximumRequestsPerPeriod, 1000); |
41 | 0 | } |
42 | ||
43 | public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) { |
|
44 | 3 | super(processor); |
45 | 3 | this.maximumRequestsPerPeriod = maximumRequestsPerPeriod; |
46 | 3 | this.timePeriodMillis = timePeriodMillis; |
47 | 3 | } |
48 | ||
49 | @Override |
|
50 | public String toString() { |
|
51 | 3 | return "Throttler[requests: " + maximumRequestsPerPeriod + " per: " + timePeriodMillis + " (ms) to: " |
52 | + getProcessor() + "]"; |
|
53 | } |
|
54 | ||
55 | // Properties |
|
56 | // ----------------------------------------------------------------------- |
|
57 | public long getMaximumRequestsPerPeriod() { |
|
58 | 0 | return maximumRequestsPerPeriod; |
59 | } |
|
60 | ||
61 | /** |
|
62 | * Sets the maximum number of requests per time period |
|
63 | */ |
|
64 | public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) { |
|
65 | 0 | this.maximumRequestsPerPeriod = maximumRequestsPerPeriod; |
66 | 0 | } |
67 | ||
68 | public long getTimePeriodMillis() { |
|
69 | 0 | return timePeriodMillis; |
70 | } |
|
71 | ||
72 | /** |
|
73 | * Sets the time period during which the maximum number of requests apply |
|
74 | */ |
|
75 | public void setTimePeriodMillis(long timePeriodMillis) { |
|
76 | 0 | this.timePeriodMillis = timePeriodMillis; |
77 | 0 | } |
78 | ||
79 | /** |
|
80 | * The number of requests which have taken place so far within this time |
|
81 | * period |
|
82 | */ |
|
83 | public long getRequestCount() { |
|
84 | 0 | return requestCount; |
85 | } |
|
86 | ||
87 | /** |
|
88 | * The start time when this current period began |
|
89 | */ |
|
90 | public long getStartTimeMillis() { |
|
91 | 0 | return startTimeMillis; |
92 | } |
|
93 | ||
94 | // Implementation methods |
|
95 | // ----------------------------------------------------------------------- |
|
96 | protected void delay(Exchange exchange) throws Exception { |
|
97 | 9 | long now = currentSystemTime(); |
98 | 9 | if (startTimeMillis == 0) { |
99 | 3 | startTimeMillis = now; |
100 | } |
|
101 | 9 | if (now - startTimeMillis > timePeriodMillis) { |
102 | // we're at the start of a new time period |
|
103 | // so lets reset things |
|
104 | 0 | requestCount = 1; |
105 | 0 | startTimeMillis = now; |
106 | 0 | } else { |
107 | 9 | if (++requestCount > maximumRequestsPerPeriod) { |
108 | // lets sleep until the start of the next time period |
|
109 | 0 | long time = startTimeMillis + timePeriodMillis; |
110 | 0 | waitUntil(time, exchange); |
111 | } |
|
112 | } |
|
113 | 9 | } |
114 | } |