001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.mock; 018 019 import java.util.ArrayList; 020 import java.util.HashMap; 021 import java.util.List; 022 import java.util.Map; 023 import java.util.concurrent.CopyOnWriteArrayList; 024 import java.util.concurrent.CountDownLatch; 025 import java.util.concurrent.TimeUnit; 026 027 import org.apache.camel.Component; 028 import org.apache.camel.Consumer; 029 import org.apache.camel.Exchange; 030 import org.apache.camel.Expression; 031 import org.apache.camel.Message; 032 import org.apache.camel.Processor; 033 import org.apache.camel.Producer; 034 import org.apache.camel.impl.DefaultEndpoint; 035 import org.apache.camel.impl.DefaultExchange; 036 import org.apache.camel.impl.DefaultProducer; 037 import org.apache.camel.util.ExpressionComparator; 038 import org.apache.camel.util.ObjectHelper; 039 import org.apache.commons.logging.Log; 040 import org.apache.commons.logging.LogFactory; 041 042 /** 043 * A Mock endpoint which provides a literate, fluent API for testing routes 044 * using a <a href="http://jmock.org/">JMock style</a> API. 045 * 046 * @version $Revision: 1.1 $ 047 */ 048 public class MockEndpoint extends DefaultEndpoint<Exchange> { 049 private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class); 050 private int expectedCount = -1; 051 private int counter; 052 private Map<Integer, Processor> processors = new HashMap<Integer, Processor>(); 053 private List<Exchange> receivedExchanges = new CopyOnWriteArrayList<Exchange>(); 054 private List<Throwable> failures = new CopyOnWriteArrayList<Throwable>(); 055 private List<Runnable> tests = new CopyOnWriteArrayList<Runnable>(); 056 private CountDownLatch latch; 057 private long sleepForEmptyTest = 1000L; 058 private long defaulResultWaitMillis = 20000L; 059 private int expectedMinimumCount = -1; 060 private List expectedBodyValues; 061 private List actualBodyValues = new ArrayList(); 062 063 public MockEndpoint(String endpointUri, Component component) { 064 super(endpointUri, component); 065 } 066 067 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 068 long start = System.currentTimeMillis(); 069 long left = unit.toMillis(timeout); 070 long end = start + left; 071 for (MockEndpoint endpoint : endpoints) { 072 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) { 073 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out."); 074 } 075 left = end - System.currentTimeMillis(); 076 if (left <= 0) { 077 left = 0; 078 } 079 } 080 } 081 082 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException { 083 assertWait(timeout, unit, endpoints); 084 for (MockEndpoint endpoint : endpoints) { 085 endpoint.assertIsSatisfied(); 086 } 087 } 088 089 public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException { 090 for (MockEndpoint endpoint : endpoints) { 091 endpoint.assertIsSatisfied(); 092 } 093 } 094 095 public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException { 096 for (MockEndpoint endpoint : endpoints) { 097 endpoint.expectsMessageCount(count); 098 } 099 } 100 101 public Exchange createExchange() { 102 return new DefaultExchange(getContext()); 103 } 104 105 public Consumer<Exchange> createConsumer(Processor processor) throws Exception { 106 throw new UnsupportedOperationException("You cannot consume from this endpoint"); 107 } 108 109 public Producer<Exchange> createProducer() throws Exception { 110 return new DefaultProducer<Exchange>(this) { 111 public void process(Exchange exchange) { 112 onExchange(exchange); 113 } 114 }; 115 } 116 117 // Testing API 118 // ------------------------------------------------------------------------- 119 120 /** 121 * Validates that all the available expectations on this endpoint are 122 * satisfied; or throw an exception 123 */ 124 public void assertIsSatisfied() throws InterruptedException { 125 assertIsSatisfied(sleepForEmptyTest); 126 } 127 128 /** 129 * Validates that all the available expectations on this endpoint are 130 * satisfied; or throw an exception 131 * 132 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we 133 * should wait for the test to be true 134 */ 135 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException { 136 if (expectedCount >= 0) { 137 if (expectedCount != getReceivedCounter()) { 138 if (expectedCount == 0) { 139 // lets wait a little bit just in case 140 if (timeoutForEmptyEndpoints > 0) { 141 LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received"); 142 Thread.sleep(timeoutForEmptyEndpoints); 143 } 144 } else { 145 waitForCompleteLatch(); 146 } 147 } 148 assertEquals("Received message count", expectedCount, getReceivedCounter()); 149 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) { 150 waitForCompleteLatch(); 151 } 152 153 if (expectedMinimumCount >= 0) { 154 int receivedCounter = getReceivedCounter(); 155 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedCount, expectedCount <= receivedCounter); 156 } 157 158 for (Runnable test : tests) { 159 test.run(); 160 } 161 162 for (Throwable failure : failures) { 163 if (failure != null) { 164 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure); 165 fail("Failed due to caught exception: " + failure); 166 } 167 } 168 } 169 170 /** 171 * Validates that the assertions fail on this endpoint 172 */ 173 public void assertIsNotSatisfied() throws InterruptedException { 174 try { 175 assertIsSatisfied(); 176 fail("Expected assertion failure!"); 177 } catch (AssertionError e) { 178 LOG.info("Caught expected failure: " + e); 179 } 180 } 181 182 /** 183 * Specifies the expected number of message exchanges that should be 184 * received by this endpoint 185 * 186 * @param expectedCount the number of message exchanges that should be 187 * expected by this endpoint 188 */ 189 public void expectedMessageCount(int expectedCount) { 190 this.expectedCount = expectedCount; 191 if (expectedCount <= 0) { 192 latch = null; 193 } else { 194 latch = new CountDownLatch(expectedCount); 195 } 196 } 197 198 /** 199 * Specifies the minimum number of expected message exchanges that should be 200 * received by this endpoint 201 * 202 * @param expectedCount the number of message exchanges that should be 203 * expected by this endpoint 204 */ 205 public void expectedMinimumMessageCount(int expectedCount) { 206 this.expectedMinimumCount = expectedCount; 207 if (expectedCount <= 0) { 208 latch = null; 209 } else { 210 latch = new CountDownLatch(expectedMinimumCount); 211 } 212 } 213 214 /** 215 * Adds an expectation that the given body values are received by this 216 * endpoint 217 */ 218 public void expectedBodiesReceived(final List bodies) { 219 expectedMessageCount(bodies.size()); 220 this.expectedBodyValues = bodies; 221 this.actualBodyValues = new ArrayList(); 222 223 expects(new Runnable() { 224 public void run() { 225 for (int i = 0; i < expectedBodyValues.size(); i++) { 226 Exchange exchange = getReceivedExchanges().get(i); 227 assertTrue("No exchange received for counter: " + i, exchange != null); 228 229 Object expectedBody = expectedBodyValues.get(i); 230 Object actualBody = actualBodyValues.get(i); 231 232 assertEquals("Body of message: " + i, expectedBody, actualBody); 233 } 234 } 235 }); 236 } 237 238 /** 239 * Adds an expectation that the given body values are received by this 240 * endpoint 241 */ 242 public void expectedBodiesReceived(Object... bodies) { 243 List bodyList = new ArrayList(); 244 for (Object body : bodies) { 245 bodyList.add(body); 246 } 247 expectedBodiesReceived(bodyList); 248 } 249 250 /** 251 * Adds an expectation that messages received should have ascending values 252 * of the given expression such as a user generated counter value 253 * 254 * @param expression 255 */ 256 public void expectsAscending(final Expression<Exchange> expression) { 257 expects(new Runnable() { 258 public void run() { 259 assertMessagesAscending(expression); 260 } 261 }); 262 } 263 264 /** 265 * Adds an expectation that messages received should have descending values 266 * of the given expression such as a user generated counter value 267 * 268 * @param expression 269 */ 270 public void expectsDescending(final Expression<Exchange> expression) { 271 expects(new Runnable() { 272 public void run() { 273 assertMessagesDescending(expression); 274 } 275 }); 276 } 277 278 /** 279 * Adds an expectation that no duplicate messages should be received using 280 * the expression to determine the message ID 281 * 282 * @param expression the expression used to create a unique message ID for 283 * message comparison (which could just be the message 284 * payload if the payload can be tested for uniqueness using 285 * {@link Object#equals(Object)} and 286 * {@link Object#hashCode()} 287 */ 288 public void expectsNoDuplicates(final Expression<Exchange> expression) { 289 expects(new Runnable() { 290 public void run() { 291 assertNoDuplicates(expression); 292 } 293 }); 294 } 295 296 /** 297 * Asserts that the messages have ascending values of the given expression 298 */ 299 public void assertMessagesAscending(Expression<Exchange> expression) { 300 assertMessagesSorted(expression, true); 301 } 302 303 /** 304 * Asserts that the messages have descending values of the given expression 305 */ 306 public void assertMessagesDescending(Expression<Exchange> expression) { 307 assertMessagesSorted(expression, false); 308 } 309 310 protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) { 311 String type = ascending ? "ascending" : "descending"; 312 ExpressionComparator comparator = new ExpressionComparator(expression); 313 List<Exchange> list = getReceivedExchanges(); 314 for (int i = 1; i < list.size(); i++) { 315 int j = i - 1; 316 Exchange e1 = list.get(j); 317 Exchange e2 = list.get(i); 318 int result = comparator.compare(e1, e2); 319 if (result == 0) { 320 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and " 321 + e2); 322 } else { 323 if (!ascending) { 324 result = result * -1; 325 } 326 if (result > 0) { 327 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: " 328 + expression + ". Exchanges: " + e1 + " and " + e2); 329 } 330 } 331 } 332 } 333 334 public void assertNoDuplicates(Expression<Exchange> expression) { 335 Map<Object, Exchange> map = new HashMap<Object, Exchange>(); 336 List<Exchange> list = getReceivedExchanges(); 337 for (int i = 0; i < list.size(); i++) { 338 Exchange e2 = list.get(i); 339 Object key = expression.evaluate(e2); 340 Exchange e1 = map.get(key); 341 if (e1 != null) { 342 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2); 343 } else { 344 map.put(key, e2); 345 } 346 } 347 } 348 349 /** 350 * Adds the expection which will be invoked when enough messages are 351 * received 352 */ 353 public void expects(Runnable runnable) { 354 tests.add(runnable); 355 } 356 357 /** 358 * Adds an assertion to the given message index 359 * 360 * @param messageIndex the number of the message 361 * @return the assertion clause 362 */ 363 public AssertionClause message(final int messageIndex) { 364 AssertionClause clause = new AssertionClause() { 365 public void run() { 366 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex)); 367 } 368 }; 369 expects(clause); 370 return clause; 371 } 372 373 /** 374 * Adds an assertion to all the received messages 375 * 376 * @return the assertion clause 377 */ 378 public AssertionClause allMessages() { 379 AssertionClause clause = new AssertionClause() { 380 public void run() { 381 List<Exchange> list = getReceivedExchanges(); 382 int index = 0; 383 for (Exchange exchange : list) { 384 applyAssertionOn(MockEndpoint.this, index++, exchange); 385 } 386 } 387 }; 388 expects(clause); 389 return clause; 390 } 391 392 /** 393 * Asserts that the given index of message is received (starting at zero) 394 */ 395 public Exchange assertExchangeReceived(int index) { 396 int count = getReceivedCounter(); 397 assertTrue("Not enough messages received. Was: " + count, count > index); 398 return getReceivedExchanges().get(index); 399 } 400 401 // Properties 402 // ------------------------------------------------------------------------- 403 public List<Throwable> getFailures() { 404 return failures; 405 } 406 407 public int getReceivedCounter() { 408 return getReceivedExchanges().size(); 409 } 410 411 public List<Exchange> getReceivedExchanges() { 412 return receivedExchanges; 413 } 414 415 public int getExpectedCount() { 416 return expectedCount; 417 } 418 419 public long getSleepForEmptyTest() { 420 return sleepForEmptyTest; 421 } 422 423 /** 424 * Allows a sleep to be specified to wait to check that this endpoint really 425 * is empty when {@link #expectedMessageCount(int)} is called with zero 426 * 427 * @param sleepForEmptyTest the milliseconds to sleep for to determine that 428 * this endpoint really is empty 429 */ 430 public void setSleepForEmptyTest(long sleepForEmptyTest) { 431 this.sleepForEmptyTest = sleepForEmptyTest; 432 } 433 434 public long getDefaulResultWaitMillis() { 435 return defaulResultWaitMillis; 436 } 437 438 /** 439 * Sets the maximum amount of time the {@link #assertIsSatisfied()} will 440 * wait on a latch until it is satisfied 441 */ 442 public void setDefaulResultWaitMillis(long defaulResultWaitMillis) { 443 this.defaulResultWaitMillis = defaulResultWaitMillis; 444 } 445 446 // Implementation methods 447 // ------------------------------------------------------------------------- 448 protected synchronized void onExchange(Exchange exchange) { 449 try { 450 Message in = exchange.getIn(); 451 Object actualBody = in.getBody(); 452 453 if (expectedBodyValues != null) { 454 int index = actualBodyValues.size(); 455 if (expectedBodyValues.size() > index) { 456 Object expectedBody = expectedBodyValues.get(index); 457 if (expectedBody != null) { 458 actualBody = in.getBody(expectedBody.getClass()); 459 } 460 actualBodyValues.add(actualBody); 461 } 462 } 463 464 LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody); 465 466 receivedExchanges.add(exchange); 467 468 Processor processor = processors.get(getReceivedCounter()); 469 if (processor != null) { 470 processor.process(exchange); 471 } 472 473 if (latch != null) { 474 latch.countDown(); 475 } 476 } catch (Exception e) { 477 failures.add(e); 478 } 479 } 480 481 protected void waitForCompleteLatch() throws InterruptedException { 482 if (latch == null) { 483 fail("Should have a latch!"); 484 } 485 486 // now lets wait for the results 487 LOG.debug("Waiting on the latch for: " + defaulResultWaitMillis + " millis"); 488 latch.await(defaulResultWaitMillis, TimeUnit.MILLISECONDS); 489 } 490 491 protected void assertEquals(String message, Object expectedValue, Object actualValue) { 492 if (!ObjectHelper.equals(expectedValue, actualValue)) { 493 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">"); 494 } 495 } 496 497 protected void assertTrue(String message, boolean predicate) { 498 if (!predicate) { 499 fail(message); 500 } 501 } 502 503 protected void fail(Object message) { 504 if (LOG.isDebugEnabled()) { 505 List<Exchange> list = getReceivedExchanges(); 506 int index = 0; 507 for (Exchange exchange : list) { 508 LOG.debug("Received[" + (++index) + "]: " + exchange); 509 } 510 } 511 throw new AssertionError(getEndpointUri() + " " + message); 512 } 513 514 public int getExpectedMinimumCount() { 515 return expectedMinimumCount; 516 } 517 518 public void await() throws InterruptedException { 519 if (latch != null) { 520 latch.await(); 521 } 522 } 523 524 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 525 if (latch != null) { 526 return latch.await(timeout, unit); 527 } 528 return true; 529 } 530 531 public boolean isSingleton() { 532 return true; 533 } 534 535 }