001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.mock;
018    
019    import java.beans.PropertyChangeListener;
020    import java.beans.PropertyChangeSupport;
021    import java.io.File;
022    import java.util.ArrayList;
023    import java.util.Arrays;
024    import java.util.Collection;
025    import java.util.HashMap;
026    import java.util.HashSet;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.TimeUnit;
033    
034    import org.apache.camel.CamelContext;
035    import org.apache.camel.Component;
036    import org.apache.camel.Consumer;
037    import org.apache.camel.Endpoint;
038    import org.apache.camel.Exchange;
039    import org.apache.camel.Expression;
040    import org.apache.camel.Message;
041    import org.apache.camel.Processor;
042    import org.apache.camel.Producer;
043    import org.apache.camel.impl.DefaultEndpoint;
044    import org.apache.camel.impl.DefaultProducer;
045    import org.apache.camel.spi.BrowsableEndpoint;
046    import org.apache.camel.util.CamelContextHelper;
047    import org.apache.camel.util.ExpressionComparator;
048    import org.apache.camel.util.FileUtil;
049    import org.apache.camel.util.ObjectHelper;
050    import org.apache.commons.logging.Log;
051    import org.apache.commons.logging.LogFactory;
052    
053    /**
054     * A Mock endpoint which provides a literate, fluent API for testing routes
055     * using a <a href="http://jmock.org/">JMock style</a> API.
056     *
057     * @version $Revision: 752464 $
058     */
059    public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
060        private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
061        private int expectedCount;
062        private int counter;
063        private Processor defaultProcessor;
064        private Map<Integer, Processor> processors;
065        private List<Exchange> receivedExchanges;
066        private List<Throwable> failures;
067        private List<Runnable> tests;
068        private CountDownLatch latch;
069        private long sleepForEmptyTest;
070        private long resultWaitTime;
071        private long resultMinimumWaitTime;
072        private int expectedMinimumCount;
073        private List expectedBodyValues;
074        private List actualBodyValues;
075        private final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
076        private String headerName;
077        private Object headerValue;
078        private Object actualHeader;
079        private Processor reporter;
080    
081        public MockEndpoint(String endpointUri, Component component) {
082            super(endpointUri, component);
083            init();
084        }
085    
086        public MockEndpoint(String endpointUri) {
087            super(endpointUri);
088            init();
089        }
090    
091        public MockEndpoint() {
092            this(null);
093        }
094    
095        /**
096         * A helper method to resolve the mock endpoint of the given URI on the given context
097         *
098         * @param context the camel context to try resolve the mock endpoint from
099         * @param uri the uri of the endpoint to resolve
100         * @return the endpoint
101         */
102        public static MockEndpoint resolve(CamelContext context, String uri) {
103            return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
104        }
105    
106        public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
107            long start = System.currentTimeMillis();
108            long left = unit.toMillis(timeout);
109            long end = start + left;
110            for (MockEndpoint endpoint : endpoints) {
111                if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
112                    throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
113                }
114                left = end - System.currentTimeMillis();
115                if (left <= 0) {
116                    left = 0;
117                }
118            }
119        }
120    
121        public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
122            assertWait(timeout, unit, endpoints);
123            for (MockEndpoint endpoint : endpoints) {
124                endpoint.assertIsSatisfied();
125            }
126        }
127    
128        public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
129            for (MockEndpoint endpoint : endpoints) {
130                endpoint.assertIsSatisfied();
131            }
132        }
133    
134    
135        /**
136         * Asserts that all the expectations on any {@link MockEndpoint} instances registered
137         * in the given context are valid
138         *
139         * @param context the camel context used to find all the available endpoints to be asserted
140         */
141        public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
142            ObjectHelper.notNull(context, "camelContext");
143            Collection<Endpoint> endpoints = context.getSingletonEndpoints();
144            for (Endpoint endpoint : endpoints) {
145                if (endpoint instanceof MockEndpoint) {
146                    MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
147                    mockEndpoint.assertIsSatisfied();
148                }
149            }
150        }
151    
152        public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
153            for (MockEndpoint endpoint : endpoints) {
154                endpoint.setExpectedMessageCount(count);
155            }
156        }
157    
158        public List<Exchange> getExchanges() {
159            return getReceivedExchanges();
160        }
161    
162        public void addPropertyChangeListener(PropertyChangeListener listener) {
163            propertyChangeSupport.addPropertyChangeListener(listener);
164        }
165    
166        public void removePropertyChangeListener(PropertyChangeListener listener) {
167            propertyChangeSupport.removePropertyChangeListener(listener);
168        }
169    
170        public Consumer createConsumer(Processor processor) throws Exception {
171            throw new UnsupportedOperationException("You cannot consume from this endpoint");
172        }
173    
174        public Producer createProducer() throws Exception {
175            return new DefaultProducer(this) {
176                public void process(Exchange exchange) {
177                    onExchange(exchange);
178                }
179            };
180        }
181    
182        public void reset() {
183            init();
184        }
185    
186    
187        // Testing API
188        // -------------------------------------------------------------------------
189    
190        /**
191         * Set the processor that will be invoked when the index
192         * message is received.
193         */
194        public void whenExchangeReceived(int index, Processor processor) {
195            this.processors.put(index, processor);
196        }
197    
198        /**
199         * Set the processor that will be invoked when the some message
200         * is received.
201         *
202         * This processor could be overwritten by
203         * {@link #whenExchangeReceived(int, Processor)} method.
204         */
205        public void whenAnyExchangeReceived(Processor processor) {
206            this.defaultProcessor = processor;
207        }
208    
209        /**
210         * Validates that all the available expectations on this endpoint are
211         * satisfied; or throw an exception
212         */
213        public void assertIsSatisfied() throws InterruptedException {
214            assertIsSatisfied(sleepForEmptyTest);
215        }
216    
217        /**
218         * Validates that all the available expectations on this endpoint are
219         * satisfied; or throw an exception
220         *
221         * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
222         *                should wait for the test to be true
223         */
224        public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
225            LOG.info("Asserting: " + this + " is satisfied");
226            if (expectedCount == 0) {
227                if (timeoutForEmptyEndpoints > 0) {
228                    LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
229                    Thread.sleep(timeoutForEmptyEndpoints);
230                }
231                assertEquals("Received message count", expectedCount, getReceivedCounter());
232            } else if (expectedCount > 0) {
233                if (expectedCount != getReceivedCounter()) {
234                    waitForCompleteLatch();
235                }
236                assertEquals("Received message count", expectedCount, getReceivedCounter());
237            } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
238                waitForCompleteLatch();
239            }
240    
241            if (expectedMinimumCount >= 0) {
242                int receivedCounter = getReceivedCounter();
243                assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
244            }
245    
246            for (Runnable test : tests) {
247                test.run();
248            }
249    
250            for (Throwable failure : failures) {
251                if (failure != null) {
252                    LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
253                    fail("Failed due to caught exception: " + failure);
254                }
255            }
256        }
257    
258        /**
259         * Validates that the assertions fail on this endpoint
260         */
261        public void assertIsNotSatisfied() throws InterruptedException {
262            try {
263                assertIsSatisfied();
264                fail("Expected assertion failure!");
265            } catch (AssertionError e) {
266                LOG.info("Caught expected failure: " + e);
267            }
268        }
269    
270        /**
271         * Validates that the assertions fail on this endpoint
272    
273         * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
274         *        should wait for the test to be true
275         */
276        public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
277            try {
278                assertIsSatisfied(timeoutForEmptyEndpoints);
279                fail("Expected assertion failure!");
280            } catch (AssertionError e) {
281                LOG.info("Caught expected failure: " + e);
282            }
283        }    
284        
285        /**
286         * Specifies the expected number of message exchanges that should be
287         * received by this endpoint
288         *
289         * @param expectedCount the number of message exchanges that should be
290         *                expected by this endpoint
291         */
292        public void expectedMessageCount(int expectedCount) {
293            setExpectedMessageCount(expectedCount);
294        }
295    
296        /**
297         * Specifies the minimum number of expected message exchanges that should be
298         * received by this endpoint
299         *
300         * @param expectedCount the number of message exchanges that should be
301         *                expected by this endpoint
302         */
303        public void expectedMinimumMessageCount(int expectedCount) {
304            setMinimumExpectedMessageCount(expectedCount);
305        }
306    
307        /**
308         * Adds an expectation that the given header name & value are received by this
309         * endpoint
310         */
311        public void expectedHeaderReceived(final String name, final Object value) {
312            this.headerName = name;
313            this.headerValue = value;
314    
315            expects(new Runnable() {
316                public void run() {
317                    assertTrue("No header with name " + headerName + " found.", actualHeader != null);
318    
319                    Object actualValue = getCamelContext().getTypeConverter().convertTo(actualHeader.getClass(), headerValue);
320                    assertEquals("Header of message", actualValue, actualHeader);
321                }
322            });
323        }
324    
325        /**
326         * Adds an expectation that the given body values are received by this
327         * endpoint in the specified order
328         */
329        public void expectedBodiesReceived(final List bodies) {
330            expectedMessageCount(bodies.size());
331            this.expectedBodyValues = bodies;
332            this.actualBodyValues = new ArrayList();
333    
334            expects(new Runnable() {
335                public void run() {
336                    for (int i = 0; i < expectedBodyValues.size(); i++) {
337                        Exchange exchange = getReceivedExchanges().get(i);
338                        assertTrue("No exchange received for counter: " + i, exchange != null);
339    
340                        Object expectedBody = expectedBodyValues.get(i);
341                        Object actualBody = null;
342                        if (i < actualBodyValues.size()) {
343                            actualBody = actualBodyValues.get(i);
344                        }
345    
346                        assertEquals("Body of message: " + i, expectedBody, actualBody);
347                    }
348                }
349            });
350        }
351    
352        /**
353         * Adds an expectation that the given body values are received by this
354         * endpoint
355         */
356        public void expectedBodiesReceived(Object... bodies) {
357            List bodyList = new ArrayList();
358            bodyList.addAll(Arrays.asList(bodies));
359            expectedBodiesReceived(bodyList);
360        }
361    
362        /**
363         * Adds an expectation that the given body values are received by this
364         * endpoint in any order
365         */
366        public void expectedBodiesReceivedInAnyOrder(final List bodies) {
367            expectedMessageCount(bodies.size());
368            this.expectedBodyValues = bodies;
369            this.actualBodyValues = new ArrayList();
370    
371            expects(new Runnable() {
372                public void run() {
373                    Set actualBodyValuesSet = new HashSet(actualBodyValues);
374                    for (int i = 0; i < expectedBodyValues.size(); i++) {
375                        Exchange exchange = getReceivedExchanges().get(i);
376                        assertTrue("No exchange received for counter: " + i, exchange != null);
377    
378                        Object expectedBody = expectedBodyValues.get(i);
379                        assertTrue("Message with body " + expectedBody
380                                + " was expected but not found in " + actualBodyValuesSet,
381                                actualBodyValuesSet.remove(expectedBody));
382                    }
383                }
384            });
385        }
386    
387        /**
388         * Adds an expectation that the given body values are received by this
389         * endpoint in any order
390         */
391        @SuppressWarnings("unchecked")
392        public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
393            List bodyList = new ArrayList();
394            bodyList.addAll(Arrays.asList(bodies));
395            expectedBodiesReceivedInAnyOrder(bodyList);
396        }
397    
398        /**
399         * Adds an expection that a file exists with the given name
400         *
401         * @param name name of file, will cater for / and \ on different OS platforms
402         */
403        public void expectedFileExists(final String name) {
404            expectedFileExists(name, null);
405        }
406    
407        /**
408         * Adds an expection that a file exists with the given name
409         * <p/>
410         * Will wait at most 5 seconds while checking for the existence of the file.
411         *
412         * @param name name of file, will cater for / and \ on different OS platforms
413         * @param content content of file to compare, can be <tt>null</tt> to not compare content
414         */
415        public void expectedFileExists(final String name, final String content) {
416            final File file = new File(FileUtil.normalizePath(name)).getAbsoluteFile();
417    
418            expects(new Runnable() {
419                public void run() {
420                    // wait at most 5 seconds for the file to exists
421                    final long timeout = System.currentTimeMillis() + 5000;
422    
423                    boolean stop = false;
424                    while (!stop && !file.exists()) {
425                        try {
426                            Thread.sleep(50);
427                        } catch (InterruptedException e) {
428                            // ignore
429                        }
430                        stop = System.currentTimeMillis() > timeout;
431                    }
432    
433                    assertTrue("The file should exists: " + name, file.exists());
434    
435                    if (content != null) {
436                        String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
437                        assertEquals("Content of file: " + name, content, body);
438                    }
439                }
440            });
441        }
442    
443        /**
444         * Adds an expectation that messages received should have ascending values
445         * of the given expression such as a user generated counter value
446         */
447        public void expectsAscending(final Expression expression) {
448            expects(new Runnable() {
449                public void run() {
450                    assertMessagesAscending(expression);
451                }
452            });
453        }
454    
455        /**
456         * Adds an expectation that messages received should have descending values
457         * of the given expression such as a user generated counter value
458         */
459        public void expectsDescending(final Expression expression) {
460            expects(new Runnable() {
461                public void run() {
462                    assertMessagesDescending(expression);
463                }
464            });
465        }
466    
467        /**
468         * Adds an expectation that no duplicate messages should be received using
469         * the expression to determine the message ID
470         *
471         * @param expression the expression used to create a unique message ID for
472         *                message comparison (which could just be the message
473         *                payload if the payload can be tested for uniqueness using
474         *                {@link Object#equals(Object)} and
475         *                {@link Object#hashCode()}
476         */
477        public void expectsNoDuplicates(final Expression expression) {
478            expects(new Runnable() {
479                public void run() {
480                    assertNoDuplicates(expression);
481                }
482            });
483        }
484    
485        /**
486         * Asserts that the messages have ascending values of the given expression
487         */
488        public void assertMessagesAscending(Expression expression) {
489            assertMessagesSorted(expression, true);
490        }
491    
492        /**
493         * Asserts that the messages have descending values of the given expression
494         */
495        public void assertMessagesDescending(Expression expression) {
496            assertMessagesSorted(expression, false);
497        }
498    
499        protected void assertMessagesSorted(Expression expression, boolean ascending) {
500            String type = ascending ? "ascending" : "descending";
501            ExpressionComparator comparator = new ExpressionComparator(expression);
502            List<Exchange> list = getReceivedExchanges();
503            for (int i = 1; i < list.size(); i++) {
504                int j = i - 1;
505                Exchange e1 = list.get(j);
506                Exchange e2 = list.get(i);
507                int result = comparator.compare(e1, e2);
508                if (result == 0) {
509                    fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and "
510                         + e2);
511                } else {
512                    if (!ascending) {
513                        result = result * -1;
514                    }
515                    if (result > 0) {
516                        fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: "
517                             + expression + ". Exchanges: " + e1 + " and " + e2);
518                    }
519                }
520            }
521        }
522    
523        public void assertNoDuplicates(Expression expression) {
524            Map<Object, Exchange> map = new HashMap<Object, Exchange>();
525            List<Exchange> list = getReceivedExchanges();
526            for (int i = 0; i < list.size(); i++) {
527                Exchange e2 = list.get(i);
528                Object key = expression.evaluate(e2);
529                Exchange e1 = map.get(key);
530                if (e1 != null) {
531                    fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
532                } else {
533                    map.put(key, e2);
534                }
535            }
536        }
537    
538        /**
539         * Adds the expectation which will be invoked when enough messages are
540         * received
541         */
542        public void expects(Runnable runnable) {
543            tests.add(runnable);
544        }
545    
546        /**
547         * Adds an assertion to the given message index
548         *
549         * @param messageIndex the number of the message
550         * @return the assertion clause
551         */
552        public AssertionClause message(final int messageIndex) {
553            AssertionClause clause = new AssertionClause() {
554                public void run() {
555                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
556                }
557            };
558            expects(clause);
559            return clause;
560        }
561    
562        /**
563         * Adds an assertion to all the received messages
564         *
565         * @return the assertion clause
566         */
567        public AssertionClause allMessages() {
568            AssertionClause clause = new AssertionClause() {
569                public void run() {
570                    List<Exchange> list = getReceivedExchanges();
571                    int index = 0;
572                    for (Exchange exchange : list) {
573                        applyAssertionOn(MockEndpoint.this, index++, exchange);
574                    }
575                }
576            };
577            expects(clause);
578            return clause;
579        }
580    
581        /**
582         * Asserts that the given index of message is received (starting at zero)
583         */
584        public Exchange assertExchangeReceived(int index) {
585            int count = getReceivedCounter();
586            assertTrue("Not enough messages received. Was: " + count, count > index);
587            return getReceivedExchanges().get(index);
588        }
589    
590        // Properties
591        // -------------------------------------------------------------------------
592        public List<Throwable> getFailures() {
593            return failures;
594        }
595    
596        public int getReceivedCounter() {
597            return getReceivedExchanges().size();
598        }
599    
600        public List<Exchange> getReceivedExchanges() {
601            return receivedExchanges;
602        }
603    
604        public int getExpectedCount() {
605            return expectedCount;
606        }
607    
608        public long getSleepForEmptyTest() {
609            return sleepForEmptyTest;
610        }
611    
612        /**
613         * Allows a sleep to be specified to wait to check that this endpoint really
614         * is empty when {@link #expectedMessageCount(int)} is called with zero
615         *
616         * @param sleepForEmptyTest the milliseconds to sleep for to determine that
617         *                this endpoint really is empty
618         */
619        public void setSleepForEmptyTest(long sleepForEmptyTest) {
620            this.sleepForEmptyTest = sleepForEmptyTest;
621        }
622    
623        public long getResultWaitTime() {
624            return resultWaitTime;
625        }
626    
627        /**
628         * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
629         * wait on a latch until it is satisfied
630         */
631        public void setResultWaitTime(long resultWaitTime) {
632            this.resultWaitTime = resultWaitTime;
633        }
634    
635        /**
636         * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
637         * wait on a latch until it is satisfied
638         */
639        public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
640            this.resultMinimumWaitTime = resultMinimumWaitTime;
641        }
642    
643        /**
644         * Specifies the expected number of message exchanges that should be
645         * received by this endpoint
646         *
647         * @param expectedCount the number of message exchanges that should be
648         *                expected by this endpoint
649         */
650        public void setExpectedMessageCount(int expectedCount) {
651            this.expectedCount = expectedCount;
652            if (expectedCount <= 0) {
653                latch = null;
654            } else {
655                latch = new CountDownLatch(expectedCount);
656            }
657        }
658    
659        /**
660         * Specifies the minimum number of expected message exchanges that should be
661         * received by this endpoint
662         *
663         * @param expectedCount the number of message exchanges that should be
664         *                expected by this endpoint
665         */
666        public void setMinimumExpectedMessageCount(int expectedCount) {
667            this.expectedMinimumCount = expectedCount;
668            if (expectedCount <= 0) {
669                latch = null;
670            } else {
671                latch = new CountDownLatch(expectedMinimumCount);
672            }
673        }
674    
675        public Processor getReporter() {
676            return reporter;
677        }
678    
679        /**
680         * Allows a processor to added to the endpoint to report on progress of the test
681         */
682        public void setReporter(Processor reporter) {
683            this.reporter = reporter;
684        }
685    
686        // Implementation methods
687        // -------------------------------------------------------------------------
688        private void init() {
689            expectedCount = -1;
690            counter = 0;
691            processors = new HashMap<Integer, Processor>();
692            receivedExchanges = new CopyOnWriteArrayList<Exchange>();
693            failures = new CopyOnWriteArrayList<Throwable>();
694            tests = new CopyOnWriteArrayList<Runnable>();
695            latch = null;
696            sleepForEmptyTest = 0;
697            resultWaitTime = 20000L;
698            resultMinimumWaitTime = 0L;
699            expectedMinimumCount = -1;
700            expectedBodyValues = null;
701            actualBodyValues = new ArrayList();
702        }
703    
704        protected synchronized void onExchange(Exchange exchange) {
705            try {
706                if (reporter != null) {
707                    reporter.process(exchange);
708                }
709    
710                performAssertions(exchange);
711            } catch (Exception e) {
712                failures.add(e);
713            }
714            if (latch != null) {
715                latch.countDown();
716            }
717        }
718    
719        @SuppressWarnings("unchecked")
720        protected void performAssertions(Exchange exchange) throws Exception {
721            Message in = exchange.getIn();
722            Object actualBody = in.getBody();
723    
724            if (headerName != null) {
725                actualHeader = in.getHeader(headerName);
726            }
727    
728            if (expectedBodyValues != null) {
729                int index = actualBodyValues.size();
730                if (expectedBodyValues.size() > index) {
731                    Object expectedBody = expectedBodyValues.get(index);
732                    if (expectedBody != null) {
733                        actualBody = in.getBody(expectedBody.getClass());
734                    }
735                    actualBodyValues.add(actualBody);
736                }
737            }
738    
739            LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);
740    
741            receivedExchanges.add(exchange);
742    
743            Processor processor = processors.get(getReceivedCounter()) != null
744                    ? processors.get(getReceivedCounter()) : defaultProcessor;
745    
746            if (processor != null) {
747                processor.process(exchange);
748            }
749        }
750    
751        protected void waitForCompleteLatch() throws InterruptedException {
752            if (latch == null) {
753                fail("Should have a latch!");
754            }
755    
756            // now lets wait for the results
757            LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
758            long start = System.currentTimeMillis();
759            latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
760            long delta = System.currentTimeMillis() - start;
761            LOG.debug("Took " + delta + " millis to complete latch");
762    
763            if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
764                fail("Expected minimum " + resultMinimumWaitTime
765                        + " millis waiting on the result, but was faster with " + delta + " millis.");
766            }
767        }
768    
769        protected void assertEquals(String message, Object expectedValue, Object actualValue) {
770            if (!ObjectHelper.equal(expectedValue, actualValue)) {
771                fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
772            }
773        }
774    
775        protected void assertTrue(String message, boolean predicate) {
776            if (!predicate) {
777                fail(message);
778            }
779        }
780    
781        protected void fail(Object message) {
782            if (LOG.isDebugEnabled()) {
783                List<Exchange> list = getReceivedExchanges();
784                int index = 0;
785                for (Exchange exchange : list) {
786                    LOG.debug("Received[" + (++index) + "]: " + exchange);
787                }
788            }
789            throw new AssertionError(getEndpointUri() + " " + message);
790        }
791    
792        public int getExpectedMinimumCount() {
793            return expectedMinimumCount;
794        }
795    
796        public void await() throws InterruptedException {
797            if (latch != null) {
798                latch.await();
799            }
800        }
801    
802        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
803            if (latch != null) {
804                return latch.await(timeout, unit);
805            }
806            return true;
807        }
808    
809        public boolean isSingleton() {
810            return true;
811        }
812    }