Coverage Report - org.apache.camel.component.mock.MockEndpoint
 
Classes in this File Line Coverage Branch Coverage Complexity
MockEndpoint
68% 
71% 
0
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.component.mock;
 18  
 
 19  
 import java.util.ArrayList;
 20  
 import java.util.HashMap;
 21  
 import java.util.List;
 22  
 import java.util.Map;
 23  
 import java.util.concurrent.CopyOnWriteArrayList;
 24  
 import java.util.concurrent.CountDownLatch;
 25  
 import java.util.concurrent.TimeUnit;
 26  
 
 27  
 import org.apache.camel.Component;
 28  
 import org.apache.camel.Consumer;
 29  
 import org.apache.camel.Exchange;
 30  
 import org.apache.camel.Expression;
 31  
 import org.apache.camel.Message;
 32  
 import org.apache.camel.Processor;
 33  
 import org.apache.camel.Producer;
 34  
 import org.apache.camel.impl.DefaultEndpoint;
 35  
 import org.apache.camel.impl.DefaultExchange;
 36  
 import org.apache.camel.impl.DefaultProducer;
 37  
 import org.apache.camel.util.ExpressionComparator;
 38  
 import org.apache.camel.util.ObjectHelper;
 39  
 import org.apache.commons.logging.Log;
 40  
 import org.apache.commons.logging.LogFactory;
 41  
 
 42  
 /**
 43  
  * A Mock endpoint which provides a literate, fluent API for testing routes
 44  
  * using a <a href="http://jmock.org/">JMock style</a> API.
 45  
  * 
 46  
  * @version $Revision: 1.1 $
 47  
  */
 48  378
 public class MockEndpoint extends DefaultEndpoint<Exchange> {
 49  3
     private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
 50  246
     private int expectedCount = -1;
 51  
     private int counter;
 52  246
     private Map<Integer, Processor> processors = new HashMap<Integer, Processor>();
 53  246
     private List<Exchange> receivedExchanges = new CopyOnWriteArrayList<Exchange>();
 54  246
     private List<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
 55  246
     private List<Runnable> tests = new CopyOnWriteArrayList<Runnable>();
 56  
     private CountDownLatch latch;
 57  246
     private long sleepForEmptyTest = 1000L;
 58  246
     private long defaulResultWaitMillis = 20000L;
 59  246
     private int expectedMinimumCount = -1;
 60  
     private List expectedBodyValues;
 61  246
     private List actualBodyValues = new ArrayList();
 62  
 
 63  
     public MockEndpoint(String endpointUri, Component component) {
 64  246
         super(endpointUri, component);
 65  246
     }
 66  
 
 67  
     public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
 68  0
         long start = System.currentTimeMillis();
 69  0
         long left = unit.toMillis(timeout);
 70  0
         long end = start + left;
 71  0
         for (MockEndpoint endpoint : endpoints) {
 72  0
             if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
 73  0
                 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
 74  
             }
 75  0
             left = end - System.currentTimeMillis();
 76  0
             if (left <= 0) {
 77  0
                 left = 0;
 78  
             }
 79  
         }
 80  0
     }
 81  
 
 82  
     public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
 83  0
         assertWait(timeout, unit, endpoints);
 84  0
         for (MockEndpoint endpoint : endpoints) {
 85  0
             endpoint.assertIsSatisfied();
 86  
         }
 87  0
     }
 88  
 
 89  
     public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
 90  222
         for (MockEndpoint endpoint : endpoints) {
 91  153
             endpoint.assertIsSatisfied();
 92  
         }
 93  69
     }
 94  
 
 95  
     public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
 96  45
         for (MockEndpoint endpoint : endpoints) {
 97  18
             endpoint.expectsMessageCount(count);
 98  
         }
 99  27
     }
 100  
 
 101  
     public Exchange createExchange() {
 102  0
         return new DefaultExchange(getContext());
 103  
     }
 104  
 
 105  
     public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
 106  0
         throw new UnsupportedOperationException("You cannot consume from this endpoint");
 107  
     }
 108  
 
 109  
     public Producer<Exchange> createProducer() throws Exception {
 110  243
         return new DefaultProducer<Exchange>(this) {
 111  243
             public void process(Exchange exchange) {
 112  279
                 onExchange(exchange);
 113  279
             }
 114  
         };
 115  
     }
 116  
 
 117  
     // Testing API
 118  
     // -------------------------------------------------------------------------
 119  
 
 120  
     /**
 121  
      * Validates that all the available expectations on this endpoint are
 122  
      * satisfied; or throw an exception
 123  
      */
 124  
     public void assertIsSatisfied() throws InterruptedException {
 125  249
         assertIsSatisfied(sleepForEmptyTest);
 126  237
     }
 127  
 
 128  
     /**
 129  
      * Validates that all the available expectations on this endpoint are
 130  
      * satisfied; or throw an exception
 131  
      * 
 132  
      * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
 133  
      *                should wait for the test to be true
 134  
      */
 135  
     public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
 136  249
         if (expectedCount >= 0) {
 137  165
             if (expectedCount != getReceivedCounter()) {
 138  30
                 if (expectedCount == 0) {
 139  
                     // lets wait a little bit just in case
 140  0
                     if (timeoutForEmptyEndpoints > 0) {
 141  0
                         LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
 142  0
                         Thread.sleep(timeoutForEmptyEndpoints);
 143  0
                     }
 144  
                 } else {
 145  30
                     waitForCompleteLatch();
 146  
                 }
 147  
             }
 148  165
             assertEquals("Received message count", expectedCount, getReceivedCounter());
 149  162
         } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
 150  0
             waitForCompleteLatch();
 151  
         }
 152  
 
 153  246
         if (expectedMinimumCount >= 0) {
 154  0
             int receivedCounter = getReceivedCounter();
 155  0
             assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedCount, expectedCount <= receivedCounter);
 156  
         }
 157  
 
 158  246
         for (Runnable test : tests) {
 159  102
             test.run();
 160  93
         }
 161  
 
 162  237
         for (Throwable failure : failures) {
 163  0
             if (failure != null) {
 164  0
                 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
 165  0
                 fail("Failed due to caught exception: " + failure);
 166  
             }
 167  0
         }
 168  237
     }
 169  
 
 170  
     /**
 171  
      * Validates that the assertions fail on this endpoint
 172  
      */
 173  
     public void assertIsNotSatisfied() throws InterruptedException {
 174  
         try {
 175  9
             assertIsSatisfied();
 176  0
             fail("Expected assertion failure!");
 177  9
         } catch (AssertionError e) {
 178  9
             LOG.info("Caught expected failure: " + e);
 179  0
         }
 180  9
     }
 181  
 
 182  
     /**
 183  
      * Specifies the expected number of message exchanges that should be
 184  
      * received by this endpoint
 185  
      * 
 186  
      * @param expectedCount the number of message exchanges that should be
 187  
      *                expected by this endpoint
 188  
      */
 189  
     public void expectedMessageCount(int expectedCount) {
 190  165
         this.expectedCount = expectedCount;
 191  165
         if (expectedCount <= 0) {
 192  21
             latch = null;
 193  21
         } else {
 194  144
             latch = new CountDownLatch(expectedCount);
 195  
         }
 196  165
     }
 197  
 
 198  
     /**
 199  
      * Specifies the minimum number of expected message exchanges that should be
 200  
      * received by this endpoint
 201  
      * 
 202  
      * @param expectedCount the number of message exchanges that should be
 203  
      *                expected by this endpoint
 204  
      */
 205  
     public void expectedMinimumMessageCount(int expectedCount) {
 206  0
         this.expectedMinimumCount = expectedCount;
 207  0
         if (expectedCount <= 0) {
 208  0
             latch = null;
 209  0
         } else {
 210  0
             latch = new CountDownLatch(expectedMinimumCount);
 211  
         }
 212  0
     }
 213  
 
 214  
     /**
 215  
      * Adds an expectation that the given body values are received by this
 216  
      * endpoint
 217  
      */
 218  
     public void expectedBodiesReceived(final List bodies) {
 219  75
         expectedMessageCount(bodies.size());
 220  75
         this.expectedBodyValues = bodies;
 221  75
         this.actualBodyValues = new ArrayList();
 222  
 
 223  75
         expects(new Runnable() {
 224  75
             public void run() {
 225  174
                 for (int i = 0; i < expectedBodyValues.size(); i++) {
 226  102
                     Exchange exchange = getReceivedExchanges().get(i);
 227  102
                     assertTrue("No exchange received for counter: " + i, exchange != null);
 228  
 
 229  102
                     Object expectedBody = expectedBodyValues.get(i);
 230  102
                     Object actualBody = actualBodyValues.get(i);
 231  
 
 232  102
                     assertEquals("Body of message: " + i, expectedBody, actualBody);
 233  
                 }
 234  72
             }
 235  
         });
 236  75
     }
 237  
 
 238  
     /**
 239  
      * Adds an expectation that the given body values are received by this
 240  
      * endpoint
 241  
      */
 242  
     public void expectedBodiesReceived(Object... bodies) {
 243  75
         List bodyList = new ArrayList();
 244  180
         for (Object body : bodies) {
 245  105
             bodyList.add(body);
 246  
         }
 247  75
         expectedBodiesReceived(bodyList);
 248  75
     }
 249  
 
 250  
     /**
 251  
      * Adds an expectation that messages received should have ascending values
 252  
      * of the given expression such as a user generated counter value
 253  
      * 
 254  
      * @param expression
 255  
      */
 256  
     public void expectsAscending(final Expression<Exchange> expression) {
 257  6
         expects(new Runnable() {
 258  6
             public void run() {
 259  6
                 assertMessagesAscending(expression);
 260  3
             }
 261  
         });
 262  6
     }
 263  
 
 264  
     /**
 265  
      * Adds an expectation that messages received should have descending values
 266  
      * of the given expression such as a user generated counter value
 267  
      * 
 268  
      * @param expression
 269  
      */
 270  
     public void expectsDescending(final Expression<Exchange> expression) {
 271  6
         expects(new Runnable() {
 272  6
             public void run() {
 273  6
                 assertMessagesDescending(expression);
 274  3
             }
 275  
         });
 276  6
     }
 277  
 
 278  
     /**
 279  
      * Adds an expectation that no duplicate messages should be received using
 280  
      * the expression to determine the message ID
 281  
      * 
 282  
      * @param expression the expression used to create a unique message ID for
 283  
      *                message comparison (which could just be the message
 284  
      *                payload if the payload can be tested for uniqueness using
 285  
      *                {@link Object#equals(Object)} and
 286  
      *                {@link Object#hashCode()}
 287  
      */
 288  
     public void expectsNoDuplicates(final Expression<Exchange> expression) {
 289  6
         expects(new Runnable() {
 290  6
             public void run() {
 291  6
                 assertNoDuplicates(expression);
 292  3
             }
 293  
         });
 294  6
     }
 295  
 
 296  
     /**
 297  
      * Asserts that the messages have ascending values of the given expression
 298  
      */
 299  
     public void assertMessagesAscending(Expression<Exchange> expression) {
 300  6
         assertMessagesSorted(expression, true);
 301  3
     }
 302  
 
 303  
     /**
 304  
      * Asserts that the messages have descending values of the given expression
 305  
      */
 306  
     public void assertMessagesDescending(Expression<Exchange> expression) {
 307  6
         assertMessagesSorted(expression, false);
 308  3
     }
 309  
 
 310  
     protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) {
 311  12
         String type = ascending ? "ascending" : "descending";
 312  12
         ExpressionComparator comparator = new ExpressionComparator(expression);
 313  12
         List<Exchange> list = getReceivedExchanges();
 314  54
         for (int i = 1; i < list.size(); i++) {
 315  48
             int j = i - 1;
 316  48
             Exchange e1 = list.get(j);
 317  48
             Exchange e2 = list.get(i);
 318  48
             int result = comparator.compare(e1, e2);
 319  48
             if (result == 0) {
 320  0
                 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and "
 321  
                      + e2);
 322  0
             } else {
 323  48
                 if (!ascending) {
 324  24
                     result = result * -1;
 325  
                 }
 326  48
                 if (result > 0) {
 327  6
                     fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: "
 328  
                          + expression + ". Exchanges: " + e1 + " and " + e2);
 329  
                 }
 330  
             }
 331  
         }
 332  6
     }
 333  
 
 334  
     public void assertNoDuplicates(Expression<Exchange> expression) {
 335  6
         Map<Object, Exchange> map = new HashMap<Object, Exchange>();
 336  6
         List<Exchange> list = getReceivedExchanges();
 337  33
         for (int i = 0; i < list.size(); i++) {
 338  30
             Exchange e2 = list.get(i);
 339  30
             Object key = expression.evaluate(e2);
 340  30
             Exchange e1 = map.get(key);
 341  30
             if (e1 != null) {
 342  3
                 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
 343  0
             } else {
 344  27
                 map.put(key, e2);
 345  
             }
 346  
         }
 347  3
     }
 348  
 
 349  
     /**
 350  
      * Adds the expection which will be invoked when enough messages are
 351  
      * received
 352  
      */
 353  
     public void expects(Runnable runnable) {
 354  105
         tests.add(runnable);
 355  105
     }
 356  
 
 357  
     /**
 358  
      * Adds an assertion to the given message index
 359  
      * 
 360  
      * @param messageIndex the number of the message
 361  
      * @return the assertion clause
 362  
      */
 363  
     public AssertionClause message(final int messageIndex) {
 364  12
         AssertionClause clause = new AssertionClause() {
 365  12
             public void run() {
 366  12
                 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
 367  12
             }
 368  
         };
 369  12
         expects(clause);
 370  12
         return clause;
 371  
     }
 372  
 
 373  
     /**
 374  
      * Adds an assertion to all the received messages
 375  
      * 
 376  
      * @return the assertion clause
 377  
      */
 378  
     public AssertionClause allMessages() {
 379  0
         AssertionClause clause = new AssertionClause() {
 380  0
             public void run() {
 381  0
                 List<Exchange> list = getReceivedExchanges();
 382  0
                 int index = 0;
 383  0
                 for (Exchange exchange : list) {
 384  0
                     applyAssertionOn(MockEndpoint.this, index++, exchange);
 385  0
                 }
 386  0
             }
 387  
         };
 388  0
         expects(clause);
 389  0
         return clause;
 390  
     }
 391  
 
 392  
     /**
 393  
      * Asserts that the given index of message is received (starting at zero)
 394  
      */
 395  
     public Exchange assertExchangeReceived(int index) {
 396  12
         int count = getReceivedCounter();
 397  12
         assertTrue("Not enough messages received. Was: " + count, count > index);
 398  12
         return getReceivedExchanges().get(index);
 399  
     }
 400  
 
 401  
     // Properties
 402  
     // -------------------------------------------------------------------------
 403  
     public List<Throwable> getFailures() {
 404  0
         return failures;
 405  
     }
 406  
 
 407  
     public int getReceivedCounter() {
 408  621
         return getReceivedExchanges().size();
 409  
     }
 410  
 
 411  
     public List<Exchange> getReceivedExchanges() {
 412  762
         return receivedExchanges;
 413  
     }
 414  
 
 415  
     public int getExpectedCount() {
 416  0
         return expectedCount;
 417  
     }
 418  
 
 419  
     public long getSleepForEmptyTest() {
 420  0
         return sleepForEmptyTest;
 421  
     }
 422  
 
 423  
     /**
 424  
      * Allows a sleep to be specified to wait to check that this endpoint really
 425  
      * is empty when {@link #expectedMessageCount(int)} is called with zero
 426  
      * 
 427  
      * @param sleepForEmptyTest the milliseconds to sleep for to determine that
 428  
      *                this endpoint really is empty
 429  
      */
 430  
     public void setSleepForEmptyTest(long sleepForEmptyTest) {
 431  0
         this.sleepForEmptyTest = sleepForEmptyTest;
 432  0
     }
 433  
 
 434  
     public long getDefaulResultWaitMillis() {
 435  0
         return defaulResultWaitMillis;
 436  
     }
 437  
 
 438  
     /**
 439  
      * Sets the maximum amount of time the {@link #assertIsSatisfied()} will
 440  
      * wait on a latch until it is satisfied
 441  
      */
 442  
     public void setDefaulResultWaitMillis(long defaulResultWaitMillis) {
 443  18
         this.defaulResultWaitMillis = defaulResultWaitMillis;
 444  18
     }
 445  
 
 446  
     // Implementation methods
 447  
     // -------------------------------------------------------------------------
 448  
     protected synchronized void onExchange(Exchange exchange) {
 449  
         try {
 450  279
             Message in = exchange.getIn();
 451  279
             Object actualBody = in.getBody();
 452  
 
 453  279
             if (expectedBodyValues != null) {
 454  108
                 int index = actualBodyValues.size();
 455  108
                 if (expectedBodyValues.size() > index) {
 456  105
                     Object expectedBody = expectedBodyValues.get(index);
 457  105
                     if (expectedBody != null) {
 458  105
                         actualBody = in.getBody(expectedBody.getClass());
 459  
                     }
 460  105
                     actualBodyValues.add(actualBody);
 461  
                 }
 462  
             }
 463  
 
 464  279
             LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);
 465  
 
 466  279
             receivedExchanges.add(exchange);
 467  
 
 468  279
             Processor processor = processors.get(getReceivedCounter());
 469  279
             if (processor != null) {
 470  0
                 processor.process(exchange);
 471  
             }
 472  
 
 473  279
             if (latch != null) {
 474  186
                 latch.countDown();
 475  
             }
 476  0
         } catch (Exception e) {
 477  0
             failures.add(e);
 478  279
         }
 479  279
     }
 480  
 
 481  
     protected void waitForCompleteLatch() throws InterruptedException {
 482  30
         if (latch == null) {
 483  0
             fail("Should have a latch!");
 484  
         }
 485  
 
 486  
         // now lets wait for the results
 487  30
         LOG.debug("Waiting on the latch for: " + defaulResultWaitMillis + " millis");
 488  30
         latch.await(defaulResultWaitMillis, TimeUnit.MILLISECONDS);
 489  30
     }
 490  
 
 491  
     protected void assertEquals(String message, Object expectedValue, Object actualValue) {
 492  267
         if (!ObjectHelper.equals(expectedValue, actualValue)) {
 493  3
             fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
 494  
         }
 495  264
     }
 496  
 
 497  
     protected void assertTrue(String message, boolean predicate) {
 498  114
         if (!predicate) {
 499  0
             fail(message);
 500  
         }
 501  114
     }
 502  
 
 503  
     protected void fail(Object message) {
 504  12
         if (LOG.isDebugEnabled()) {
 505  0
             List<Exchange> list = getReceivedExchanges();
 506  0
             int index = 0;
 507  0
             for (Exchange exchange : list) {
 508  0
                 LOG.debug("Received[" + (++index) + "]: " + exchange);
 509  0
             }
 510  
         }
 511  12
         throw new AssertionError(getEndpointUri() + " " + message);
 512  
     }
 513  
 
 514  
     public int getExpectedMinimumCount() {
 515  0
         return expectedMinimumCount;
 516  
     }
 517  
 
 518  
     public void await() throws InterruptedException {
 519  0
         if (latch != null) {
 520  0
             latch.await();
 521  
         }
 522  0
     }
 523  
 
 524  
     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
 525  0
         if (latch != null) {
 526  0
             return latch.await(timeout, unit);
 527  
         }
 528  0
         return true;
 529  
     }
 530  
 
 531  
     public boolean isSingleton() {
 532  246
         return true;
 533  
     }
 534  
 
 535  
 }