/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.mock;

import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ExpressionComparator;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Mock endpoint which provides a literate, fluent API for testing routes
 * using a <a href="http://jmock.org/">JMock style</a> API.
 * <p/>
 * Exchanges sent to this endpoint are recorded as they arrive. Expectations
 * registered through the <tt>expected...</tt> / <tt>expects...</tt> methods are
 * evaluated when one of the <tt>assertIsSatisfied</tt> methods is invoked.
 *
 * @version $Revision: 766288 $
 */
public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
    private static final Log LOG = LogFactory.getLog(MockEndpoint.class);
    private int expectedCount;
    private int counter;
    private Processor defaultProcessor;
    private Map<Integer, Processor> processors;
    private List<Exchange> receivedExchanges;
    private List<Throwable> failures;
    private List<Runnable> tests;
    private CountDownLatch latch;
    private long sleepForEmptyTest;
    private long resultWaitTime;
    private long resultMinimumWaitTime;
    private int expectedMinimumCount;
    private List expectedBodyValues;
    private List actualBodyValues;
    private final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
    private String headerName;
    private Object headerValue;
    private Object actualHeader;
    private String propertyName;
    private Object propertyValue;
    private Object actualProperty;
    private Processor reporter;

    public MockEndpoint(String endpointUri, Component component) {
        super(endpointUri, component);
        init();
    }

    public MockEndpoint(String endpointUri) {
        super(endpointUri);
        init();
    }

    public MockEndpoint() {
        this(null);
    }

    /**
     * A helper method to resolve the mock endpoint of the given URI on the given context
     *
     * @param context the camel context to try resolve the mock endpoint from
     * @param uri the uri of the endpoint to resolve
     * @return the endpoint
     */
    public static MockEndpoint resolve(CamelContext context, String uri) {
        return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
    }

    /**
     * Waits for each of the given endpoints, in order, until its latch has been
     * counted down. The timeout is a total budget shared by all the endpoints:
     * the time spent waiting on one endpoint is deducted from what is left for
     * the following ones.
     *
     * @param timeout   the total maximum time to wait
     * @param unit      the unit of the timeout
     * @param endpoints the endpoints to wait for
     * @throws AssertionError if an endpoint did not complete within the remaining time
     * @throws InterruptedException if interrupted while waiting
     */
    public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
        long remaining = unit.toMillis(timeout);
        long deadline = System.currentTimeMillis() + remaining;
        for (MockEndpoint endpoint : endpoints) {
            if (!endpoint.await(remaining, TimeUnit.MILLISECONDS)) {
                throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
            }
            // never pass a negative timeout on to the next endpoint
            remaining = Math.max(0, deadline - System.currentTimeMillis());
        }
    }

    /**
     * Waits (sharing the given total timeout) for all the endpoints and then
     * asserts that the expectations of each of them are satisfied
     *
     * @param timeout   the total maximum time to wait
     * @param unit      the unit of the timeout
     * @param endpoints the endpoints to assert
     */
    public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
        assertWait(timeout, unit, endpoints);
        for (MockEndpoint endpoint : endpoints) {
            endpoint.assertIsSatisfied();
        }
    }

    /**
     * Asserts that the expectations of each of the given endpoints are satisfied
     *
     * @param endpoints the endpoints to assert
     */
    public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
        for (MockEndpoint endpoint : endpoints) {
            endpoint.assertIsSatisfied();
        }
    }


    /**
     * Asserts that all the expectations on any {@link MockEndpoint} instances registered
     * in the given context are valid
     *
     * @param context the camel context used to find all the available endpoints to be asserted
     */
    public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
        ObjectHelper.notNull(context, "camelContext");
        Collection<Endpoint> endpoints = context.getSingletonEndpoints();
        for (Endpoint endpoint : endpoints) {
            if (endpoint instanceof MockEndpoint) {
                MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
                mockEndpoint.assertIsSatisfied();
            }
        }
    }

    /**
     * Sets the same expected message count on each of the given endpoints
     *
     * @param count     the number of message exchanges each endpoint should receive
     * @param endpoints the endpoints to configure
     */
    public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
        for (MockEndpoint endpoint : endpoints) {
            endpoint.setExpectedMessageCount(count);
        }
    }

    /**
     * Returns the exchanges received so far; same list as {@link #getReceivedExchanges()}
     */
    public List<Exchange> getExchanges() {
        return getReceivedExchanges();
    }

    public void addPropertyChangeListener(PropertyChangeListener listener) {
        propertyChangeSupport.addPropertyChangeListener(listener);
    }

    public void removePropertyChangeListener(PropertyChangeListener listener) {
        propertyChangeSupport.removePropertyChangeListener(listener);
    }

    /**
     * Not supported: a mock endpoint can only be produced to
     *
     * @throws UnsupportedOperationException always
     */
    public Consumer createConsumer(Processor processor) throws Exception {
        throw new UnsupportedOperationException("You cannot consume from this endpoint");
    }

    /**
     * Creates a producer which hands every exchange over to {@link #onExchange(Exchange)}
     */
    public Producer createProducer() throws Exception {
        return new DefaultProducer(this) {
            public void process(Exchange exchange) {
                onExchange(exchange);
            }
        };
    }

    /**
     * Resets this endpoint to its initial state, discarding the received
     * exchanges, recorded failures and registered expectations
     */
    public void reset() {
        init();
    }


    // Testing API
    // -------------------------------------------------------------------------

    /**
     * Set the processor that will be invoked when the index
     * message is received.
     * <p/>
     * The index is matched against the number of exchanges received so far
     * <b>including</b> the current one, so the first exchange corresponds to
     * index <tt>1</tt>.
     */
    public void whenExchangeReceived(int index, Processor processor) {
        this.processors.put(index, processor);
    }

    /**
     * Set the processor that will be invoked when the some message
     * is received.
     *
     * This processor could be overwritten by
     * {@link #whenExchangeReceived(int, Processor)} method.
     */
    public void whenAnyExchangeReceived(Processor processor) {
        this.defaultProcessor = processor;
    }

    /**
     * Validates that all the available expectations on this endpoint are
     * satisfied; or throw an exception
     */
    public void assertIsSatisfied() throws InterruptedException {
        assertIsSatisfied(sleepForEmptyTest);
    }

    /**
     * Validates that all the available expectations on this endpoint are
     * satisfied; or throw an exception
     *
     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
     *                should wait for the test to be true
     */
    public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
        LOG.info("Asserting: " + this + " is satisfied");
        if (expectedCount == 0) {
            // expecting nothing: optionally give stray messages time to arrive
            if (timeoutForEmptyEndpoints > 0) {
                LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
                Thread.sleep(timeoutForEmptyEndpoints);
            }
            assertEquals("Received message count", expectedCount, getReceivedCounter());
        } else if (expectedCount > 0) {
            if (expectedCount != getReceivedCounter()) {
                waitForCompleteLatch();
            }
            assertEquals("Received message count", expectedCount, getReceivedCounter());
        } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
            // no exact count configured, only a minimum which is not reached yet
            waitForCompleteLatch();
        }

        if (expectedMinimumCount >= 0) {
            int receivedCounter = getReceivedCounter();
            assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
        }

        for (Runnable test : tests) {
            test.run();
        }

        // report anything that was thrown while the exchanges were being received
        for (Throwable failure : failures) {
            if (failure != null) {
                LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
                fail("Failed due to caught exception: " + failure);
            }
        }
    }

    /**
     * Validates that the assertions fail on this endpoint
     *
     * @throws AssertionError if the expectations were unexpectedly satisfied
     */
    public void assertIsNotSatisfied() throws InterruptedException {
        boolean satisfied = false;
        try {
            assertIsSatisfied();
            satisfied = true;
        } catch (AssertionError e) {
            LOG.info("Caught expected failure: " + e);
        }
        // must be raised outside the try block, otherwise the AssertionError
        // thrown by fail() is swallowed by the catch above
        if (satisfied) {
            fail("Expected assertion failure!");
        }
    }

    /**
     * Validates that the assertions fail on this endpoint
     *
     * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
     *        should wait for the test to be true
     * @throws AssertionError if the expectations were unexpectedly satisfied
     */
    public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
        boolean satisfied = false;
        try {
            assertIsSatisfied(timeoutForEmptyEndpoints);
            satisfied = true;
        } catch (AssertionError e) {
            LOG.info("Caught expected failure: " + e);
        }
        // must be raised outside the try block, otherwise the AssertionError
        // thrown by fail() is swallowed by the catch above
        if (satisfied) {
            fail("Expected assertion failure!");
        }
    }

    /**
     * Specifies the expected number of message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void expectedMessageCount(int expectedCount) {
        setExpectedMessageCount(expectedCount);
    }

    /**
     * Specifies the minimum number of expected message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void expectedMinimumMessageCount(int expectedCount) {
        setMinimumExpectedMessageCount(expectedCount);
    }

    /**
     * Adds an expectation that the given header name & value are received by this endpoint.
     * <p/>
     * The expected value is converted to the type of the received header before
     * the two are compared. The header of the most recently received exchange is
     * the one which is checked.
     */
    public void expectedHeaderReceived(final String name, final Object value) {
        this.headerName = name;
        this.headerValue = value;

        expects(new Runnable() {
            public void run() {
                assertTrue("No header with name " + headerName + " found.", actualHeader != null);

                Object expectedValue = getCamelContext().getTypeConverter().convertTo(actualHeader.getClass(), headerValue);
                assertEquals("Header of message", expectedValue, actualHeader);
            }
        });
    }

    /**
     * Adds an expectation that the given property name & value are received by this endpoint.
     * <p/>
     * The expected value is converted to the type of the received property before
     * the two are compared. The property of the most recently received exchange is
     * the one which is checked.
     */
    public void expectedPropertyReceived(final String name, final Object value) {
        this.propertyName = name;
        this.propertyValue = value;

        expects(new Runnable() {
            public void run() {
                assertTrue("No property with name " + propertyName + " found.", actualProperty != null);

                Object expectedValue = getCamelContext().getTypeConverter().convertTo(actualProperty.getClass(), propertyValue);
                assertEquals("Property of message", expectedValue, actualProperty);
            }
        });
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint in the specified order.
     * <p/>
     * Also sets the expected message count to the number of bodies.
     */
    public void expectedBodiesReceived(final List bodies) {
        expectedMessageCount(bodies.size());
        this.expectedBodyValues = bodies;
        this.actualBodyValues = new ArrayList();

        expects(new Runnable() {
            public void run() {
                for (int i = 0; i < expectedBodyValues.size(); i++) {
                    Exchange exchange = getReceivedExchanges().get(i);
                    assertTrue("No exchange received for counter: " + i, exchange != null);

                    Object expectedBody = expectedBodyValues.get(i);
                    Object actualBody = null;
                    if (i < actualBodyValues.size()) {
                        actualBody = actualBodyValues.get(i);
                    }

                    assertEquals("Body of message: " + i, expectedBody, actualBody);
                }
            }
        });
    }

    /**
     * Adds an expectation that the given body values are received by this endpoint
     */
    @SuppressWarnings("unchecked")
    public void expectedBodiesReceived(Object... bodies) {
        List bodyList = new ArrayList();
        bodyList.addAll(Arrays.asList(bodies));
        expectedBodiesReceived(bodyList);
    }

    /**
     * Adds an expectation that the given body value are received by this endpoint.
     * <p/>
     * Also sets the expected message count to one; the returned clause is
     * evaluated against the first received exchange to obtain the expected body.
     *
     * @return the clause used to build the expression for the expected body
     */
    public ExpressionClause expectedBodyReceived() {
        final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);

        expectedMessageCount(1);

        expects(new Runnable() {
            public void run() {
                Exchange exchange = getReceivedExchanges().get(0);
                assertTrue("No exchange received for counter: " + 0, exchange != null);

                Object actualBody = exchange.getIn().getBody();
                Object expectedBody = clause.evaluate(exchange, Object.class);

                assertEquals("Body of message: " + 0, expectedBody, actualBody);
            }
        });

        return clause;
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint in any order.
     * <p/>
     * Also sets the expected message count to the number of bodies. Duplicate
     * bodies are supported: each expected body must be matched by its own
     * received body.
     */
    @SuppressWarnings("unchecked")
    public void expectedBodiesReceivedInAnyOrder(final List bodies) {
        expectedMessageCount(bodies.size());
        this.expectedBodyValues = bodies;
        this.actualBodyValues = new ArrayList();

        expects(new Runnable() {
            public void run() {
                // work on a list copy (not a set) so that duplicated bodies are
                // each accounted for; every match removes exactly one occurrence
                List remainingBodies = new ArrayList(actualBodyValues);
                for (int i = 0; i < expectedBodyValues.size(); i++) {
                    Exchange exchange = getReceivedExchanges().get(i);
                    assertTrue("No exchange received for counter: " + i, exchange != null);

                    Object expectedBody = expectedBodyValues.get(i);
                    assertTrue("Message with body " + expectedBody
                            + " was expected but not found in " + remainingBodies,
                            remainingBodies.remove(expectedBody));
                }
            }
        });
    }

    /**
     * Adds an expectation that the given body values are received by this
     * endpoint in any order
     */
    @SuppressWarnings("unchecked")
    public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
        List bodyList = new ArrayList();
        bodyList.addAll(Arrays.asList(bodies));
        expectedBodiesReceivedInAnyOrder(bodyList);
    }

    /**
     * Adds an expectation that a file exists with the given name
     *
     * @param name name of file, will cater for / and \ on different OS platforms
     */
    public void expectedFileExists(final String name) {
        expectedFileExists(name, null);
    }

    /**
     * Adds an expectation that a file exists with the given name
     * <p/>
     * Will wait at most 5 seconds while checking for the existence of the file.
     * If the waiting thread is interrupted the wait ends early and the
     * interrupt status of the thread is preserved.
     *
     * @param name name of file, will cater for / and \ on different OS platforms
     * @param content content of file to compare, can be <tt>null</tt> to not compare content
     */
    public void expectedFileExists(final String name, final String content) {
        final File file = new File(FileUtil.normalizePath(name)).getAbsoluteFile();

        expects(new Runnable() {
            public void run() {
                // wait at most 5 seconds for the file to exist
                final long timeout = System.currentTimeMillis() + 5000;

                boolean stop = false;
                while (!stop && !file.exists()) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // restore the interrupt status for the caller and stop polling;
                        // sleeping again would throw immediately and spin until the timeout
                        Thread.currentThread().interrupt();
                        break;
                    }
                    stop = System.currentTimeMillis() > timeout;
                }

                assertTrue("The file should exists: " + name, file.exists());

                if (content != null) {
                    String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
                    assertEquals("Content of file: " + name, content, body);
                }
            }
        });
    }

    /**
     * Adds an expectation that messages received should have ascending values
     * of the given expression such as a user generated counter value
     */
    public void expectsAscending(final Expression expression) {
        expects(new Runnable() {
            public void run() {
                assertMessagesAscending(expression);
            }
        });
    }

    /**
     * Adds an expectation that messages received should have ascending values
     * of the given expression such as a user generated counter value
     *
     * @return the clause used to build the expression
     */
    public ExpressionClause expectsAscending() {
        final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
        expects(new Runnable() {
            public void run() {
                assertMessagesAscending(clause.getExpressionValue());
            }
        });
        return clause;
    }

    /**
     * Adds an expectation that messages received should have descending values
     * of the given expression such as a user generated counter value
     */
    public void expectsDescending(final Expression expression) {
        expects(new Runnable() {
            public void run() {
                assertMessagesDescending(expression);
            }
        });
    }

    /**
     * Adds an expectation that messages received should have descending values
     * of the given expression such as a user generated counter value
     *
     * @return the clause used to build the expression
     */
    public ExpressionClause expectsDescending() {
        final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
        expects(new Runnable() {
            public void run() {
                assertMessagesDescending(clause.getExpressionValue());
            }
        });
        return clause;
    }

    /**
     * Adds an expectation that no duplicate messages should be received using
     * the expression to determine the message ID
     *
     * @param expression the expression used to create a unique message ID for
     *                message comparison (which could just be the message
     *                payload if the payload can be tested for uniqueness using
     *                {@link Object#equals(Object)} and
     *                {@link Object#hashCode()}
     */
    public void expectsNoDuplicates(final Expression expression) {
        expects(new Runnable() {
            public void run() {
                assertNoDuplicates(expression);
            }
        });
    }

    /**
     * Adds an expectation that no duplicate messages should be received using
     * the expression to determine the message ID
     *
     * @return the clause used to build the expression
     */
    public ExpressionClause expectsNoDuplicates() {
        final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
        expects(new Runnable() {
            public void run() {
                assertNoDuplicates(clause.getExpressionValue());
            }
        });
        return clause;
    }

    /**
     * Asserts that the messages have ascending values of the given expression
     */
    public void assertMessagesAscending(Expression expression) {
        assertMessagesSorted(expression, true);
    }

    /**
     * Asserts that the messages have descending values of the given expression
     */
    public void assertMessagesDescending(Expression expression) {
        assertMessagesSorted(expression, false);
    }

    /**
     * Asserts that every pair of consecutive received exchanges is strictly
     * ordered by the given expression; equal values are reported as a failure.
     *
     * @param expression the expression used to compute the value to compare
     * @param ascending  <tt>true</tt> to require ascending order, <tt>false</tt> for descending
     */
    protected void assertMessagesSorted(Expression expression, boolean ascending) {
        String type = ascending ? "ascending" : "descending";
        ExpressionComparator comparator = new ExpressionComparator(expression);
        List<Exchange> list = getReceivedExchanges();
        for (int i = 1; i < list.size(); i++) {
            int j = i - 1;
            Exchange e1 = list.get(j);
            Exchange e2 = list.get(i);
            int result = comparator.compare(e1, e2);
            if (result == 0) {
                fail("Messages not " + type + ". Messages " + j + " and " + i + " are equal with value: "
                    + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
            } else {
                // flip the sign so that "result > 0" means out of order in both modes
                if (!ascending) {
                    result = result * -1;
                }
                if (result > 0) {
                    fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class)
                        + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: "
                        + expression + ". Exchanges: " + e1 + " and " + e2);
                }
            }
        }
    }

    /**
     * Asserts that no two received exchanges evaluate to the same value for
     * the given expression
     *
     * @param expression the expression used to compute the message ID
     */
    public void assertNoDuplicates(Expression expression) {
        Map<Object, Exchange> map = new HashMap<Object, Exchange>();
        List<Exchange> list = getReceivedExchanges();
        for (int i = 0; i < list.size(); i++) {
            Exchange e2 = list.get(i);
            Object key = expression.evaluate(e2, Object.class);
            Exchange e1 = map.get(key);
            if (e1 != null) {
                fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
            } else {
                map.put(key, e2);
            }
        }
    }

    /**
     * Adds the expectation which will be invoked when enough messages are received
     */
    public void expects(Runnable runnable) {
        tests.add(runnable);
    }

    /**
     * Adds an assertion to the given message index
     *
     * @param messageIndex the number of the message
     * @return the assertion clause
     */
    public AssertionClause message(final int messageIndex) {
        AssertionClause clause = new AssertionClause() {
            public void run() {
                applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
            }
        };
        expects(clause);
        return clause;
    }

    /**
     * Adds an assertion to all the received messages
     *
     * @return the assertion clause
     */
    public AssertionClause allMessages() {
        AssertionClause clause = new AssertionClause() {
            public void run() {
                List<Exchange> list = getReceivedExchanges();
                int index = 0;
                for (Exchange exchange : list) {
                    applyAssertionOn(MockEndpoint.this, index++, exchange);
                }
            }
        };
        expects(clause);
        return clause;
    }

    /**
     * Asserts that the given index of message is received (starting at zero)
     *
     * @return the exchange received at the given index
     */
    public Exchange assertExchangeReceived(int index) {
        int count = getReceivedCounter();
        assertTrue("Not enough messages received. Was: " + count, count > index);
        return getReceivedExchanges().get(index);
    }

    // Properties
    // -------------------------------------------------------------------------
    public List<Throwable> getFailures() {
        return failures;
    }

    public int getReceivedCounter() {
        return getReceivedExchanges().size();
    }

    public List<Exchange> getReceivedExchanges() {
        return receivedExchanges;
    }

    public int getExpectedCount() {
        return expectedCount;
    }

    public long getSleepForEmptyTest() {
        return sleepForEmptyTest;
    }

    /**
     * Allows a sleep to be specified to wait to check that this endpoint really
     * is empty when {@link #expectedMessageCount(int)} is called with zero
     *
     * @param sleepForEmptyTest the milliseconds to sleep for to determine that
     *                this endpoint really is empty
     */
    public void setSleepForEmptyTest(long sleepForEmptyTest) {
        this.sleepForEmptyTest = sleepForEmptyTest;
    }

    public long getResultWaitTime() {
        return resultWaitTime;
    }

    /**
     * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
     * wait on a latch until it is satisfied
     */
    public void setResultWaitTime(long resultWaitTime) {
        this.resultWaitTime = resultWaitTime;
    }

    /**
     * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
     * wait on a latch until it is satisfied
     */
    public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
        this.resultMinimumWaitTime = resultMinimumWaitTime;
    }

    /**
     * Specifies the expected number of message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void setExpectedMessageCount(int expectedCount) {
        this.expectedCount = expectedCount;
        if (expectedCount <= 0) {
            latch = null;
        } else {
            latch = new CountDownLatch(expectedCount);
        }
    }

    /**
     * Specifies the minimum number of expected message exchanges that should be
     * received by this endpoint
     *
     * @param expectedCount the number of message exchanges that should be
     *                expected by this endpoint
     */
    public void setMinimumExpectedMessageCount(int expectedCount) {
        this.expectedMinimumCount = expectedCount;
        if (expectedCount <= 0) {
            latch = null;
        } else {
            latch = new CountDownLatch(expectedMinimumCount);
        }
    }

    public Processor getReporter() {
        return reporter;
    }

    /**
     * Allows a processor to added to the endpoint to report on progress of the test
     */
    public void setReporter(Processor reporter) {
        this.reporter = reporter;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Puts the expectation and recording state into its initial, empty state.
     * Invoked by the constructors and by {@link #reset()}.
     */
    private void init() {
        expectedCount = -1;
        counter = 0;
        processors = new HashMap<Integer, Processor>();
        receivedExchanges = new CopyOnWriteArrayList<Exchange>();
        failures = new CopyOnWriteArrayList<Throwable>();
        tests = new CopyOnWriteArrayList<Runnable>();
        latch = null;
        sleepForEmptyTest = 0;
        resultWaitTime = 20000L;
        resultMinimumWaitTime = 0L;
        expectedMinimumCount = -1;
        expectedBodyValues = null;
        actualBodyValues = new ArrayList();
        // clear the header/property expectation state as well, so values captured
        // before a reset() cannot leak into expectations registered afterwards
        headerName = null;
        headerValue = null;
        actualHeader = null;
        propertyName = null;
        propertyValue = null;
        actualProperty = null;
    }

    /**
     * Handles an exchange sent to this endpoint: invokes the reporter (if any),
     * records the exchange, and counts down the latch. Anything thrown while
     * doing so is stored in the failures list instead of being propagated.
     */
    protected synchronized void onExchange(Exchange exchange) {
        try {
            if (reporter != null) {
                reporter.process(exchange);
            }
            performAssertions(exchange);
        } catch (Throwable e) {
            // must catch throwable as AssertionException extends java.lang.Error
            failures.add(e);
        } finally {
            // make sure latch is counted down to avoid test hanging forever
            if (latch != null) {
                latch.countDown();
            }
        }
    }

    /**
     * Records the exchange together with the header, property and body values
     * needed by the registered expectations, then invokes the processor
     * configured for this message, if any.
     */
    @SuppressWarnings("unchecked")
    protected void performAssertions(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        Object actualBody = in.getBody();

        if (headerName != null) {
            actualHeader = in.getHeader(headerName);
        }

        if (propertyName != null) {
            actualProperty = exchange.getProperty(propertyName);
        }

        if (expectedBodyValues != null) {
            int index = actualBodyValues.size();
            if (expectedBodyValues.size() > index) {
                Object expectedBody = expectedBodyValues.get(index);
                if (expectedBody != null) {
                    // read the body as the type of the expected value so the two can be compared
                    actualBody = in.getBody(expectedBody.getClass());
                }
                actualBodyValues.add(actualBody);
            }
        }

        LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);

        receivedExchanges.add(exchange);

        // the lookup key is the received count including this exchange (first message == 1);
        // fall back to the default processor when nothing is registered for that index
        Processor processor = processors.get(getReceivedCounter());
        if (processor == null) {
            processor = defaultProcessor;
        }

        if (processor != null) {
            processor.process(exchange);
        }
    }

    /**
     * Waits on the latch for at most the configured result wait time and fails
     * if a minimum result wait time is configured and the wait was shorter.
     */
    protected void waitForCompleteLatch() throws InterruptedException {
        if (latch == null) {
            fail("Should have a latch!");
        }

        // now lets wait for the results
        LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
        long start = System.currentTimeMillis();
        latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
        long delta = System.currentTimeMillis() - start;
        LOG.debug("Took " + delta + " millis to complete latch");

        if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
            fail("Expected minimum " + resultMinimumWaitTime
                + " millis waiting on the result, but was faster with " + delta + " millis.");
        }
    }

    protected void assertEquals(String message, Object expectedValue, Object actualValue) {
        if (!ObjectHelper.equal(expectedValue, actualValue)) {
            fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
        }
    }

    protected void assertTrue(String message, boolean predicate) {
        if (!predicate) {
            fail(message);
        }
    }

    /**
     * Fails the test by throwing an {@link AssertionError} prefixed with the
     * endpoint uri; the received exchanges are logged first at debug level.
     *
     * @param message the failure message
     * @throws AssertionError always
     */
    protected void fail(Object message) {
        if (LOG.isDebugEnabled()) {
            List<Exchange> list = getReceivedExchanges();
            int index = 0;
            for (Exchange exchange : list) {
                LOG.debug("Received[" + (++index) + "]: " + exchange);
            }
        }
        throw new AssertionError(getEndpointUri() + " " + message);
    }

    public int getExpectedMinimumCount() {
        return expectedMinimumCount;
    }

    /**
     * Waits until the latch has been counted down; returns immediately when
     * no latch is configured
     */
    public void await() throws InterruptedException {
        if (latch != null) {
            latch.await();
        }
    }

    /**
     * Waits at most the given time for the latch to be counted down
     *
     * @return <tt>true</tt> if the latch completed in time or no latch is
     *         configured, <tt>false</tt> if the timeout elapsed first
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        if (latch != null) {
            return latch.await(timeout, unit);
        }
        return true;
    }

    public boolean isSingleton() {
        return true;
    }
}