001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel;
019    
020    import org.apache.camel.impl.ServiceSupport;
021    import org.apache.camel.util.ProducerCache;
022    
023    /**
024     * A Client object for working with Camel and invoking {@link Endpoint} instances with {@link Exchange} instances
025     *
026     * @version $Revision: 535136 $
027     */
028    public class CamelClient<E extends Exchange> extends ServiceSupport {
029        private CamelContext context;
030        private ProducerCache<E> producerCache = new ProducerCache<E>();
031    
032        public CamelClient(CamelContext context) {
033            this.context = context;
034        }
035    
036        /**
037         * Sends the exchange to the given endpoint
038         *
039         * @param endpointUri the endpoint URI to send the exchange to
040         * @param exchange    the exchange to send
041         */
042        public E send(String endpointUri, E exchange) {
043            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
044            send(endpoint, exchange);
045            return exchange;
046        }
047    
048        /**
049         * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
050         *
051         * @param endpointUri the endpoint URI to send the exchange to
052         * @param processor   the transformer used to populate the new exchange
053         */
054        public E send(String endpointUri, Processor processor) {
055            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
056            return send(endpoint, processor);
057        }
058    
059        /**
060         * Sends the exchange to the given endpoint
061         *
062         * @param endpoint the endpoint to send the exchange to
063         * @param exchange the exchange to send
064         */
065        public E send(Endpoint<E> endpoint, E exchange) {
066            E convertedExchange = endpoint.toExchangeType(exchange);
067            producerCache.send(endpoint, convertedExchange);
068            return exchange;
069        }
070    
071        /**
072         * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
073         *
074         * @param endpoint  the endpoint to send the exchange to
075         * @param processor the transformer used to populate the new exchange
076         */
077        public E send(Endpoint<E> endpoint, Processor processor) {
078            return producerCache.send(endpoint, processor);
079        }
080    
081        /**
082         * Send the body to an endpoint
083         *
084         * @param endpointUri
085         * @param body        = the payload
086         * @return the result
087         */
088        public Object sendBody(String endpointUri, final Object body) {
089            E result = send(endpointUri, new Processor() {
090                public void process(Exchange exchange) {
091                    Message in = exchange.getIn();
092                    in.setBody(body);
093                }
094            });
095            return extractResultBody(result);
096        }
097    
098        /**
099         * Sends the body to an endpoint with a specified header and header value
100         *
101         * @param endpointUri the endpoint URI to send to
102         * @param body        the payload send
103         * @param header      the header name
104         * @param headerValue the header value
105         * @return the result
106         */
107        public Object sendBody(String endpointUri, final Object body, final String header, final Object headerValue) {
108            E result = send(endpointUri, new Processor() {
109                public void process(Exchange exchange) {
110                    Message in = exchange.getIn();
111                    in.setHeader(header, headerValue);
112                    in.setBody(body);
113                }
114            });
115            return extractResultBody(result);
116        }
117    
118        public Producer<E> getProducer(Endpoint<E> endpoint) {
119            return producerCache.getProducer(endpoint);
120        }
121    
122        public CamelContext getContext() {
123            return context;
124        }
125    
126        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
127            Endpoint endpoint = context.getEndpoint(endpointUri);
128            if (endpoint == null) {
129                throw new NoSuchEndpointException(endpointUri);
130            }
131            return endpoint;
132        }
133    
134        protected void doStart() throws Exception {
135            producerCache.start();
136        }
137    
138        protected void doStop() throws Exception {
139            producerCache.stop();
140        }
141    
142        protected Object extractResultBody(E result) {
143            return result != null ? result.getOut().getBody() : null;
144        }
145    }