1 |
|
|
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
|
6 |
|
|
7 |
|
|
8 |
|
|
9 |
|
|
10 |
|
|
11 |
|
|
12 |
|
|
13 |
|
|
14 |
|
|
15 |
|
|
16 |
|
|
17 |
|
|
18 |
|
package org.apache.camel.component.mock; |
19 |
|
|
20 |
|
import java.util.ArrayList; |
21 |
|
import java.util.HashMap; |
22 |
|
import java.util.List; |
23 |
|
import java.util.Map; |
24 |
|
import java.util.concurrent.CountDownLatch; |
25 |
|
import java.util.concurrent.TimeUnit; |
26 |
|
|
27 |
|
import org.apache.camel.Component; |
28 |
|
import org.apache.camel.Consumer; |
29 |
|
import org.apache.camel.Exchange; |
30 |
|
import org.apache.camel.Processor; |
31 |
|
import org.apache.camel.Producer; |
32 |
|
import org.apache.camel.Message; |
33 |
|
import org.apache.camel.impl.DefaultEndpoint; |
34 |
|
import org.apache.camel.impl.DefaultExchange; |
35 |
|
import org.apache.camel.impl.DefaultProducer; |
36 |
|
import org.apache.camel.util.ObjectHelper; |
37 |
|
import org.apache.commons.logging.Log; |
38 |
|
import org.apache.commons.logging.LogFactory; |
39 |
|
|
40 |
|
|
41 |
|
|
42 |
|
|
43 |
|
|
44 |
|
|
45 |
|
|
46 |
29 |
public class MockEndpoint extends DefaultEndpoint<Exchange> { |
47 |
1 |
private static final transient Log log = LogFactory.getLog(MockEndpoint.class); |
48 |
35 |
private int expectedCount = -1; |
49 |
35 |
private Map<Integer, Processor> processors = new HashMap<Integer, Processor>(); |
50 |
35 |
private List<Exchange> receivedExchanges = new ArrayList<Exchange>(); |
51 |
35 |
private List<Throwable> failures = new ArrayList<Throwable>(); |
52 |
35 |
private List<Runnable> tests = new ArrayList<Runnable>(); |
53 |
|
private CountDownLatch latch; |
54 |
35 |
private long sleepForEmptyTest = 0L; |
55 |
35 |
private int expectedMinimumCount=-1; |
56 |
|
|
57 |
|
public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { |
58 |
0 |
long start = System.currentTimeMillis(); |
59 |
0 |
long left = unit.toMillis(timeout); |
60 |
0 |
long end = start + left; |
61 |
0 |
for (MockEndpoint endpoint : endpoints) { |
62 |
0 |
if( !endpoint.await(left, TimeUnit.MILLISECONDS) ) |
63 |
0 |
throw new AssertionError("Timeout waiting for endpoints to receive enough messages. "+endpoint.getEndpointUri()+" timed out."); |
64 |
0 |
left = end - System.currentTimeMillis(); |
65 |
0 |
if( left <= 0 ) |
66 |
0 |
left = 0; |
67 |
|
} |
68 |
0 |
} |
69 |
|
|
70 |
|
public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { |
71 |
0 |
assertWait(timeout, unit, endpoints); |
72 |
0 |
for (MockEndpoint endpoint : endpoints) { |
73 |
0 |
endpoint.assertIsSatisfied(); |
74 |
|
} |
75 |
0 |
} |
76 |
|
|
77 |
|
public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException { |
78 |
26 |
for (MockEndpoint endpoint : endpoints) { |
79 |
19 |
endpoint.assertIsSatisfied(); |
80 |
|
} |
81 |
7 |
} |
82 |
|
|
83 |
|
public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException { |
84 |
15 |
for (MockEndpoint endpoint : endpoints) { |
85 |
6 |
endpoint.expectsMessageCount(count); |
86 |
|
} |
87 |
9 |
} |
88 |
|
|
89 |
|
public MockEndpoint(String endpointUri, Component component) { |
90 |
35 |
super(endpointUri, component); |
91 |
35 |
} |
92 |
|
|
93 |
|
public Exchange createExchange() { |
94 |
0 |
return new DefaultExchange(getContext()); |
95 |
|
} |
96 |
|
|
97 |
|
public Consumer<Exchange> createConsumer(Processor processor) throws Exception { |
98 |
0 |
throw new UnsupportedOperationException("You cannot consume from this endpoint"); |
99 |
|
} |
100 |
|
|
101 |
|
public Producer<Exchange> createProducer() throws Exception { |
102 |
36 |
return new DefaultProducer<Exchange>(this) { |
103 |
36 |
public void process(Exchange exchange) { |
104 |
33 |
onExchange(exchange); |
105 |
33 |
} |
106 |
|
}; |
107 |
|
} |
108 |
|
|
109 |
|
|
110 |
|
|
111 |
|
|
112 |
|
|
113 |
|
|
114 |
|
|
115 |
|
public void assertIsSatisfied() throws InterruptedException { |
116 |
35 |
assertIsSatisfied(sleepForEmptyTest); |
117 |
35 |
} |
118 |
|
|
119 |
|
|
120 |
|
|
121 |
|
|
122 |
|
public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { |
123 |
35 |
if (latch != null) { |
124 |
|
|
125 |
23 |
latch.await(10, TimeUnit.SECONDS); |
126 |
23 |
} |
127 |
12 |
else if (expectedCount == 0) { |
128 |
|
|
129 |
6 |
if (timeoutForEmptyEndpoints > 0) { |
130 |
0 |
Thread.sleep(timeoutForEmptyEndpoints); |
131 |
|
} |
132 |
|
} |
133 |
|
|
134 |
35 |
if (expectedCount >= 0) { |
135 |
29 |
int receivedCounter = getReceivedCounter(); |
136 |
29 |
assertEquals("Received message count" , expectedCount, receivedCounter); |
137 |
|
} |
138 |
|
|
139 |
35 |
if( expectedMinimumCount >= 0 ) { |
140 |
0 |
int receivedCounter = getReceivedCounter(); |
141 |
0 |
assertTrue("Received message count "+receivedCounter+", expected at least "+expectedCount, expectedCount <= receivedCounter); |
142 |
|
|
143 |
|
} |
144 |
|
|
145 |
35 |
for (Runnable test : tests) { |
146 |
23 |
test.run(); |
147 |
23 |
} |
148 |
|
|
149 |
35 |
for (Throwable failure : failures) { |
150 |
0 |
if (failure != null) { |
151 |
0 |
log.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure); |
152 |
0 |
fail("Failed due to caught exception: " + failure); |
153 |
|
} |
154 |
0 |
} |
155 |
35 |
} |
156 |
|
|
157 |
|
|
158 |
|
|
159 |
|
|
160 |
|
|
161 |
|
|
162 |
|
public void expectedMessageCount(int expectedCount) { |
163 |
29 |
this.expectedCount = expectedCount; |
164 |
29 |
if (expectedCount <= 0) { |
165 |
6 |
latch = null; |
166 |
6 |
} |
167 |
|
else { |
168 |
23 |
latch = new CountDownLatch(expectedCount); |
169 |
|
} |
170 |
29 |
} |
171 |
|
|
172 |
|
|
173 |
|
|
174 |
|
|
175 |
|
|
176 |
|
|
177 |
|
public void expectedMinimumMessageCount(int expectedCount) { |
178 |
0 |
this.expectedMinimumCount = expectedCount; |
179 |
0 |
if (expectedCount <= 0) { |
180 |
0 |
latch = null; |
181 |
0 |
} |
182 |
|
else { |
183 |
0 |
latch = new CountDownLatch(expectedMinimumCount); |
184 |
|
} |
185 |
0 |
} |
186 |
|
|
187 |
|
|
188 |
|
|
189 |
|
|
190 |
|
public void expectedBodiesReceived(final List bodies) { |
191 |
19 |
expectedMessageCount(bodies.size()); |
192 |
|
|
193 |
19 |
expects(new Runnable() { |
194 |
19 |
public void run() { |
195 |
19 |
int counter = 0; |
196 |
19 |
for (Object expectedBody : bodies) { |
197 |
29 |
Exchange exchange = getReceivedExchanges().get(counter++); |
198 |
29 |
assertTrue("No exchange received for counter: " + counter, exchange != null); |
199 |
|
|
200 |
29 |
Message in = exchange.getIn(); |
201 |
|
|
202 |
29 |
Object actualBody = (expectedBody != null) |
203 |
|
? in.getBody(expectedBody.getClass()) : in.getBody(); |
204 |
|
|
205 |
29 |
assertEquals("Body of message: " + counter, expectedBody, actualBody); |
206 |
|
|
207 |
29 |
log.debug(getEndpointUri() + " >>>> message: " + counter + " with body: " + actualBody); |
208 |
29 |
} |
209 |
19 |
} |
210 |
|
}); |
211 |
19 |
} |
212 |
|
|
213 |
|
|
214 |
|
|
215 |
|
|
216 |
|
public void expectedBodiesReceived(Object... bodies) { |
217 |
19 |
List bodyList = new ArrayList(); |
218 |
48 |
for (Object body : bodies) { |
219 |
29 |
bodyList.add(body); |
220 |
|
} |
221 |
19 |
expectedBodiesReceived(bodyList); |
222 |
19 |
} |
223 |
|
|
224 |
|
|
225 |
|
|
226 |
|
|
227 |
|
|
228 |
|
public void expects(Runnable runnable) { |
229 |
23 |
tests.add(runnable); |
230 |
23 |
} |
231 |
|
|
232 |
|
|
233 |
|
|
234 |
|
|
235 |
|
|
236 |
|
|
237 |
|
|
238 |
|
public AssertionClause message(final int messageIndex) { |
239 |
4 |
AssertionClause clause = new AssertionClause() { |
240 |
4 |
public void run() { |
241 |
4 |
applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); |
242 |
4 |
} |
243 |
|
}; |
244 |
4 |
expects(clause); |
245 |
4 |
return clause; |
246 |
|
} |
247 |
|
|
248 |
|
|
249 |
|
|
250 |
|
|
251 |
|
|
252 |
|
|
253 |
|
public AssertionClause allMessages() { |
254 |
0 |
AssertionClause clause = new AssertionClause() { |
255 |
0 |
public void run() { |
256 |
0 |
List<Exchange> list = getReceivedExchanges(); |
257 |
0 |
int index = 0; |
258 |
0 |
for (Exchange exchange : list) { |
259 |
0 |
applyAssertionOn(MockEndpoint.this, index++, exchange); |
260 |
0 |
} |
261 |
0 |
} |
262 |
|
}; |
263 |
0 |
expects(clause); |
264 |
0 |
return clause; |
265 |
|
} |
266 |
|
|
267 |
|
|
268 |
|
|
269 |
|
|
270 |
|
public Exchange assertExchangeReceived(int index) { |
271 |
4 |
int count = getReceivedCounter(); |
272 |
4 |
assertTrue("Not enough messages received. Was: " + count, count > index); |
273 |
4 |
return getReceivedExchanges().get(index); |
274 |
|
} |
275 |
|
|
276 |
|
|
277 |
|
|
278 |
|
public List<Throwable> getFailures() { |
279 |
0 |
return failures; |
280 |
|
} |
281 |
|
|
282 |
|
public int getReceivedCounter() { |
283 |
66 |
return getReceivedExchanges().size(); |
284 |
|
} |
285 |
|
|
286 |
|
public List<Exchange> getReceivedExchanges() { |
287 |
100 |
return receivedExchanges; |
288 |
|
} |
289 |
|
|
290 |
|
public int getExpectedCount() { |
291 |
0 |
return expectedCount; |
292 |
|
} |
293 |
|
|
294 |
|
public long getSleepForEmptyTest() { |
295 |
0 |
return sleepForEmptyTest; |
296 |
|
} |
297 |
|
|
298 |
|
|
299 |
|
|
300 |
|
|
301 |
|
|
302 |
|
|
303 |
|
|
304 |
|
public void setSleepForEmptyTest(long sleepForEmptyTest) { |
305 |
0 |
this.sleepForEmptyTest = sleepForEmptyTest; |
306 |
0 |
} |
307 |
|
|
308 |
|
|
309 |
|
|
310 |
|
protected synchronized void onExchange(Exchange exchange) { |
311 |
|
try { |
312 |
33 |
log.debug(getEndpointUri() + " >>>> " + exchange); |
313 |
|
|
314 |
33 |
receivedExchanges.add(exchange); |
315 |
|
|
316 |
33 |
Processor processor = processors.get(getReceivedCounter()); |
317 |
33 |
if (processor != null) { |
318 |
0 |
processor.process(exchange); |
319 |
|
} |
320 |
|
|
321 |
33 |
if (latch != null) { |
322 |
33 |
latch.countDown(); |
323 |
|
} |
324 |
|
} |
325 |
0 |
catch (Exception e) { |
326 |
0 |
failures.add(e); |
327 |
33 |
} |
328 |
33 |
} |
329 |
|
|
330 |
|
protected void assertEquals(String message, Object expectedValue, Object actualValue) { |
331 |
58 |
if (!ObjectHelper.equals(expectedValue, actualValue)) { |
332 |
0 |
fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">"); |
333 |
|
} |
334 |
58 |
} |
335 |
|
|
336 |
|
protected void assertTrue(String message, boolean predicate) { |
337 |
33 |
if (!predicate) { |
338 |
0 |
fail(message); |
339 |
|
} |
340 |
33 |
} |
341 |
|
|
342 |
|
protected void fail(Object message) { |
343 |
0 |
throw new AssertionError(getEndpointUri() + " " + message); |
344 |
|
} |
345 |
|
|
346 |
|
public int getExpectedMinimumCount() { |
347 |
0 |
return expectedMinimumCount; |
348 |
|
} |
349 |
|
|
350 |
|
public void await() throws InterruptedException { |
351 |
0 |
if( latch!=null ) { |
352 |
0 |
latch.await(); |
353 |
|
} |
354 |
0 |
} |
355 |
|
|
356 |
|
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { |
357 |
0 |
if( latch!=null ) { |
358 |
0 |
return latch.await(timeout, unit); |
359 |
|
} |
360 |
0 |
return true; |
361 |
|
} |
362 |
|
|
363 |
|
public boolean isSingleton() { |
364 |
35 |
return true; |
365 |
|
} |
366 |
|
|
367 |
|
} |