001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.mock;
018    
019    import java.util.ArrayList;
020    import java.util.HashMap;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    import java.util.concurrent.CountDownLatch;
025    import java.util.concurrent.TimeUnit;
026    
027    import org.apache.camel.Component;
028    import org.apache.camel.Consumer;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.Expression;
031    import org.apache.camel.Message;
032    import org.apache.camel.Processor;
033    import org.apache.camel.Producer;
034    import org.apache.camel.impl.DefaultEndpoint;
035    import org.apache.camel.impl.DefaultExchange;
036    import org.apache.camel.impl.DefaultProducer;
037    import org.apache.camel.util.ExpressionComparator;
038    import org.apache.camel.util.ObjectHelper;
039    import org.apache.commons.logging.Log;
040    import org.apache.commons.logging.LogFactory;
041    
042    /**
043     * A Mock endpoint which provides a literate, fluent API for testing routes
044     * using a <a href="http://jmock.org/">JMock style</a> API.
045     * 
046     * @version $Revision: 1.1 $
047     */
048    public class MockEndpoint extends DefaultEndpoint<Exchange> {
049        private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
050        private int expectedCount = -1;
051        private int counter;
052        private Map<Integer, Processor> processors = new HashMap<Integer, Processor>();
053        private List<Exchange> receivedExchanges = new CopyOnWriteArrayList<Exchange>();
054        private List<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
055        private List<Runnable> tests = new CopyOnWriteArrayList<Runnable>();
056        private CountDownLatch latch;
057        private long sleepForEmptyTest = 1000L;
058        private long defaulResultWaitMillis = 20000L;
059        private int expectedMinimumCount = -1;
060        private List expectedBodyValues;
061        private List actualBodyValues = new ArrayList();
062    
063        public MockEndpoint(String endpointUri, Component component) {
064            super(endpointUri, component);
065        }
066    
067        public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
068            long start = System.currentTimeMillis();
069            long left = unit.toMillis(timeout);
070            long end = start + left;
071            for (MockEndpoint endpoint : endpoints) {
072                if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
073                    throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
074                }
075                left = end - System.currentTimeMillis();
076                if (left <= 0) {
077                    left = 0;
078                }
079            }
080        }
081    
082        public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
083            assertWait(timeout, unit, endpoints);
084            for (MockEndpoint endpoint : endpoints) {
085                endpoint.assertIsSatisfied();
086            }
087        }
088    
089        public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
090            for (MockEndpoint endpoint : endpoints) {
091                endpoint.assertIsSatisfied();
092            }
093        }
094    
095        public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
096            for (MockEndpoint endpoint : endpoints) {
097                endpoint.expectsMessageCount(count);
098            }
099        }
100    
101        public Exchange createExchange() {
102            return new DefaultExchange(getContext());
103        }
104    
105        public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
106            throw new UnsupportedOperationException("You cannot consume from this endpoint");
107        }
108    
109        public Producer<Exchange> createProducer() throws Exception {
110            return new DefaultProducer<Exchange>(this) {
111                public void process(Exchange exchange) {
112                    onExchange(exchange);
113                }
114            };
115        }
116    
117        // Testing API
118        // -------------------------------------------------------------------------
119    
120        /**
121         * Validates that all the available expectations on this endpoint are
122         * satisfied; or throw an exception
123         */
124        public void assertIsSatisfied() throws InterruptedException {
125            assertIsSatisfied(sleepForEmptyTest);
126        }
127    
128        /**
129         * Validates that all the available expectations on this endpoint are
130         * satisfied; or throw an exception
131         * 
132         * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
133         *                should wait for the test to be true
134         */
135        public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
136            if (expectedCount >= 0) {
137                if (expectedCount != getReceivedCounter()) {
138                    if (expectedCount == 0) {
139                        // lets wait a little bit just in case
140                        if (timeoutForEmptyEndpoints > 0) {
141                            LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
142                            Thread.sleep(timeoutForEmptyEndpoints);
143                        }
144                    } else {
145                        waitForCompleteLatch();
146                    }
147                }
148                assertEquals("Received message count", expectedCount, getReceivedCounter());
149            } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
150                waitForCompleteLatch();
151            }
152    
153            if (expectedMinimumCount >= 0) {
154                int receivedCounter = getReceivedCounter();
155                assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedCount, expectedCount <= receivedCounter);
156            }
157    
158            for (Runnable test : tests) {
159                test.run();
160            }
161    
162            for (Throwable failure : failures) {
163                if (failure != null) {
164                    LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
165                    fail("Failed due to caught exception: " + failure);
166                }
167            }
168        }
169    
170        /**
171         * Validates that the assertions fail on this endpoint
172         */
173        public void assertIsNotSatisfied() throws InterruptedException {
174            try {
175                assertIsSatisfied();
176                fail("Expected assertion failure!");
177            } catch (AssertionError e) {
178                LOG.info("Caught expected failure: " + e);
179            }
180        }
181    
182        /**
183         * Specifies the expected number of message exchanges that should be
184         * received by this endpoint
185         * 
186         * @param expectedCount the number of message exchanges that should be
187         *                expected by this endpoint
188         */
189        public void expectedMessageCount(int expectedCount) {
190            this.expectedCount = expectedCount;
191            if (expectedCount <= 0) {
192                latch = null;
193            } else {
194                latch = new CountDownLatch(expectedCount);
195            }
196        }
197    
198        /**
199         * Specifies the minimum number of expected message exchanges that should be
200         * received by this endpoint
201         * 
202         * @param expectedCount the number of message exchanges that should be
203         *                expected by this endpoint
204         */
205        public void expectedMinimumMessageCount(int expectedCount) {
206            this.expectedMinimumCount = expectedCount;
207            if (expectedCount <= 0) {
208                latch = null;
209            } else {
210                latch = new CountDownLatch(expectedMinimumCount);
211            }
212        }
213    
214        /**
215         * Adds an expectation that the given body values are received by this
216         * endpoint
217         */
218        public void expectedBodiesReceived(final List bodies) {
219            expectedMessageCount(bodies.size());
220            this.expectedBodyValues = bodies;
221            this.actualBodyValues = new ArrayList();
222    
223            expects(new Runnable() {
224                public void run() {
225                    for (int i = 0; i < expectedBodyValues.size(); i++) {
226                        Exchange exchange = getReceivedExchanges().get(i);
227                        assertTrue("No exchange received for counter: " + i, exchange != null);
228    
229                        Object expectedBody = expectedBodyValues.get(i);
230                        Object actualBody = actualBodyValues.get(i);
231    
232                        assertEquals("Body of message: " + i, expectedBody, actualBody);
233                    }
234                }
235            });
236        }
237    
238        /**
239         * Adds an expectation that the given body values are received by this
240         * endpoint
241         */
242        public void expectedBodiesReceived(Object... bodies) {
243            List bodyList = new ArrayList();
244            for (Object body : bodies) {
245                bodyList.add(body);
246            }
247            expectedBodiesReceived(bodyList);
248        }
249    
250        /**
251         * Adds an expectation that messages received should have ascending values
252         * of the given expression such as a user generated counter value
253         * 
254         * @param expression
255         */
256        public void expectsAscending(final Expression<Exchange> expression) {
257            expects(new Runnable() {
258                public void run() {
259                    assertMessagesAscending(expression);
260                }
261            });
262        }
263    
264        /**
265         * Adds an expectation that messages received should have descending values
266         * of the given expression such as a user generated counter value
267         * 
268         * @param expression
269         */
270        public void expectsDescending(final Expression<Exchange> expression) {
271            expects(new Runnable() {
272                public void run() {
273                    assertMessagesDescending(expression);
274                }
275            });
276        }
277    
278        /**
279         * Adds an expectation that no duplicate messages should be received using
280         * the expression to determine the message ID
281         * 
282         * @param expression the expression used to create a unique message ID for
283         *                message comparison (which could just be the message
284         *                payload if the payload can be tested for uniqueness using
285         *                {@link Object#equals(Object)} and
286         *                {@link Object#hashCode()}
287         */
288        public void expectsNoDuplicates(final Expression<Exchange> expression) {
289            expects(new Runnable() {
290                public void run() {
291                    assertNoDuplicates(expression);
292                }
293            });
294        }
295    
296        /**
297         * Asserts that the messages have ascending values of the given expression
298         */
299        public void assertMessagesAscending(Expression<Exchange> expression) {
300            assertMessagesSorted(expression, true);
301        }
302    
303        /**
304         * Asserts that the messages have descending values of the given expression
305         */
306        public void assertMessagesDescending(Expression<Exchange> expression) {
307            assertMessagesSorted(expression, false);
308        }
309    
310        protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) {
311            String type = ascending ? "ascending" : "descending";
312            ExpressionComparator comparator = new ExpressionComparator(expression);
313            List<Exchange> list = getReceivedExchanges();
314            for (int i = 1; i < list.size(); i++) {
315                int j = i - 1;
316                Exchange e1 = list.get(j);
317                Exchange e2 = list.get(i);
318                int result = comparator.compare(e1, e2);
319                if (result == 0) {
320                    fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and "
321                         + e2);
322                } else {
323                    if (!ascending) {
324                        result = result * -1;
325                    }
326                    if (result > 0) {
327                        fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: "
328                             + expression + ". Exchanges: " + e1 + " and " + e2);
329                    }
330                }
331            }
332        }
333    
334        public void assertNoDuplicates(Expression<Exchange> expression) {
335            Map<Object, Exchange> map = new HashMap<Object, Exchange>();
336            List<Exchange> list = getReceivedExchanges();
337            for (int i = 0; i < list.size(); i++) {
338                Exchange e2 = list.get(i);
339                Object key = expression.evaluate(e2);
340                Exchange e1 = map.get(key);
341                if (e1 != null) {
342                    fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
343                } else {
344                    map.put(key, e2);
345                }
346            }
347        }
348    
349        /**
350         * Adds the expection which will be invoked when enough messages are
351         * received
352         */
353        public void expects(Runnable runnable) {
354            tests.add(runnable);
355        }
356    
357        /**
358         * Adds an assertion to the given message index
359         * 
360         * @param messageIndex the number of the message
361         * @return the assertion clause
362         */
363        public AssertionClause message(final int messageIndex) {
364            AssertionClause clause = new AssertionClause() {
365                public void run() {
366                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
367                }
368            };
369            expects(clause);
370            return clause;
371        }
372    
373        /**
374         * Adds an assertion to all the received messages
375         * 
376         * @return the assertion clause
377         */
378        public AssertionClause allMessages() {
379            AssertionClause clause = new AssertionClause() {
380                public void run() {
381                    List<Exchange> list = getReceivedExchanges();
382                    int index = 0;
383                    for (Exchange exchange : list) {
384                        applyAssertionOn(MockEndpoint.this, index++, exchange);
385                    }
386                }
387            };
388            expects(clause);
389            return clause;
390        }
391    
392        /**
393         * Asserts that the given index of message is received (starting at zero)
394         */
395        public Exchange assertExchangeReceived(int index) {
396            int count = getReceivedCounter();
397            assertTrue("Not enough messages received. Was: " + count, count > index);
398            return getReceivedExchanges().get(index);
399        }
400    
401        // Properties
402        // -------------------------------------------------------------------------
403        public List<Throwable> getFailures() {
404            return failures;
405        }
406    
407        public int getReceivedCounter() {
408            return getReceivedExchanges().size();
409        }
410    
411        public List<Exchange> getReceivedExchanges() {
412            return receivedExchanges;
413        }
414    
415        public int getExpectedCount() {
416            return expectedCount;
417        }
418    
419        public long getSleepForEmptyTest() {
420            return sleepForEmptyTest;
421        }
422    
423        /**
424         * Allows a sleep to be specified to wait to check that this endpoint really
425         * is empty when {@link #expectedMessageCount(int)} is called with zero
426         * 
427         * @param sleepForEmptyTest the milliseconds to sleep for to determine that
428         *                this endpoint really is empty
429         */
430        public void setSleepForEmptyTest(long sleepForEmptyTest) {
431            this.sleepForEmptyTest = sleepForEmptyTest;
432        }
433    
434        public long getDefaulResultWaitMillis() {
435            return defaulResultWaitMillis;
436        }
437    
438        /**
439         * Sets the maximum amount of time the {@link #assertIsSatisfied()} will
440         * wait on a latch until it is satisfied
441         */
442        public void setDefaulResultWaitMillis(long defaulResultWaitMillis) {
443            this.defaulResultWaitMillis = defaulResultWaitMillis;
444        }
445    
446        // Implementation methods
447        // -------------------------------------------------------------------------
448        protected synchronized void onExchange(Exchange exchange) {
449            try {
450                Message in = exchange.getIn();
451                Object actualBody = in.getBody();
452    
453                if (expectedBodyValues != null) {
454                    int index = actualBodyValues.size();
455                    if (expectedBodyValues.size() > index) {
456                        Object expectedBody = expectedBodyValues.get(index);
457                        if (expectedBody != null) {
458                            actualBody = in.getBody(expectedBody.getClass());
459                        }
460                        actualBodyValues.add(actualBody);
461                    }
462                }
463    
464                LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);
465    
466                receivedExchanges.add(exchange);
467    
468                Processor processor = processors.get(getReceivedCounter());
469                if (processor != null) {
470                    processor.process(exchange);
471                }
472    
473                if (latch != null) {
474                    latch.countDown();
475                }
476            } catch (Exception e) {
477                failures.add(e);
478            }
479        }
480    
481        protected void waitForCompleteLatch() throws InterruptedException {
482            if (latch == null) {
483                fail("Should have a latch!");
484            }
485    
486            // now lets wait for the results
487            LOG.debug("Waiting on the latch for: " + defaulResultWaitMillis + " millis");
488            latch.await(defaulResultWaitMillis, TimeUnit.MILLISECONDS);
489        }
490    
491        protected void assertEquals(String message, Object expectedValue, Object actualValue) {
492            if (!ObjectHelper.equals(expectedValue, actualValue)) {
493                fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
494            }
495        }
496    
497        protected void assertTrue(String message, boolean predicate) {
498            if (!predicate) {
499                fail(message);
500            }
501        }
502    
503        protected void fail(Object message) {
504            if (LOG.isDebugEnabled()) {
505                List<Exchange> list = getReceivedExchanges();
506                int index = 0;
507                for (Exchange exchange : list) {
508                    LOG.debug("Received[" + (++index) + "]: " + exchange);
509                }
510            }
511            throw new AssertionError(getEndpointUri() + " " + message);
512        }
513    
514        public int getExpectedMinimumCount() {
515            return expectedMinimumCount;
516        }
517    
518        public void await() throws InterruptedException {
519            if (latch != null) {
520                latch.await();
521            }
522        }
523    
524        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
525            if (latch != null) {
526                return latch.await(timeout, unit);
527            }
528            return true;
529        }
530    
531        public boolean isSingleton() {
532            return true;
533        }
534    
535    }