001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.Callable;
022    import java.util.concurrent.ExecutorService;
023    import java.util.concurrent.Future;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.TimeoutException;
026    
027    import org.apache.camel.CamelContext;
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.ExchangePattern;
031    import org.apache.camel.Message;
032    import org.apache.camel.NoSuchEndpointException;
033    import org.apache.camel.Processor;
034    import org.apache.camel.Producer;
035    import org.apache.camel.ProducerTemplate;
036    import org.apache.camel.util.CamelContextHelper;
037    import org.apache.camel.util.ExchangeHelper;
038    import org.apache.camel.util.ObjectHelper;
039    import org.apache.camel.util.concurrent.ExecutorServiceHelper;
040    
041    /**
042     * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
043     * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an
044     * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}.
045     *
046     * @version $Revision: 779038 $
047     */
048    public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
049        private static final int DEFAULT_THREADPOOL_SIZE = 5;
050        private final CamelContext context;
051        private final ProducerCache producerCache;
052        private Endpoint defaultEndpoint;
053        private ExecutorService executor;
054    
055        public DefaultProducerTemplate(CamelContext context) {
056            this.context = context;
057            this.executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true);
058            this.producerCache = new ProducerCache(context.getProducerServicePool());
059        }
060    
061        public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
062            this.context = context;
063            this.executor = executor;
064            this.producerCache = new ProducerCache(context.getProducerServicePool());
065        }
066    
067        public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
068            this(context);
069            this.defaultEndpoint = defaultEndpoint;
070        }
071    
072        public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
073            Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
074            return new DefaultProducerTemplate(camelContext, endpoint);
075        }
076    
077        public Exchange send(String endpointUri, Exchange exchange) {
078            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
079            return send(endpoint, exchange);
080        }
081    
082        public Exchange send(String endpointUri, Processor processor) {
083            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
084            return send(endpoint, processor);
085        }
086    
087        public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
088            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
089            return send(endpoint, pattern, processor);
090        }
091    
092        public Exchange send(Endpoint endpoint, Exchange exchange) {
093            producerCache.send(endpoint, exchange);
094            return exchange;
095        }
096    
097        public Exchange send(Endpoint endpoint, Processor processor) {
098            return producerCache.send(endpoint, processor);
099        }
100    
101        public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
102            return producerCache.send(endpoint, pattern, processor);
103        }
104    
105        public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
106            Exchange result = send(endpoint, pattern, createSetBodyProcessor(body));
107            return extractResultBody(result, pattern);
108        }
109    
110        public void sendBody(Endpoint endpoint, Object body) {
111            Exchange result = send(endpoint, createSetBodyProcessor(body));
112            // must invoke extract result body in case of exception to be rethrown
113            extractResultBody(result);
114        }
115    
116        public void sendBody(String endpointUri, Object body) {
117            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
118            sendBody(endpoint, body);
119        }
120    
121        public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) {
122            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
123            Object result = sendBody(endpoint, pattern, body);
124            if (pattern.isOutCapable()) {
125                return result;
126            } else {
127                // return null if not OUT capable
128                return null;
129            }
130        }
131    
132        public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) {
133            sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
134        }
135    
136        public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) {
137            Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
138            // must invoke extract result body in case of exception to be rethrown
139            extractResultBody(result);
140        }
141    
142        public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body,
143                                        final String header, final Object headerValue) {
144            Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
145            Object result = extractResultBody(exchange, pattern);
146            if (pattern.isOutCapable()) {
147                return result;
148            } else {
149                // return null if not OUT capable
150                return null;
151            }
152        }
153    
154        public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body,
155                                        final String header, final Object headerValue) {
156            Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
157            Object result = extractResultBody(exchange, pattern);
158            if (pattern.isOutCapable()) {
159                return result;
160            } else {
161                // return null if not OUT capable
162                return null;
163            }
164        }
165    
166        public void sendBodyAndProperty(String endpointUri, final Object body,
167                                          final String property, final Object propertyValue) {
168            sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue);
169        }    
170        
171        public void sendBodyAndProperty(Endpoint endpoint, final Object body,
172                                          final String property, final Object propertyValue) {
173            Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue));
174            // must invoke extract result body in case of exception to be rethrown
175            extractResultBody(result);
176        }
177        
178        public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body,
179                                          final String property, final Object propertyValue) {
180            Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
181            Object result = extractResultBody(exchange, pattern);
182            if (pattern.isOutCapable()) {
183                return result;
184            } else {
185                // return null if not OUT capable
186                return null;
187            }
188        }
189    
190        public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body,
191                                          final String property, final Object propertyValue) {
192            Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
193            Object result = extractResultBody(exchange, pattern);
194            if (pattern.isOutCapable()) {
195                return result;
196            } else {
197                // return null if not OUT capable
198                return null;
199            }
200        }
201        
202        public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
203            sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
204        }
205    
206        public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
207            Exchange result = send(endpoint, new Processor() {
208                public void process(Exchange exchange) {
209                    Message in = exchange.getIn();
210                    for (Map.Entry<String, Object> header : headers.entrySet()) {
211                        in.setHeader(header.getKey(), header.getValue());
212                    }
213                    in.setBody(body);
214                }
215            });
216            // must invoke extract result body in case of exception to be rethrown
217            extractResultBody(result);
218        }
219    
220        public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) {
221            return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
222        }
223    
224        public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) {
225            Exchange exchange = send(endpoint, pattern, new Processor() {
226                public void process(Exchange exchange) throws Exception {
227                    Message in = exchange.getIn();
228                    for (Map.Entry<String, Object> header : headers.entrySet()) {
229                        in.setHeader(header.getKey(), header.getValue());
230                    }
231                    in.setBody(body);
232                }
233            });
234            Object result = extractResultBody(exchange, pattern);
235            if (pattern.isOutCapable()) {
236                return result;
237            } else {
238                // return null if not OUT capable
239                return null;
240            }
241        }
242    
243        // Methods using an InOut ExchangePattern
244        // -----------------------------------------------------------------------
245    
246        public Exchange request(Endpoint endpoint, Processor processor) {
247            return send(endpoint, ExchangePattern.InOut, processor);
248        }
249    
250        public Object requestBody(Object body) {
251            return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body);
252        }
253    
254        public Object requestBody(Endpoint endpoint, Object body) {
255            return sendBody(endpoint, ExchangePattern.InOut, body);
256        }
257    
258        public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) {
259            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
260        }
261    
262        public Exchange request(String endpoint, Processor processor) {
263            return send(endpoint, ExchangePattern.InOut, processor);
264        }
265    
266        public Object requestBody(String endpoint, Object body) {
267            return sendBody(endpoint, ExchangePattern.InOut, body);
268        }
269    
270        public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) {
271            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
272        }
273    
274        public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
275            return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
276        }
277    
278        public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
279            return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
280        }
281    
282        public <T> T requestBody(Object body, Class<T> type) {
283            Object answer = requestBody(body);
284            return context.getTypeConverter().convertTo(type, answer);
285        }
286    
287        public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
288            Object answer = requestBody(endpoint, body);
289            return context.getTypeConverter().convertTo(type, answer);
290        }
291    
292        public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
293            Object answer = requestBody(endpointUri, body);
294            return context.getTypeConverter().convertTo(type, answer);
295        }
296    
297        public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
298            Object answer = requestBodyAndHeader(endpoint, body, header, headerValue);
299            return context.getTypeConverter().convertTo(type, answer);
300        }
301    
302        public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
303            Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue);
304            return context.getTypeConverter().convertTo(type, answer);
305        }
306    
307        public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
308            Object answer = requestBodyAndHeaders(endpointUri, body, headers);
309            return context.getTypeConverter().convertTo(type, answer);
310        }
311    
312        public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
313            Object answer = requestBodyAndHeaders(endpoint, body, headers);
314            return context.getTypeConverter().convertTo(type, answer);
315        }
316    
317        // Methods using the default endpoint
318        // -----------------------------------------------------------------------
319    
320        public void sendBody(Object body) {
321            sendBody(getMandatoryDefaultEndpoint(), body);
322        }
323    
324        public Exchange send(Exchange exchange) {
325            return send(getMandatoryDefaultEndpoint(), exchange);
326        }
327    
328        public Exchange send(Processor processor) {
329            return send(getMandatoryDefaultEndpoint(), processor);
330        }
331    
332        public void sendBodyAndHeader(Object body, String header, Object headerValue) {
333            sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
334        }
335    
336        public void sendBodyAndProperty(Object body, String property, Object propertyValue) {
337            sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue);
338        }    
339        
340        public void sendBodyAndHeaders(Object body, Map<String, Object> headers) {
341            sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
342        }
343    
344        // Properties
345        // -----------------------------------------------------------------------
346        public CamelContext getContext() {
347            return context;
348        }
349    
350        public Endpoint getDefaultEndpoint() {
351            return defaultEndpoint;
352        }
353    
354        public void setDefaultEndpoint(Endpoint defaultEndpoint) {
355            this.defaultEndpoint = defaultEndpoint;
356        }
357    
358        /**
359         * Sets the default endpoint to use if none is specified
360         */
361        public void setDefaultEndpointUri(String endpointUri) {
362            setDefaultEndpoint(getContext().getEndpoint(endpointUri));
363        }
364    
365        public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
366            return context.getEndpoint(endpointUri, expectedClass);
367        }
368    
369        // Implementation methods
370        // -----------------------------------------------------------------------
371    
372        protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
373            return new Processor() {
374                public void process(Exchange exchange) {
375                    Message in = exchange.getIn();
376                    in.setHeader(header, headerValue);
377                    in.setBody(body);
378                }
379            };
380        }
381    
382        protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
383            return new Processor() {
384                public void process(Exchange exchange) {
385                    exchange.setProperty(property, propertyValue);
386                    
387                    Message in = exchange.getIn();
388                    in.setBody(body);
389                }
390            };
391        }    
392        
393        protected Processor createSetBodyProcessor(final Object body) {
394            return new Processor() {
395                public void process(Exchange exchange) {
396                    Message in = exchange.getIn();
397                    in.setBody(body);
398                }
399            };
400        }
401    
402        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
403            Endpoint endpoint = context.getEndpoint(endpointUri);
404            if (endpoint == null) {
405                throw new NoSuchEndpointException(endpointUri);
406            }
407            return endpoint;
408        }
409    
410        protected Endpoint getMandatoryDefaultEndpoint() {
411            Endpoint answer = getDefaultEndpoint();
412            ObjectHelper.notNull(answer, "defaultEndpoint");
413            return answer;
414        }
415    
416        protected void doStart() throws Exception {
417            producerCache.start();
418        }
419    
420        protected void doStop() throws Exception {
421            producerCache.stop();
422            if (executor != null) {
423                executor.shutdown();
424            }
425        }
426    
427        protected Object extractResultBody(Exchange result) {
428            return extractResultBody(result, null);
429        }
430    
431        protected Object extractResultBody(Exchange result, ExchangePattern pattern) {
432            return ExchangeHelper.extractResultBody(result, pattern);
433        }
434    
435        public void setExecutorService(ExecutorService executorService) {
436            this.executor = executorService;
437        }
438    
439        public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
440            Callable<Exchange> task = new Callable<Exchange>() {
441                public Exchange call() throws Exception {
442                    return send(uri, exchange);
443                }
444            };
445    
446            return executor.submit(task);
447        }
448    
449        public Future<Exchange> asyncSend(final String uri, final Processor processor) {
450            Callable<Exchange> task = new Callable<Exchange>() {
451                public Exchange call() throws Exception {
452                    return send(uri, processor);
453                }
454            };
455    
456            return executor.submit(task);
457        }
458    
459        public Future<Object> asyncSendBody(final String uri, final Object body) {
460            Callable<Object> task = new Callable<Object>() {
461                public Object call() throws Exception {
462                    sendBody(uri, body);
463                    // its InOnly, so no body to return
464                    return null;
465                }
466            };
467    
468            return executor.submit(task);
469        }
470    
471        public Future<Object> asyncRequestBody(final String uri, final Object body) {
472            Callable<Object> task = new Callable<Object>() {
473                public Object call() throws Exception {
474                    return requestBody(uri, body);
475                }
476            };
477    
478            return executor.submit(task);
479        }
480    
481        public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
482            Callable<T> task = new Callable<T>() {
483                public T call() throws Exception {
484                    return requestBody(uri, body, type);
485                }
486            };
487    
488            return executor.submit(task);
489        }
490    
491        public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
492            Callable<Object> task = new Callable<Object>() {
493                public Object call() throws Exception {
494                    return requestBodyAndHeader(endpointUri, body, header, headerValue);
495                }
496            };
497    
498            return executor.submit(task);
499        }
500    
501        public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
502            Callable<T> task = new Callable<T>() {
503                public T call() throws Exception {
504                    return requestBodyAndHeader(endpointUri, body, header, headerValue, type);
505                }
506            };
507    
508            return executor.submit(task);
509        }
510    
511        public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
512            Callable<Object> task = new Callable<Object>() {
513                public Object call() throws Exception {
514                    return requestBodyAndHeaders(endpointUri, body, headers);
515                }
516            };
517    
518            return executor.submit(task);
519        }
520    
521        public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
522            Callable<T> task = new Callable<T>() {
523                public T call() throws Exception {
524                    return requestBodyAndHeaders(endpointUri, body, headers, type);
525                }
526            };
527    
528            return executor.submit(task);
529        }
530    
531        public <T> T extractFutureBody(Future future, Class<T> type) {
532            return ExchangeHelper.extractFutureBody(context, future, type);
533        }
534    
535        public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
536            return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
537        }
538    
539    }