001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel;
019    
020    import org.apache.camel.impl.ServiceSupport;
021    import org.apache.camel.util.ObjectHelper;
022    import org.apache.camel.util.ProducerCache;
023    
024    import java.util.HashMap;
025    import java.util.Map;
026    
027    /**
028     * A client helper object (named like Spring's TransactionTemplate & JmsTemplate et al)
029     * for working with Camel and sending {@link Message} instances in an {@link Exchange}
030     * to an {@link Endpoint}.
031     *
032     * @version $Revision: 537937 $
033     */
034    public class CamelTemplate<E extends Exchange> extends ServiceSupport {
035        private CamelContext context;
036        private ProducerCache<E> producerCache = new ProducerCache<E>();
037        private boolean useEndpointCache = true;
038        private Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
039        private Endpoint<E> defaultEndpoint;
040    
041    
042        public CamelTemplate(CamelContext context) {
043            this.context = context;
044        }
045    
046        public CamelTemplate(CamelContext context, Endpoint defaultEndpoint) {
047            this(context);
048            this.defaultEndpoint = defaultEndpoint;
049        }
050    
051        /**
052         * Sends the exchange to the given endpoint
053         *
054         * @param endpointUri the endpoint URI to send the exchange to
055         * @param exchange    the exchange to send
056         */
057        public E send(String endpointUri, E exchange) {
058            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
059            send(endpoint, exchange);
060            return exchange;
061        }
062    
063        /**
064         * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
065         *
066         * @param endpointUri the endpoint URI to send the exchange to
067         * @param processor   the transformer used to populate the new exchange
068         */
069        public E send(String endpointUri, Processor processor) {
070            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
071            return send(endpoint, processor);
072        }
073    
074        /**
075         * Sends the exchange to the given endpoint
076         *
077         * @param endpoint the endpoint to send the exchange to
078         * @param exchange the exchange to send
079         */
080        public E send(Endpoint<E> endpoint, E exchange) {
081            E convertedExchange = endpoint.toExchangeType(exchange);
082            producerCache.send(endpoint, convertedExchange);
083            return exchange;
084        }
085    
086        /**
087         * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
088         *
089         * @param endpoint  the endpoint to send the exchange to
090         * @param processor the transformer used to populate the new exchange
091         */
092        public E send(Endpoint<E> endpoint, Processor processor) {
093            return producerCache.send(endpoint, processor);
094        }
095    
096        /**
097         * Send the body to an endpoint
098         *
099         * @param endpoint
100         * @param body     = the payload
101         * @return the result
102         */
103        public Object sendBody(Endpoint<E> endpoint, final Object body) {
104            E result = send(endpoint, new Processor() {
105                public void process(Exchange exchange) {
106                    Message in = exchange.getIn();
107                    in.setBody(body);
108                }
109            });
110            return extractResultBody(result);
111        }
112    
113        /**
114         * Send the body to an endpoint
115         *
116         * @param endpointUri
117         * @param body        = the payload
118         * @return the result
119         */
120        public Object sendBody(String endpointUri, final Object body) {
121            E result = send(endpointUri, new Processor() {
122                public void process(Exchange exchange) {
123                    Message in = exchange.getIn();
124                    in.setBody(body);
125                }
126            });
127            return extractResultBody(result);
128        }
129    
130        /**
131         * Sends the body to an endpoint with a specified header and header value
132         *
133         * @param endpointUri the endpoint URI to send to
134         * @param body        the payload send
135         * @param header      the header name
136         * @param headerValue the header value
137         * @return the result
138         */
139        public Object sendBody(String endpointUri, final Object body, final String header, final Object headerValue) {
140            E result = send(endpointUri, new Processor() {
141                public void process(Exchange exchange) {
142                    Message in = exchange.getIn();
143                    in.setHeader(header, headerValue);
144                    in.setBody(body);
145                }
146            });
147            return extractResultBody(result);
148        }
149        
150        /**
151         * Sends the body to an endpoint with the specified headers and header values
152         *
153         * @param endpointUri the endpoint URI to send to
154         * @param body        the payload send
155         * @return the result
156         */
157        public Object sendBody(String endpointUri, final Object body, final Map<String, Object> headers) {
158            E result = send(endpointUri, new Processor() {
159                public void process(Exchange exchange) {
160                    Message in = exchange.getIn();
161                    for (Map.Entry<String, Object> header : headers.entrySet()) {
162                        in.setHeader(header.getKey(), header.getValue());
163                                    }
164                    in.setBody(body);
165                }
166            });
167            return extractResultBody(result);
168        }
169    
170        // Methods using the default endpoint
171        //-----------------------------------------------------------------------
172    
173        /**
174         * Sends the body to the default endpoint and returns the result content
175         *
176         * @param body the body to send
177         * @return the returned message body
178         */
179        public Object sendBody(Object body) {
180            return sendBody(getMandatoryDefaultEndpoint(), body);
181        }
182    
183        /**
184         * Sends the exchange to the default endpoint
185         *
186         * @param exchange the exchange to send
187         */
188        public E send(E exchange) {
189            return send(getMandatoryDefaultEndpoint(), exchange);
190        }
191    
192        /**
193         * Sends an exchange to the default endpoint
194         * using a supplied @{link Processor} to populate the exchange
195         *
196         * @param processor the transformer used to populate the new exchange
197         */
198        public E send(Processor processor) {
199            return send(getMandatoryDefaultEndpoint(), processor);
200        }
201    
202    
203        // Properties
204        //-----------------------------------------------------------------------
205        public Producer<E> getProducer(Endpoint<E> endpoint) {
206            return producerCache.getProducer(endpoint);
207        }
208    
209        public CamelContext getContext() {
210            return context;
211        }
212    
213        public Endpoint<E> getDefaultEndpoint() {
214            return defaultEndpoint;
215        }
216    
217        public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
218            this.defaultEndpoint = defaultEndpoint;
219        }
220    
221        /**
222         * Sets the default endpoint to use if none is specified
223         */
224        public void setDefaultEndpointUri(String endpointUri) {
225            setDefaultEndpoint(getContext().getEndpoint(endpointUri));
226        }
227    
228        public boolean isUseEndpointCache() {
229            return useEndpointCache;
230        }
231    
232        public void setUseEndpointCache(boolean useEndpointCache) {
233            this.useEndpointCache = useEndpointCache;
234        }
235    
236        // Implementation methods
237        //-----------------------------------------------------------------------
238    
239        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
240            Endpoint endpoint = null;
241    
242            if (isUseEndpointCache()) {
243                synchronized (endpointCache) {
244                    endpoint = endpointCache.get(endpointUri);
245                    if (endpoint == null) {
246                        endpoint = context.getEndpoint(endpointUri);
247                        if (endpoint != null) {
248                            endpointCache.put(endpointUri, endpoint);
249                        }
250                    }
251                }
252            }
253            else {
254                endpoint = context.getEndpoint(endpointUri);
255            }
256            if (endpoint == null) {
257                throw new NoSuchEndpointException(endpointUri);
258            }
259            return endpoint;
260        }
261    
262        protected Endpoint<E> getMandatoryDefaultEndpoint() {
263            Endpoint<E> answer = getDefaultEndpoint();
264            ObjectHelper.notNull(answer, "defaultEndpoint");
265            return answer;
266        }
267    
268        protected void doStart() throws Exception {
269            producerCache.start();
270        }
271    
272        protected void doStop() throws Exception {
273            producerCache.stop();
274        }
275    
276        protected Object extractResultBody(E result) {
277            return result != null ? result.getOut().getBody() : null;
278        }
279    }