001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Map;
020    import java.util.concurrent.Callable;
021    import java.util.concurrent.ExecutorService;
022    import java.util.concurrent.Future;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.TimeoutException;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.Exchange;
029    import org.apache.camel.ExchangePattern;
030    import org.apache.camel.Message;
031    import org.apache.camel.NoSuchEndpointException;
032    import org.apache.camel.Processor;
033    import org.apache.camel.ProducerTemplate;
034    import org.apache.camel.util.CamelContextHelper;
035    import org.apache.camel.util.ExchangeHelper;
036    import org.apache.camel.util.ObjectHelper;
037    import org.apache.camel.util.concurrent.ExecutorServiceHelper;
038    
039    /**
040     * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
041     * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an
042     * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}.
043     *
044     * @version $Revision: 794648 $
045     */
046    public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
047        private static final int DEFAULT_THREADPOOL_SIZE = 5;
048        private final CamelContext context;
049        private final ProducerCache producerCache;
050        private Endpoint defaultEndpoint;
051        private ExecutorService executor;
052    
053        public DefaultProducerTemplate(CamelContext context) {
054            this.context = context;
055            this.executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true);
056            this.producerCache = new ProducerCache(context.getProducerServicePool());
057        }
058    
059        public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
060            this.context = context;
061            this.executor = executor;
062            this.producerCache = new ProducerCache(context.getProducerServicePool());
063        }
064    
065        public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
066            this(context);
067            this.defaultEndpoint = defaultEndpoint;
068        }
069    
070        public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
071            Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
072            return new DefaultProducerTemplate(camelContext, endpoint);
073        }
074    
075        public Exchange send(String endpointUri, Exchange exchange) {
076            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
077            return send(endpoint, exchange);
078        }
079    
080        public Exchange send(String endpointUri, Processor processor) {
081            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
082            return send(endpoint, processor);
083        }
084    
085        public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
086            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
087            return send(endpoint, pattern, processor);
088        }
089    
090        public Exchange send(Endpoint endpoint, Exchange exchange) {
091            producerCache.send(endpoint, exchange);
092            return exchange;
093        }
094    
095        public Exchange send(Endpoint endpoint, Processor processor) {
096            return producerCache.send(endpoint, processor);
097        }
098    
099        public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
100            return producerCache.send(endpoint, pattern, processor);
101        }
102    
103        public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
104            Exchange result = send(endpoint, pattern, createSetBodyProcessor(body));
105            return extractResultBody(result, pattern);
106        }
107    
108        public void sendBody(Endpoint endpoint, Object body) {
109            Exchange result = send(endpoint, createSetBodyProcessor(body));
110            // must invoke extract result body in case of exception to be rethrown
111            extractResultBody(result);
112        }
113    
114        public void sendBody(String endpointUri, Object body) {
115            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
116            sendBody(endpoint, body);
117        }
118    
119        public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) {
120            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
121            Object result = sendBody(endpoint, pattern, body);
122            if (pattern.isOutCapable()) {
123                return result;
124            } else {
125                // return null if not OUT capable
126                return null;
127            }
128        }
129    
130        public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) {
131            sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
132        }
133    
134        public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) {
135            Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
136            // must invoke extract result body in case of exception to be rethrown
137            extractResultBody(result);
138        }
139    
140        public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body,
141                                        final String header, final Object headerValue) {
142            Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
143            Object result = extractResultBody(exchange, pattern);
144            if (pattern.isOutCapable()) {
145                return result;
146            } else {
147                // return null if not OUT capable
148                return null;
149            }
150        }
151    
152        public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body,
153                                        final String header, final Object headerValue) {
154            Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
155            Object result = extractResultBody(exchange, pattern);
156            if (pattern.isOutCapable()) {
157                return result;
158            } else {
159                // return null if not OUT capable
160                return null;
161            }
162        }
163    
164        public void sendBodyAndProperty(String endpointUri, final Object body,
165                                          final String property, final Object propertyValue) {
166            sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue);
167        }    
168        
169        public void sendBodyAndProperty(Endpoint endpoint, final Object body,
170                                          final String property, final Object propertyValue) {
171            Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue));
172            // must invoke extract result body in case of exception to be rethrown
173            extractResultBody(result);
174        }
175        
176        public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body,
177                                          final String property, final Object propertyValue) {
178            Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
179            Object result = extractResultBody(exchange, pattern);
180            if (pattern.isOutCapable()) {
181                return result;
182            } else {
183                // return null if not OUT capable
184                return null;
185            }
186        }
187    
188        public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body,
189                                          final String property, final Object propertyValue) {
190            Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
191            Object result = extractResultBody(exchange, pattern);
192            if (pattern.isOutCapable()) {
193                return result;
194            } else {
195                // return null if not OUT capable
196                return null;
197            }
198        }
199        
200        public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
201            sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
202        }
203    
204        public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
205            Exchange result = send(endpoint, new Processor() {
206                public void process(Exchange exchange) {
207                    Message in = exchange.getIn();
208                    for (Map.Entry<String, Object> header : headers.entrySet()) {
209                        in.setHeader(header.getKey(), header.getValue());
210                    }
211                    in.setBody(body);
212                }
213            });
214            // must invoke extract result body in case of exception to be rethrown
215            extractResultBody(result);
216        }
217    
218        public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) {
219            return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
220        }
221    
222        public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) {
223            Exchange exchange = send(endpoint, pattern, new Processor() {
224                public void process(Exchange exchange) throws Exception {
225                    Message in = exchange.getIn();
226                    for (Map.Entry<String, Object> header : headers.entrySet()) {
227                        in.setHeader(header.getKey(), header.getValue());
228                    }
229                    in.setBody(body);
230                }
231            });
232            Object result = extractResultBody(exchange, pattern);
233            if (pattern.isOutCapable()) {
234                return result;
235            } else {
236                // return null if not OUT capable
237                return null;
238            }
239        }
240    
241        // Methods using an InOut ExchangePattern
242        // -----------------------------------------------------------------------
243    
244        public Exchange request(Endpoint endpoint, Processor processor) {
245            return send(endpoint, ExchangePattern.InOut, processor);
246        }
247    
248        public Object requestBody(Object body) {
249            return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body);
250        }
251    
252        public Object requestBody(Endpoint endpoint, Object body) {
253            return sendBody(endpoint, ExchangePattern.InOut, body);
254        }
255    
256        public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) {
257            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
258        }
259    
260        public Exchange request(String endpoint, Processor processor) {
261            return send(endpoint, ExchangePattern.InOut, processor);
262        }
263    
264        public Object requestBody(String endpoint, Object body) {
265            return sendBody(endpoint, ExchangePattern.InOut, body);
266        }
267    
268        public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) {
269            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
270        }
271    
272        public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
273            return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
274        }
275    
276        public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
277            return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
278        }
279    
280        public <T> T requestBody(Object body, Class<T> type) {
281            Object answer = requestBody(body);
282            return context.getTypeConverter().convertTo(type, answer);
283        }
284    
285        public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
286            Object answer = requestBody(endpoint, body);
287            return context.getTypeConverter().convertTo(type, answer);
288        }
289    
290        public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
291            Object answer = requestBody(endpointUri, body);
292            return context.getTypeConverter().convertTo(type, answer);
293        }
294    
295        public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
296            Object answer = requestBodyAndHeader(endpoint, body, header, headerValue);
297            return context.getTypeConverter().convertTo(type, answer);
298        }
299    
300        public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
301            Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue);
302            return context.getTypeConverter().convertTo(type, answer);
303        }
304    
305        public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
306            Object answer = requestBodyAndHeaders(endpointUri, body, headers);
307            return context.getTypeConverter().convertTo(type, answer);
308        }
309    
310        public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
311            Object answer = requestBodyAndHeaders(endpoint, body, headers);
312            return context.getTypeConverter().convertTo(type, answer);
313        }
314    
315        // Methods using the default endpoint
316        // -----------------------------------------------------------------------
317    
318        public void sendBody(Object body) {
319            sendBody(getMandatoryDefaultEndpoint(), body);
320        }
321    
322        public Exchange send(Exchange exchange) {
323            return send(getMandatoryDefaultEndpoint(), exchange);
324        }
325    
326        public Exchange send(Processor processor) {
327            return send(getMandatoryDefaultEndpoint(), processor);
328        }
329    
330        public void sendBodyAndHeader(Object body, String header, Object headerValue) {
331            sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
332        }
333    
334        public void sendBodyAndProperty(Object body, String property, Object propertyValue) {
335            sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue);
336        }    
337        
338        public void sendBodyAndHeaders(Object body, Map<String, Object> headers) {
339            sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
340        }
341    
342        // Properties
343        // -----------------------------------------------------------------------
344        public CamelContext getContext() {
345            return context;
346        }
347    
348        public Endpoint getDefaultEndpoint() {
349            return defaultEndpoint;
350        }
351    
352        public void setDefaultEndpoint(Endpoint defaultEndpoint) {
353            this.defaultEndpoint = defaultEndpoint;
354        }
355    
356        /**
357         * Sets the default endpoint to use if none is specified
358         */
359        public void setDefaultEndpointUri(String endpointUri) {
360            setDefaultEndpoint(getContext().getEndpoint(endpointUri));
361        }
362    
363        public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
364            return context.getEndpoint(endpointUri, expectedClass);
365        }
366    
367        // Implementation methods
368        // -----------------------------------------------------------------------
369    
370        protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
371            return new Processor() {
372                public void process(Exchange exchange) {
373                    Message in = exchange.getIn();
374                    in.setHeader(header, headerValue);
375                    in.setBody(body);
376                }
377            };
378        }
379    
380        protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
381            return new Processor() {
382                public void process(Exchange exchange) {
383                    exchange.setProperty(property, propertyValue);
384                    
385                    Message in = exchange.getIn();
386                    in.setBody(body);
387                }
388            };
389        }    
390        
391        protected Processor createSetBodyProcessor(final Object body) {
392            return new Processor() {
393                public void process(Exchange exchange) {
394                    Message in = exchange.getIn();
395                    in.setBody(body);
396                }
397            };
398        }
399    
400        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
401            Endpoint endpoint = context.getEndpoint(endpointUri);
402            if (endpoint == null) {
403                throw new NoSuchEndpointException(endpointUri);
404            }
405            return endpoint;
406        }
407    
408        protected Endpoint getMandatoryDefaultEndpoint() {
409            Endpoint answer = getDefaultEndpoint();
410            ObjectHelper.notNull(answer, "defaultEndpoint");
411            return answer;
412        }
413    
414        protected void doStart() throws Exception {
415            producerCache.start();
416        }
417    
418        protected void doStop() throws Exception {
419            producerCache.stop();
420            if (executor != null) {
421                executor.shutdown();
422            }
423        }
424    
425        protected Object extractResultBody(Exchange result) {
426            return extractResultBody(result, null);
427        }
428    
429        protected Object extractResultBody(Exchange result, ExchangePattern pattern) {
430            return ExchangeHelper.extractResultBody(result, pattern);
431        }
432    
433        public void setExecutorService(ExecutorService executorService) {
434            this.executor = executorService;
435        }
436    
437        public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
438            Callable<Exchange> task = new Callable<Exchange>() {
439                public Exchange call() throws Exception {
440                    return send(uri, exchange);
441                }
442            };
443    
444            return executor.submit(task);
445        }
446    
447        public Future<Exchange> asyncSend(final String uri, final Processor processor) {
448            Callable<Exchange> task = new Callable<Exchange>() {
449                public Exchange call() throws Exception {
450                    return send(uri, processor);
451                }
452            };
453    
454            return executor.submit(task);
455        }
456    
457        public Future<Object> asyncSendBody(final String uri, final Object body) {
458            Callable<Object> task = new Callable<Object>() {
459                public Object call() throws Exception {
460                    sendBody(uri, body);
461                    // its InOnly, so no body to return
462                    return null;
463                }
464            };
465    
466            return executor.submit(task);
467        }
468    
469        public Future<Object> asyncRequestBody(final String uri, final Object body) {
470            Callable<Object> task = new Callable<Object>() {
471                public Object call() throws Exception {
472                    return requestBody(uri, body);
473                }
474            };
475    
476            return executor.submit(task);
477        }
478    
479        public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
480            Callable<T> task = new Callable<T>() {
481                public T call() throws Exception {
482                    return requestBody(uri, body, type);
483                }
484            };
485    
486            return executor.submit(task);
487        }
488    
489        public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
490            Callable<Object> task = new Callable<Object>() {
491                public Object call() throws Exception {
492                    return requestBodyAndHeader(endpointUri, body, header, headerValue);
493                }
494            };
495    
496            return executor.submit(task);
497        }
498    
499        public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
500            Callable<T> task = new Callable<T>() {
501                public T call() throws Exception {
502                    return requestBodyAndHeader(endpointUri, body, header, headerValue, type);
503                }
504            };
505    
506            return executor.submit(task);
507        }
508    
509        public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
510            Callable<Object> task = new Callable<Object>() {
511                public Object call() throws Exception {
512                    return requestBodyAndHeaders(endpointUri, body, headers);
513                }
514            };
515    
516            return executor.submit(task);
517        }
518    
519        public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
520            Callable<T> task = new Callable<T>() {
521                public T call() throws Exception {
522                    return requestBodyAndHeaders(endpointUri, body, headers, type);
523                }
524            };
525    
526            return executor.submit(task);
527        }
528    
529        public <T> T extractFutureBody(Future future, Class<T> type) {
530            return ExchangeHelper.extractFutureBody(context, future, type);
531        }
532    
533        public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
534            return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
535        }
536    
537    }