001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.mock;
019    
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.concurrent.CountDownLatch;
025    import java.util.concurrent.TimeUnit;
026    
027    import org.apache.camel.Component;
028    import org.apache.camel.Consumer;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Producer;
032    import org.apache.camel.Message;
033    import org.apache.camel.impl.DefaultEndpoint;
034    import org.apache.camel.impl.DefaultExchange;
035    import org.apache.camel.impl.DefaultProducer;
036    import org.apache.camel.util.ObjectHelper;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * A Mock endpoint which provides a literate, fluent API for testing routes using
042     * a <a href="http://jmock.org/">JMock style</a> API.
043     *
044     * @version $Revision: 1.1 $
045     */
046    public class MockEndpoint extends DefaultEndpoint<Exchange> {
047        private static final transient Log log = LogFactory.getLog(MockEndpoint.class);
048        private int expectedCount = -1;
049        private Map<Integer, Processor> processors = new HashMap<Integer, Processor>();
050        private List<Exchange> receivedExchanges = new ArrayList<Exchange>();
051        private List<Throwable> failures = new ArrayList<Throwable>();
052        private List<Runnable> tests = new ArrayList<Runnable>();
053        private CountDownLatch latch;
054        private long sleepForEmptyTest = 0L;
055            private int expectedMinimumCount=-1;
056    
057        public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
058            long start = System.currentTimeMillis();
059            long left = unit.toMillis(timeout);
060            long end = start + left;
061            for (MockEndpoint endpoint : endpoints) {
062                            if( !endpoint.await(left, TimeUnit.MILLISECONDS) )
063                            throw new AssertionError("Timeout waiting for endpoints to receive enough messages. "+endpoint.getEndpointUri()+" timed out.");
064                            left = end - System.currentTimeMillis();
065                            if( left <= 0 )
066                                    left = 0;
067            }
068        }
069    
070        public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
071            assertWait(timeout, unit, endpoints);
072            for (MockEndpoint endpoint : endpoints) {
073                endpoint.assertIsSatisfied();
074            }
075        }
076    
077        public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
078            for (MockEndpoint endpoint : endpoints) {
079                endpoint.assertIsSatisfied();
080            }
081        }
082    
083        public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
084            for (MockEndpoint endpoint : endpoints) {
085                endpoint.expectsMessageCount(count);
086            }
087        }
088    
089        public MockEndpoint(String endpointUri, Component component) {
090            super(endpointUri, component);
091        }
092    
093        public Exchange createExchange() {
094            return new DefaultExchange(getContext());
095        }
096    
097        public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
098            throw new UnsupportedOperationException("You cannot consume from this endpoint");
099        }
100    
101        public Producer<Exchange> createProducer() throws Exception {
102            return new DefaultProducer<Exchange>(this) {
103                public void process(Exchange exchange) {
104                    onExchange(exchange);
105                }
106            };
107        }
108    
109        // Testing API
110        //-------------------------------------------------------------------------
111    
112        /**
113         * Validates that all the available expectations on this endpoint are satisfied; or throw an exception
114         */
115        public void assertIsSatisfied() throws InterruptedException {
116            assertIsSatisfied(sleepForEmptyTest);
117        }
118        
119        /**
120         * Validates that all the available expectations on this endpoint are satisfied; or throw an exception
121         */
122        public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
123            if (latch != null) {
124                // now lets wait for the results
125                latch.await(10, TimeUnit.SECONDS);
126            }
127            else if (expectedCount == 0) {
128                // lets wait a little bit just in case
129                if (timeoutForEmptyEndpoints > 0) {
130                    Thread.sleep(timeoutForEmptyEndpoints);
131                }
132            }
133    
134            if (expectedCount >= 0) {
135                int receivedCounter = getReceivedCounter();
136                assertEquals("Received message count" , expectedCount, receivedCounter);
137            }
138            
139            if( expectedMinimumCount >= 0 ) {
140                int receivedCounter = getReceivedCounter();
141                assertTrue("Received message count "+receivedCounter+", expected at least "+expectedCount, expectedCount <= receivedCounter);
142                    
143            }
144    
145            for (Runnable test : tests) {
146                test.run();
147            }
148    
149            for (Throwable failure : failures) {
150               if (failure != null) {
151                   log.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
152                   fail("Failed due to caught exception: " + failure);
153               }
154            }
155        }
156    
157        /**
158         * Specifies the expected number of message exchanges that should be received by this endpoint
159         *
160         * @param expectedCount the number of message exchanges that should be expected by this endpoint
161         */
162        public void expectedMessageCount(int expectedCount) {
163            this.expectedCount = expectedCount;
164            if (expectedCount <= 0) {
165                latch = null;
166            }
167            else {
168                latch = new CountDownLatch(expectedCount);
169            }
170        }
171    
172        /**
173         * Specifies the minimum number of expected message exchanges that should be received by this endpoint
174         *
175         * @param expectedCount the number of message exchanges that should be expected by this endpoint
176         */
177        public void expectedMinimumMessageCount(int expectedCount) {
178            this.expectedMinimumCount = expectedCount;
179            if (expectedCount <= 0) {
180                latch = null;
181            }
182            else {
183                latch = new CountDownLatch(expectedMinimumCount);
184            }
185        }
186    
187        /**
188         * Adds an expectation that the given body values are received by this endpoint
189         */
190        public void expectedBodiesReceived(final List bodies) {
191            expectedMessageCount(bodies.size());
192    
193            expects(new Runnable() {
194                public void run() {
195                    int counter = 0;
196                    for (Object expectedBody : bodies) {
197                        Exchange exchange = getReceivedExchanges().get(counter++);
198                        assertTrue("No exchange received for counter: " + counter, exchange != null);
199    
200                        Message in = exchange.getIn();
201    
202                        Object actualBody = (expectedBody != null)
203                                ? in.getBody(expectedBody.getClass()) : in.getBody();
204    
205                        assertEquals("Body of message: " + counter, expectedBody, actualBody);
206    
207                        log.debug(getEndpointUri() + " >>>> message: " + counter + " with body: " + actualBody);
208                    }
209                }
210            });
211        }
212    
213        /**
214         * Adds an expectation that the given body values are received by this endpoint
215         */
216        public void expectedBodiesReceived(Object... bodies) {
217            List bodyList = new ArrayList();
218            for (Object body : bodies) {
219                bodyList.add(body);
220            }
221            expectedBodiesReceived(bodyList);
222        }
223    
224    
225        /**
226         * Adds the expection which will be invoked when enough messages are received
227         */
228        public void expects(Runnable runnable) {
229            tests.add(runnable);
230        }
231    
232        /**
233         * Adds an assertion to the given message index
234         *
235         * @param messageIndex the number of the message
236         * @return the assertion clause
237         */
238        public AssertionClause message(final int messageIndex) {
239            AssertionClause clause = new AssertionClause() {
240                public void run() {
241                    applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
242                }
243            };
244            expects(clause);
245            return clause;
246        }
247    
248        /**
249         * Adds an assertion to all the received messages
250         *
251         * @return the assertion clause
252         */
253        public AssertionClause allMessages() {
254            AssertionClause clause = new AssertionClause() {
255                public void run() {
256                    List<Exchange> list = getReceivedExchanges();
257                    int index = 0;
258                    for (Exchange exchange : list) {
259                        applyAssertionOn(MockEndpoint.this, index++, exchange);
260                    }
261                }
262            };
263            expects(clause);
264            return clause;
265        }
266    
267        /**
268         * Asserts that the given index of message is received (starting at zero)
269         */
270        public Exchange assertExchangeReceived(int index) {
271            int count = getReceivedCounter();
272            assertTrue("Not enough messages received. Was: " + count, count > index);
273            return getReceivedExchanges().get(index);
274        }
275    
276        // Properties
277        //-------------------------------------------------------------------------
278        public List<Throwable> getFailures() {
279            return failures;
280        }
281    
282        public int getReceivedCounter() {
283            return getReceivedExchanges().size();
284        }
285    
286        public List<Exchange> getReceivedExchanges() {
287            return receivedExchanges;
288        }
289    
290        public int getExpectedCount() {
291            return expectedCount;
292        }
293    
294        public long getSleepForEmptyTest() {
295            return sleepForEmptyTest;
296        }
297    
298        /**
299         * Allows a sleep to be specified to wait to check that this endpoint really is empty when
300         * {@link #expectedMessageCount(int)} is called with zero
301         *
302         * @param sleepForEmptyTest the milliseconds to sleep for to determine that this endpoint really is empty
303         */
304        public void setSleepForEmptyTest(long sleepForEmptyTest) {
305            this.sleepForEmptyTest = sleepForEmptyTest;
306        }
307    
308        // Implementation methods
309        //-------------------------------------------------------------------------
310        protected synchronized void onExchange(Exchange exchange) {
311            try {
312                log.debug(getEndpointUri() + " >>>> " + exchange);
313    
314                receivedExchanges.add(exchange);
315    
316                Processor processor = processors.get(getReceivedCounter());
317                if (processor != null) {
318                    processor.process(exchange);
319                }
320    
321                if (latch != null) {
322                    latch.countDown();
323                }
324            }
325            catch (Exception e) {
326                failures.add(e);
327            }
328        }
329    
330        protected void assertEquals(String message, Object expectedValue, Object actualValue) {
331            if (!ObjectHelper.equals(expectedValue, actualValue)) {
332                fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
333            }
334        }
335    
336        protected void assertTrue(String message, boolean predicate) {
337            if (!predicate) {
338                fail(message);
339            }
340        }
341    
342        protected void fail(Object message) {
343            throw new AssertionError(getEndpointUri() + " " + message);
344        }
345    
346            public int getExpectedMinimumCount() {
347                    return expectedMinimumCount;
348            }
349    
350            public void await() throws InterruptedException {
351                    if( latch!=null ) {
352                            latch.await();
353                    }
354            }
355    
356            public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
357                    if( latch!=null ) {
358                            return latch.await(timeout, unit);
359                    }
360                    return true;
361            }
362            
363            public boolean isSingleton() {
364                    return true;
365            }
366            
367    }