001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.camel.impl.ServiceSupport;
023    import org.apache.camel.util.ObjectHelper;
024    import org.apache.camel.util.ProducerCache;
025    
026    /**
027     * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
028     * et al) for working with Camel and sending {@link Message} instances in an
029     * {@link Exchange} to an {@link Endpoint}.
030     * 
031     * @version $Revision: 563607 $
032     */
033    public class CamelTemplate<E extends Exchange> extends ServiceSupport implements ProducerTemplate<E> {
034        private CamelContext context;
035        private ProducerCache<E> producerCache = new ProducerCache<E>();
036        private boolean useEndpointCache = true;
037        private Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
038        private Endpoint<E> defaultEndpoint;
039    
040        public CamelTemplate(CamelContext context) {
041            this.context = context;
042        }
043    
044        public CamelTemplate(CamelContext context, Endpoint defaultEndpoint) {
045            this(context);
046            this.defaultEndpoint = defaultEndpoint;
047        }
048    
049        /**
050         * Sends the exchange to the given endpoint
051         * 
052         * @param endpointUri the endpoint URI to send the exchange to
053         * @param exchange the exchange to send
054         */
055        public E send(String endpointUri, E exchange) {
056            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
057            send(endpoint, exchange);
058            return exchange;
059        }
060    
061        /**
062         * Sends an exchange to an endpoint using a supplied
063         * 
064         * @{link Processor} to populate the exchange
065         * 
066         * @param endpointUri the endpoint URI to send the exchange to
067         * @param processor the transformer used to populate the new exchange
068         */
069        public E send(String endpointUri, Processor processor) {
070            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
071            return send(endpoint, processor);
072        }
073    
074        /**
075         * Sends the exchange to the given endpoint
076         * 
077         * @param endpoint the endpoint to send the exchange to
078         * @param exchange the exchange to send
079         */
080        public E send(Endpoint<E> endpoint, E exchange) {
081            E convertedExchange = endpoint.toExchangeType(exchange);
082            producerCache.send(endpoint, convertedExchange);
083            return exchange;
084        }
085    
086        /**
087         * Sends an exchange to an endpoint using a supplied
088         * 
089         * @{link Processor} to populate the exchange
090         * 
091         * @param endpoint the endpoint to send the exchange to
092         * @param processor the transformer used to populate the new exchange
093         */
094        public E send(Endpoint<E> endpoint, Processor processor) {
095            return producerCache.send(endpoint, processor);
096        }
097    
098        /**
099         * Send the body to an endpoint
100         * 
101         * @param endpoint
102         * @param body = the payload
103         * @return the result
104         */
105        public Object sendBody(Endpoint<E> endpoint, final Object body) {
106            E result = send(endpoint, new Processor() {
107                public void process(Exchange exchange) {
108                    Message in = exchange.getIn();
109                    in.setBody(body);
110                }
111            });
112            return extractResultBody(result);
113        }
114    
115        /**
116         * Send the body to an endpoint
117         * 
118         * @param endpointUri
119         * @param body = the payload
120         * @return the result
121         */
122        public Object sendBody(String endpointUri, final Object body) {
123            E result = send(endpointUri, new Processor() {
124                public void process(Exchange exchange) {
125                    Message in = exchange.getIn();
126                    in.setBody(body);
127                }
128            });
129            return extractResultBody(result);
130        }
131    
132        /**
133         * Sends the body to an endpoint with a specified header and header value
134         * 
135         * @param endpointUri the endpoint URI to send to
136         * @param body the payload send
137         * @param header the header name
138         * @param headerValue the header value
139         * @return the result
140         */
141        public Object sendBodyAndHeader(String endpointUri, final Object body, final String header,
142                                        final Object headerValue) {
143            return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
144        }
145    
146        /**
147         * Sends the body to an endpoint with a specified header and header value
148         * 
149         * @param endpoint the Endpoint to send to
150         * @param body the payload send
151         * @param header the header name
152         * @param headerValue the header value
153         * @return the result
154         */
155        public Object sendBodyAndHeader(Endpoint endpoint, final Object body, final String header,
156                                        final Object headerValue) {
157            E result = send(endpoint, new Processor() {
158                public void process(Exchange exchange) {
159                    Message in = exchange.getIn();
160                    in.setHeader(header, headerValue);
161                    in.setBody(body);
162                }
163            });
164            return extractResultBody(result);
165        }
166    
167        /**
168         * Sends the body to an endpoint with the specified headers and header
169         * values
170         * 
171         * @param endpointUri the endpoint URI to send to
172         * @param body the payload send
173         * @return the result
174         */
175        public Object sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
176            return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
177        }
178    
179        /**
180         * Sends the body to an endpoint with the specified headers and header
181         * values
182         * 
183         * @param endpoint the endpoint URI to send to
184         * @param body the payload send
185         * @return the result
186         */
187        public Object sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
188            E result = send(endpoint, new Processor() {
189                public void process(Exchange exchange) {
190                    Message in = exchange.getIn();
191                    for (Map.Entry<String, Object> header : headers.entrySet()) {
192                        in.setHeader(header.getKey(), header.getValue());
193                    }
194                    in.setBody(body);
195                }
196            });
197            return extractResultBody(result);
198        }
199    
200        // Methods using the default endpoint
201        // -----------------------------------------------------------------------
202    
203        /**
204         * Sends the body to the default endpoint and returns the result content
205         * 
206         * @param body the body to send
207         * @return the returned message body
208         */
209        public Object sendBody(Object body) {
210            return sendBody(getMandatoryDefaultEndpoint(), body);
211        }
212    
213        /**
214         * Sends the exchange to the default endpoint
215         * 
216         * @param exchange the exchange to send
217         */
218        public E send(E exchange) {
219            return send(getMandatoryDefaultEndpoint(), exchange);
220        }
221    
222        /**
223         * Sends an exchange to the default endpoint using a supplied
224         * 
225         * @{link Processor} to populate the exchange
226         * 
227         * @param processor the transformer used to populate the new exchange
228         */
229        public E send(Processor processor) {
230            return send(getMandatoryDefaultEndpoint(), processor);
231        }
232    
233        public Object sendBodyAndHeader(Object body, String header, Object headerValue) {
234            return sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
235        }
236    
237        public Object sendBodyAndHeaders(Object body, Map<String, Object> headers) {
238            return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
239        }
240    
241        // Properties
242        // -----------------------------------------------------------------------
243        public Producer<E> getProducer(Endpoint<E> endpoint) {
244            return producerCache.getProducer(endpoint);
245        }
246    
247        public CamelContext getContext() {
248            return context;
249        }
250    
251        public Endpoint<E> getDefaultEndpoint() {
252            return defaultEndpoint;
253        }
254    
255        public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
256            this.defaultEndpoint = defaultEndpoint;
257        }
258    
259        /**
260         * Sets the default endpoint to use if none is specified
261         */
262        public void setDefaultEndpointUri(String endpointUri) {
263            setDefaultEndpoint(getContext().getEndpoint(endpointUri));
264        }
265    
266        public boolean isUseEndpointCache() {
267            return useEndpointCache;
268        }
269    
270        public void setUseEndpointCache(boolean useEndpointCache) {
271            this.useEndpointCache = useEndpointCache;
272        }
273    
274        // Implementation methods
275        // -----------------------------------------------------------------------
276    
277        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
278            Endpoint endpoint = null;
279    
280            if (isUseEndpointCache()) {
281                synchronized (endpointCache) {
282                    endpoint = endpointCache.get(endpointUri);
283                    if (endpoint == null) {
284                        endpoint = context.getEndpoint(endpointUri);
285                        if (endpoint != null) {
286                            endpointCache.put(endpointUri, endpoint);
287                        }
288                    }
289                }
290            } else {
291                endpoint = context.getEndpoint(endpointUri);
292            }
293            if (endpoint == null) {
294                throw new NoSuchEndpointException(endpointUri);
295            }
296            return endpoint;
297        }
298    
299        protected Endpoint<E> getMandatoryDefaultEndpoint() {
300            Endpoint<E> answer = getDefaultEndpoint();
301            ObjectHelper.notNull(answer, "defaultEndpoint");
302            return answer;
303        }
304    
305        protected void doStart() throws Exception {
306            producerCache.start();
307        }
308    
309        protected void doStop() throws Exception {
310            producerCache.stop();
311        }
312    
313        protected Object extractResultBody(E result) {
314            Object answer = null;
315            if (result != null) {
316                answer = result.getOut().getBody();
317                if (answer == null) {
318                    answer = result.getIn().getBody();
319                }
320            }
321            return answer;
322        }
323    }