001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.mock;
018    
019    import java.beans.PropertyChangeListener;
020    import java.beans.PropertyChangeSupport;
021    import java.io.File;
022    import java.util.ArrayList;
023    import java.util.Arrays;
024    import java.util.Collection;
025    import java.util.HashMap;
026    import java.util.HashSet;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.TimeUnit;
033    
034    import org.apache.camel.CamelContext;
035    import org.apache.camel.Component;
036    import org.apache.camel.Consumer;
037    import org.apache.camel.Endpoint;
038    import org.apache.camel.Exchange;
039    import org.apache.camel.Expression;
040    import org.apache.camel.Message;
041    import org.apache.camel.Processor;
042    import org.apache.camel.Producer;
043    import org.apache.camel.builder.ExpressionClause;
044    import org.apache.camel.impl.DefaultEndpoint;
045    import org.apache.camel.impl.DefaultProducer;
046    import org.apache.camel.spi.BrowsableEndpoint;
047    import org.apache.camel.util.CamelContextHelper;
048    import org.apache.camel.util.ExpressionComparator;
049    import org.apache.camel.util.FileUtil;
050    import org.apache.camel.util.ObjectHelper;
051    import org.apache.commons.logging.Log;
052    import org.apache.commons.logging.LogFactory;
053    
054    /**
055     * A Mock endpoint which provides a literate, fluent API for testing routes
056     * using a <a href="http://jmock.org/">JMock style</a> API.
057     *
058     * @version $Revision: 766288 $
059     */
060    public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
061        private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
062        private int expectedCount;
063        private int counter;
064        private Processor defaultProcessor;
065        private Map<Integer, Processor> processors;
066        private List<Exchange> receivedExchanges;
067        private List<Throwable> failures;
068        private List<Runnable> tests;
069        private CountDownLatch latch;
070        private long sleepForEmptyTest;
071        private long resultWaitTime;
072        private long resultMinimumWaitTime;
073        private int expectedMinimumCount;
074        private List expectedBodyValues;
075        private List actualBodyValues;
076        private final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
077        private String headerName;
078        private Object headerValue;
079        private Object actualHeader;
080        private String propertyName;
081        private Object propertyValue;
082        private Object actualProperty;
083        private Processor reporter;
084    
085        public MockEndpoint(String endpointUri, Component component) {
086            super(endpointUri, component);
087            init();
088        }
089    
090        public MockEndpoint(String endpointUri) {
091            super(endpointUri);
092            init();
093        }
094    
095        public MockEndpoint() {
096            this(null);
097        }
098    
099        /**
100         * A helper method to resolve the mock endpoint of the given URI on the given context
101         *
102         * @param context the camel context to try resolve the mock endpoint from
103         * @param uri the uri of the endpoint to resolve
104         * @return the endpoint
105         */
106        public static MockEndpoint resolve(CamelContext context, String uri) {
107            return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
108        }
109    
110        public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
111            long start = System.currentTimeMillis();
112            long left = unit.toMillis(timeout);
113            long end = start + left;
114            for (MockEndpoint endpoint : endpoints) {
115                if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
116                    throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
117                }
118                left = end - System.currentTimeMillis();
119                if (left <= 0) {
120                    left = 0;
121                }
122            }
123        }
124    
125        public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
126            assertWait(timeout, unit, endpoints);
127            for (MockEndpoint endpoint : endpoints) {
128                endpoint.assertIsSatisfied();
129            }
130        }
131    
132        public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
133            for (MockEndpoint endpoint : endpoints) {
134                endpoint.assertIsSatisfied();
135            }
136        }
137    
138    
139        /**
140         * Asserts that all the expectations on any {@link MockEndpoint} instances registered
141         * in the given context are valid
142         *
143         * @param context the camel context used to find all the available endpoints to be asserted
144         */
145        public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
146            ObjectHelper.notNull(context, "camelContext");
147            Collection<Endpoint> endpoints = context.getSingletonEndpoints();
148            for (Endpoint endpoint : endpoints) {
149                if (endpoint instanceof MockEndpoint) {
150                    MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
151                    mockEndpoint.assertIsSatisfied();
152                }
153            }
154        }
155    
156        public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
157            for (MockEndpoint endpoint : endpoints) {
158                endpoint.setExpectedMessageCount(count);
159            }
160        }
161    
162        public List<Exchange> getExchanges() {
163            return getReceivedExchanges();
164        }
165    
166        public void addPropertyChangeListener(PropertyChangeListener listener) {
167            propertyChangeSupport.addPropertyChangeListener(listener);
168        }
169    
170        public void removePropertyChangeListener(PropertyChangeListener listener) {
171            propertyChangeSupport.removePropertyChangeListener(listener);
172        }
173    
174        public Consumer createConsumer(Processor processor) throws Exception {
175            throw new UnsupportedOperationException("You cannot consume from this endpoint");
176        }
177    
178        public Producer createProducer() throws Exception {
179            return new DefaultProducer(this) {
180                public void process(Exchange exchange) {
181                    onExchange(exchange);
182                }
183            };
184        }
185    
186        public void reset() {
187            init();
188        }
189    
190    
191        // Testing API
192        // -------------------------------------------------------------------------
193    
194        /**
195         * Set the processor that will be invoked when the index
196         * message is received.
197         */
198        public void whenExchangeReceived(int index, Processor processor) {
199            this.processors.put(index, processor);
200        }
201    
202        /**
203         * Set the processor that will be invoked when the some message
204         * is received.
205         *
206         * This processor could be overwritten by
207         * {@link #whenExchangeReceived(int, Processor)} method.
208         */
209        public void whenAnyExchangeReceived(Processor processor) {
210            this.defaultProcessor = processor;
211        }
212    
213        /**
214         * Validates that all the available expectations on this endpoint are
215         * satisfied; or throw an exception
216         */
217        public void assertIsSatisfied() throws InterruptedException {
218            assertIsSatisfied(sleepForEmptyTest);
219        }
220    
221        /**
222         * Validates that all the available expectations on this endpoint are
223         * satisfied; or throw an exception
224         *
225         * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
226         *                should wait for the test to be true
227         */
228        public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
229            LOG.info("Asserting: " + this + " is satisfied");
230            if (expectedCount == 0) {
231                if (timeoutForEmptyEndpoints > 0) {
232                    LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
233                    Thread.sleep(timeoutForEmptyEndpoints);
234                }
235                assertEquals("Received message count", expectedCount, getReceivedCounter());
236            } else if (expectedCount > 0) {
237                if (expectedCount != getReceivedCounter()) {
238                    waitForCompleteLatch();
239                }
240                assertEquals("Received message count", expectedCount, getReceivedCounter());
241            } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
242                waitForCompleteLatch();
243            }
244    
245            if (expectedMinimumCount >= 0) {
246                int receivedCounter = getReceivedCounter();
247                assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
248            }
249    
250            for (Runnable test : tests) {
251                test.run();
252            }
253    
254            for (Throwable failure : failures) {
255                if (failure != null) {
256                    LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
257                    fail("Failed due to caught exception: " + failure);
258                }
259            }
260        }
261    
262        /**
263         * Validates that the assertions fail on this endpoint
264         */
265        public void assertIsNotSatisfied() throws InterruptedException {
266            try {
267                assertIsSatisfied();
268                fail("Expected assertion failure!");
269            } catch (AssertionError e) {
270                LOG.info("Caught expected failure: " + e);
271            }
272        }
273    
274        /**
275         * Validates that the assertions fail on this endpoint
276    
277         * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
278         *        should wait for the test to be true
279         */
280        public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
281            try {
282                assertIsSatisfied(timeoutForEmptyEndpoints);
283                fail("Expected assertion failure!");
284            } catch (AssertionError e) {
285                LOG.info("Caught expected failure: " + e);
286            }
287        }    
288        
289        /**
290         * Specifies the expected number of message exchanges that should be
291         * received by this endpoint
292         *
293         * @param expectedCount the number of message exchanges that should be
294         *                expected by this endpoint
295         */
296        public void expectedMessageCount(int expectedCount) {
297            setExpectedMessageCount(expectedCount);
298        }
299    
300        /**
301         * Specifies the minimum number of expected message exchanges that should be
302         * received by this endpoint
303         *
304         * @param expectedCount the number of message exchanges that should be
305         *                expected by this endpoint
306         */
307        public void expectedMinimumMessageCount(int expectedCount) {
308            setMinimumExpectedMessageCount(expectedCount);
309        }
310    
311        /**
312         * Adds an expectation that the given header name & value are received by this endpoint
313         */
314        public void expectedHeaderReceived(final String name, final Object value) {
315            this.headerName = name;
316            this.headerValue = value;
317    
318            expects(new Runnable() {
319                public void run() {
320                    assertTrue("No header with name " + headerName + " found.", actualHeader != null);
321    
322                    Object actualValue = getCamelContext().getTypeConverter().convertTo(actualHeader.getClass(), headerValue);
323                    assertEquals("Header of message", actualValue, actualHeader);
324                }
325            });
326        }
327    
328        /**
329         * Adds an expectation that the given property name & value are received by this endpoint
330         */
331        public void expectedPropertyReceived(final String name, final Object value) {
332            this.propertyName = name;
333            this.propertyValue = value;
334    
335            expects(new Runnable() {
336                public void run() {
337                    assertTrue("No property with name " + propertyName + " found.", actualProperty != null);
338    
339                    Object actualValue = getCamelContext().getTypeConverter().convertTo(actualProperty.getClass(), propertyValue);
340                    assertEquals("Property of message", actualValue, actualProperty);
341                }
342            });
343        }
344    
345        /**
346         * Adds an expectation that the given body values are received by this
347         * endpoint in the specified order
348         */
349        public void expectedBodiesReceived(final List bodies) {
350            expectedMessageCount(bodies.size());
351            this.expectedBodyValues = bodies;
352            this.actualBodyValues = new ArrayList();
353    
354            expects(new Runnable() {
355                public void run() {
356                    for (int i = 0; i < expectedBodyValues.size(); i++) {
357                        Exchange exchange = getReceivedExchanges().get(i);
358                        assertTrue("No exchange received for counter: " + i, exchange != null);
359    
360                        Object expectedBody = expectedBodyValues.get(i);
361                        Object actualBody = null;
362                        if (i < actualBodyValues.size()) {
363                            actualBody = actualBodyValues.get(i);
364                        }
365    
366                        assertEquals("Body of message: " + i, expectedBody, actualBody);
367                    }
368                }
369            });
370        }
371    
372        /**
373         * Adds an expectation that the given body values are received by this endpoint
374         */
375        public void expectedBodiesReceived(Object... bodies) {
376            List bodyList = new ArrayList();
377            bodyList.addAll(Arrays.asList(bodies));
378            expectedBodiesReceived(bodyList);
379        }
380    
381        /**
382         * Adds an expectation that the given body value are received by this endpoint
383         */
384        public ExpressionClause expectedBodyReceived() {
385            final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
386    
387            expectedMessageCount(1);
388    
389            expects(new Runnable() {
390                public void run() {
391                    Exchange exchange = getReceivedExchanges().get(0);
392                    assertTrue("No exchange received for counter: " + 0, exchange != null);
393    
394                    Object actualBody = exchange.getIn().getBody();
395                    Object expectedBody = clause.evaluate(exchange, Object.class);
396    
397                    assertEquals("Body of message: " + 0, expectedBody, actualBody);
398                }
399            });
400    
401            return clause;
402        }
403    
404        /**
405         * Adds an expectation that the given body values are received by this
406         * endpoint in any order
407         */
408        public void expectedBodiesReceivedInAnyOrder(final List bodies) {
409            expectedMessageCount(bodies.size());
410            this.expectedBodyValues = bodies;
411            this.actualBodyValues = new ArrayList();
412    
413            expects(new Runnable() {
414                public void run() {
415                    Set actualBodyValuesSet = new HashSet(actualBodyValues);
416                    for (int i = 0; i < expectedBodyValues.size(); i++) {
417                        Exchange exchange = getReceivedExchanges().get(i);
418                        assertTrue("No exchange received for counter: " + i, exchange != null);
419    
420                        Object expectedBody = expectedBodyValues.get(i);
421                        assertTrue("Message with body " + expectedBody
422                                + " was expected but not found in " + actualBodyValuesSet,
423                                actualBodyValuesSet.remove(expectedBody));
424                    }
425                }
426            });
427        }
428    
429        /**
430         * Adds an expectation that the given body values are received by this
431         * endpoint in any order
432         */
433        @SuppressWarnings("unchecked")
434        public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
435            List bodyList = new ArrayList();
436            bodyList.addAll(Arrays.asList(bodies));
437            expectedBodiesReceivedInAnyOrder(bodyList);
438        }
439    
440        /**
441         * Adds an expection that a file exists with the given name
442         *
443         * @param name name of file, will cater for / and \ on different OS platforms
444         */
445        public void expectedFileExists(final String name) {
446            expectedFileExists(name, null);
447        }
448    
449        /**
450         * Adds an expection that a file exists with the given name
451         * <p/>
452         * Will wait at most 5 seconds while checking for the existence of the file.
453         *
454         * @param name name of file, will cater for / and \ on different OS platforms
455         * @param content content of file to compare, can be <tt>null</tt> to not compare content
456         */
457        public void expectedFileExists(final String name, final String content) {
458            final File file = new File(FileUtil.normalizePath(name)).getAbsoluteFile();
459    
460            expects(new Runnable() {
461                public void run() {
462                    // wait at most 5 seconds for the file to exists
463                    final long timeout = System.currentTimeMillis() + 5000;
464    
465                    boolean stop = false;
466                    while (!stop && !file.exists()) {
467                        try {
468                            Thread.sleep(50);
469                        } catch (InterruptedException e) {
470                            // ignore
471                        }
472                        stop = System.currentTimeMillis() > timeout;
473                    }
474    
475                    assertTrue("The file should exists: " + name, file.exists());
476    
477                    if (content != null) {
478                        String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
479                        assertEquals("Content of file: " + name, content, body);
480                    }
481                }
482            });
483        }
484    
485        /**
486         * Adds an expectation that messages received should have ascending values
487         * of the given expression such as a user generated counter value
488         */
489        public void expectsAscending(final Expression expression) {
490            expects(new Runnable() {
491                public void run() {
492                    assertMessagesAscending(expression);
493                }
494            });
495        }
496    
497        /**
498         * Adds an expectation that messages received should have ascending values
499         * of the given expression such as a user generated counter value
500         */
501        public ExpressionClause expectsAscending() {
502            final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
503            expects(new Runnable() {
504                public void run() {
505                    assertMessagesAscending(clause.getExpressionValue());
506                }
507            });
508            return clause;
509        }
510    
511        /**
512         * Adds an expectation that messages received should have descending values
513         * of the given expression such as a user generated counter value
514         */
515        public void expectsDescending(final Expression expression) {
516            expects(new Runnable() {
517                public void run() {
518                    assertMessagesDescending(expression);
519                }
520            });
521        }
522    
523        /**
524         * Adds an expectation that messages received should have descending values
525         * of the given expression such as a user generated counter value
526         */
527        public ExpressionClause expectsDescending() {
528            final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
529            expects(new Runnable() {
530                public void run() {
531                    assertMessagesDescending(clause.getExpressionValue());
532                }
533            });
534            return clause;
535        }
536    
537        /**
538         * Adds an expectation that no duplicate messages should be received using
539         * the expression to determine the message ID
540         *
541         * @param expression the expression used to create a unique message ID for
542         *                message comparison (which could just be the message
543         *                payload if the payload can be tested for uniqueness using
544         *                {@link Object#equals(Object)} and
545         *                {@link Object#hashCode()}
546         */
547        public void expectsNoDuplicates(final Expression expression) {
548            expects(new Runnable() {
549                public void run() {
550                    assertNoDuplicates(expression);
551                }
552            });
553        }
554    
555        /**
556         * Adds an expectation that no duplicate messages should be received using
557         * the expression to determine the message ID
558         */
559        public ExpressionClause expectsNoDuplicates() {
560            final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
561            expects(new Runnable() {
562                public void run() {
563                    assertNoDuplicates(clause.getExpressionValue());
564                }
565            });
566            return clause;
567        }
568    
569        /**
570         * Asserts that the messages have ascending values of the given expression
571         */
572        public void assertMessagesAscending(Expression expression) {
573            assertMessagesSorted(expression, true);
574        }
575    
576        /**
577         * Asserts that the messages have descending values of the given expression
578         */
579        public void assertMessagesDescending(Expression expression) {
580            assertMessagesSorted(expression, false);
581        }
582    
583        protected void assertMessagesSorted(Expression expression, boolean ascending) {
584            String type = ascending ? "ascending" : "descending";
585            ExpressionComparator comparator = new ExpressionComparator(expression);
586            List<Exchange> list = getReceivedExchanges();
587            for (int i = 1; i < list.size(); i++) {
588                int j = i - 1;
589                Exchange e1 = list.get(j);
590                Exchange e2 = list.get(i);
591                int result = comparator.compare(e1, e2);
592                if (result == 0) {
593                    fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: "
594                        + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
595                } else {
596                    if (!ascending) {
597                        result = result * -1;
598                    }
599                    if (result > 0) {
600                        fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class)
601                            + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: "
602                            + expression + ". Exchanges: " + e1 + " and " + e2);
603                    }
604                }
605            }
606        }
607    
608        public void assertNoDuplicates(Expression expression) {
609            Map<Object, Exchange> map = new HashMap<Object, Exchange>();
610            List<Exchange> list = getReceivedExchanges();
611            for (int i = 0; i < list.size(); i++) {
612                Exchange e2 = list.get(i);
613                Object key = expression.evaluate(e2, Object.class);
614                Exchange e1 = map.get(key);
615                if (e1 != null) {
616                    fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
617                } else {
618                    map.put(key, e2);
619                }
620            }
621        }
622    
623        /**
624         * Adds the expectation which will be invoked when enough messages are received
625         */
626        public void expects(Runnable runnable) {
627            tests.add(runnable);
628        }
629    
630        /**
631         * Adds an assertion to the given message index
632         *
633         * @param messageIndex the number of the message
634         * @return the assertion clause
635         */
636        public AssertionClause message(final int messageIndex) {
637            AssertionClause clause = new AssertionClause() {
638                public void run() {
639                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
640                }
641            };
642            expects(clause);
643            return clause;
644        }
645    
646        /**
647         * Adds an assertion to all the received messages
648         *
649         * @return the assertion clause
650         */
651        public AssertionClause allMessages() {
652            AssertionClause clause = new AssertionClause() {
653                public void run() {
654                    List<Exchange> list = getReceivedExchanges();
655                    int index = 0;
656                    for (Exchange exchange : list) {
657                        applyAssertionOn(MockEndpoint.this, index++, exchange);
658                    }
659                }
660            };
661            expects(clause);
662            return clause;
663        }
664    
665        /**
666         * Asserts that the given index of message is received (starting at zero)
667         */
668        public Exchange assertExchangeReceived(int index) {
669            int count = getReceivedCounter();
670            assertTrue("Not enough messages received. Was: " + count, count > index);
671            return getReceivedExchanges().get(index);
672        }
673    
674        // Properties
675        // -------------------------------------------------------------------------
676        public List<Throwable> getFailures() {
677            return failures;
678        }
679    
680        public int getReceivedCounter() {
681            return getReceivedExchanges().size();
682        }
683    
684        public List<Exchange> getReceivedExchanges() {
685            return receivedExchanges;
686        }
687    
688        public int getExpectedCount() {
689            return expectedCount;
690        }
691    
692        public long getSleepForEmptyTest() {
693            return sleepForEmptyTest;
694        }
695    
696        /**
697         * Allows a sleep to be specified to wait to check that this endpoint really
698         * is empty when {@link #expectedMessageCount(int)} is called with zero
699         *
700         * @param sleepForEmptyTest the milliseconds to sleep for to determine that
701         *                this endpoint really is empty
702         */
703        public void setSleepForEmptyTest(long sleepForEmptyTest) {
704            this.sleepForEmptyTest = sleepForEmptyTest;
705        }
706    
707        public long getResultWaitTime() {
708            return resultWaitTime;
709        }
710    
711        /**
712         * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
713         * wait on a latch until it is satisfied
714         */
715        public void setResultWaitTime(long resultWaitTime) {
716            this.resultWaitTime = resultWaitTime;
717        }
718    
719        /**
720         * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
721         * wait on a latch until it is satisfied
722         */
723        public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
724            this.resultMinimumWaitTime = resultMinimumWaitTime;
725        }
726    
727        /**
728         * Specifies the expected number of message exchanges that should be
729         * received by this endpoint
730         *
731         * @param expectedCount the number of message exchanges that should be
732         *                expected by this endpoint
733         */
734        public void setExpectedMessageCount(int expectedCount) {
735            this.expectedCount = expectedCount;
736            if (expectedCount <= 0) {
737                latch = null;
738            } else {
739                latch = new CountDownLatch(expectedCount);
740            }
741        }
742    
743        /**
744         * Specifies the minimum number of expected message exchanges that should be
745         * received by this endpoint
746         *
747         * @param expectedCount the number of message exchanges that should be
748         *                expected by this endpoint
749         */
750        public void setMinimumExpectedMessageCount(int expectedCount) {
751            this.expectedMinimumCount = expectedCount;
752            if (expectedCount <= 0) {
753                latch = null;
754            } else {
755                latch = new CountDownLatch(expectedMinimumCount);
756            }
757        }
758    
759        public Processor getReporter() {
760            return reporter;
761        }
762    
763        /**
764         * Allows a processor to added to the endpoint to report on progress of the test
765         */
766        public void setReporter(Processor reporter) {
767            this.reporter = reporter;
768        }
769    
770        // Implementation methods
771        // -------------------------------------------------------------------------
772        private void init() {
773            expectedCount = -1;
774            counter = 0;
775            processors = new HashMap<Integer, Processor>();
776            receivedExchanges = new CopyOnWriteArrayList<Exchange>();
777            failures = new CopyOnWriteArrayList<Throwable>();
778            tests = new CopyOnWriteArrayList<Runnable>();
779            latch = null;
780            sleepForEmptyTest = 0;
781            resultWaitTime = 20000L;
782            resultMinimumWaitTime = 0L;
783            expectedMinimumCount = -1;
784            expectedBodyValues = null;
785            actualBodyValues = new ArrayList();
786        }
787    
788        protected synchronized void onExchange(Exchange exchange) {
789            try {
790                if (reporter != null) {
791                    reporter.process(exchange);
792                }
793                performAssertions(exchange);
794            } catch (Throwable e) {
795                // must catch throwable as AssertionException extends java.lang.Error
796                failures.add(e);
797            } finally {
798                // make sure latch is counted down to avoid test hanging forever
799                if (latch != null) {
800                    latch.countDown();
801                }
802            }
803        }
804    
805        @SuppressWarnings("unchecked")
806        protected void performAssertions(Exchange exchange) throws Exception {
807            Message in = exchange.getIn();
808            Object actualBody = in.getBody();
809    
810            if (headerName != null) {
811                actualHeader = in.getHeader(headerName);
812            }
813    
814            if (propertyName != null) {
815                actualProperty = exchange.getProperty(propertyName);
816            }
817    
818            if (expectedBodyValues != null) {
819                int index = actualBodyValues.size();
820                if (expectedBodyValues.size() > index) {
821                    Object expectedBody = expectedBodyValues.get(index);
822                    if (expectedBody != null) {
823                        actualBody = in.getBody(expectedBody.getClass());
824                    }
825                    actualBodyValues.add(actualBody);
826                }
827            }
828    
829            LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);
830    
831            receivedExchanges.add(exchange);
832    
833            Processor processor = processors.get(getReceivedCounter()) != null
834                    ? processors.get(getReceivedCounter()) : defaultProcessor;
835    
836            if (processor != null) {
837                processor.process(exchange);
838            }
839        }
840    
841        protected void waitForCompleteLatch() throws InterruptedException {
842            if (latch == null) {
843                fail("Should have a latch!");
844            }
845    
846            // now lets wait for the results
847            LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
848            long start = System.currentTimeMillis();
849            latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
850            long delta = System.currentTimeMillis() - start;
851            LOG.debug("Took " + delta + " millis to complete latch");
852    
853            if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
854                fail("Expected minimum " + resultMinimumWaitTime
855                        + " millis waiting on the result, but was faster with " + delta + " millis.");
856            }
857        }
858    
859        protected void assertEquals(String message, Object expectedValue, Object actualValue) {
860            if (!ObjectHelper.equal(expectedValue, actualValue)) {
861                fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
862            }
863        }
864    
865        protected void assertTrue(String message, boolean predicate) {
866            if (!predicate) {
867                fail(message);
868            }
869        }
870    
871        protected void fail(Object message) {
872            if (LOG.isDebugEnabled()) {
873                List<Exchange> list = getReceivedExchanges();
874                int index = 0;
875                for (Exchange exchange : list) {
876                    LOG.debug("Received[" + (++index) + "]: " + exchange);
877                }
878            }
879            throw new AssertionError(getEndpointUri() + " " + message);
880        }
881    
882        public int getExpectedMinimumCount() {
883            return expectedMinimumCount;
884        }
885    
886        public void await() throws InterruptedException {
887            if (latch != null) {
888                latch.await();
889            }
890        }
891    
892        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
893            if (latch != null) {
894                return latch.await(timeout, unit);
895            }
896            return true;
897        }
898    
899        public boolean isSingleton() {
900            return true;
901        }
902    }