001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import org.apache.camel.CamelContext;
020    import org.apache.camel.ConsumerTemplate;
021    import org.apache.camel.Endpoint;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Message;
024    import org.apache.camel.util.CamelContextHelper;
025    import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
026    
027    /**
028     * @version $Revision: 769448 $
029     */
030    public class DefaultConsumerTemplate implements ConsumerTemplate {
031    
032        private final CamelContext context;
033        private final ConsumerCache consumerCache = new ConsumerCache();
034    
035        public DefaultConsumerTemplate(CamelContext context) {
036            this.context = context;
037        }
038    
039        public void start() throws Exception {
040            consumerCache.start();
041        }
042    
043        public void stop() throws Exception {
044            consumerCache.stop();
045        }
046    
047        public Exchange receive(String endpointUri) {
048            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
049            return consumerCache.receive(endpoint);
050        }
051    
052        public Exchange receive(Endpoint endpoinit) {
053            return receive(endpoinit.getEndpointUri());
054        }
055    
056        public Exchange receive(String endpointUri, long timeout) {
057            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
058            return consumerCache.receive(endpoint, timeout);
059        }
060    
061        public Exchange receive(Endpoint endpoint, long timeout) {
062            return receive(endpoint.getEndpointUri(), timeout);
063        }
064    
065        public Exchange receiveNoWait(String endpointUri) {
066            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
067            return consumerCache.receiveNoWait(endpoint);
068        }
069    
070        public Exchange receiveNoWait(Endpoint endpoint) {
071            return receiveNoWait(endpoint.getEndpointUri());
072        }
073    
074        public Object receiveBody(String endpointUri) {
075            Exchange exchange = receive(endpointUri);
076            return extractResultBody(exchange);
077        }
078    
079        public Object receiveBody(Endpoint endpoint) {
080            return receiveBody(endpoint.getEndpointUri());
081        }
082    
083        public Object receiveBody(String endpointUri, long timeout) {
084            Exchange exchange = receive(endpointUri, timeout);
085            return extractResultBody(exchange);
086        }
087    
088        public Object receiveBody(Endpoint endpoint, long timeout) {
089            return receiveBody(endpoint.getEndpointUri(), timeout);
090        }
091    
092        public Object receiveBodyNoWait(String endpointUri) {
093            Exchange exchange = receiveNoWait(endpointUri);
094            return extractResultBody(exchange);
095        }
096    
097        public Object receiveBodyNoWait(Endpoint endpoint) {
098            return receiveBodyNoWait(endpoint.getEndpointUri());
099        }
100    
101        public <T> T receiveBody(String endpointUri, Class<T> type) {
102            Object body = receiveBody(endpointUri);
103            return context.getTypeConverter().convertTo(type, body);
104        }
105    
106        public <T> T receiveBody(Endpoint endpoint, Class<T> type) {
107            return receiveBody(endpoint.getEndpointUri(), type);
108        }
109    
110        public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) {
111            Object body = receiveBody(endpointUri, timeout);
112            return context.getTypeConverter().convertTo(type, body);
113        }
114    
115        public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) {
116            return receiveBody(endpoint.getEndpointUri(), timeout, type);
117        }
118    
119        public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) {
120            Object body = receiveBodyNoWait(endpointUri);
121            return context.getTypeConverter().convertTo(type, body);
122        }
123    
124        public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) {
125            return receiveBodyNoWait(endpoint.getEndpointUri(), type);
126        }
127    
128        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
129            return CamelContextHelper.getMandatoryEndpoint(context, endpointUri);
130        }
131    
132        /**
133         * Extracts the body from the given result.
134         * <p/>
135         * If the exchange pattern is provided it will try to honor it and retrive the body
136         * from either IN or OUT according to the pattern.
137         *
138         * @param result   the result
139         * @return  the result, can be <tt>null</tt>.
140         */
141        protected Object extractResultBody(Exchange result) {
142            Object answer = null;
143            if (result != null) {
144                // rethrow if there was an exception
145                if (result.getException() != null) {
146                    throw wrapRuntimeCamelException(result.getException());
147                }
148    
149                // result could have a fault message
150                if (hasFaultMessage(result)) {
151                    return result.getFault().getBody();
152                }
153    
154                // okay no fault then return the response
155                if (result.hasOut()) {
156                    // use OUT as the response
157                    answer = result.getOut().getBody();
158                } else {
159                    // use IN as the response
160                    answer = result.getIn().getBody();
161                }
162            }
163            return answer;
164        }
165    
166        protected boolean hasFaultMessage(Exchange result) {
167            if (result.hasFault()) {
168                Message faultMessage = result.getFault();
169                if (faultMessage != null) {
170                    Object faultBody = faultMessage.getBody();
171                    if (faultBody != null) {
172                        return true;
173                    }
174                }
175            }
176            return false;
177        }
178    
179    }