001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.util;
019    
020    import org.apache.camel.Endpoint;
021    import org.apache.camel.Exchange;
022    import org.apache.camel.FailedToCreateProducerException;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Producer;
025    import org.apache.camel.RuntimeCamelException;
026    import org.apache.camel.impl.ServiceSupport;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    import java.util.HashMap;
031    import java.util.Map;
032    
033    /**
034     * @version $Revision: 537816 $
035     */
036    public class ProducerCache<E extends Exchange> extends ServiceSupport {
037        private static final Log log = LogFactory.getLog(ProducerCache.class);
038    
039        private Map<String, Producer<E>> producers = new HashMap<String, Producer<E>>();
040    
041        public synchronized Producer<E> getProducer(Endpoint<E> endpoint) {
042            String key = endpoint.getEndpointUri();
043            Producer<E> answer = producers.get(key);
044            if (answer == null) {
045                try {
046                    answer = endpoint.createProducer();
047                    answer.start();
048                }
049                catch (Exception e) {
050                    throw new FailedToCreateProducerException(endpoint, e);
051                }
052                producers.put(key, answer);
053            }
054            return answer;
055        }
056    
057        /**
058         * Sends the exchange to the given endpoint
059         *
060         * @param endpoint the endpoint to send the exchange to
061         * @param exchange the exchange to send
062         */
063        public void send(Endpoint<E> endpoint, E exchange) {
064            try {
065                Producer<E> producer = getProducer(endpoint);
066                producer.process(exchange);
067            }
068            catch (Exception e) {
069                throw new RuntimeCamelException(e);
070            }
071        }
072    
073        /**
074         * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
075         *
076         * @param endpoint  the endpoint to send the exchange to
077         * @param processor the transformer used to populate the new exchange
078         */
079        public E send(Endpoint<E> endpoint, Processor processor) {
080            try {
081                Producer<E> producer = getProducer(endpoint);
082                E exchange = producer.createExchange();
083    
084                // lets populate using the processor callback
085                processor.process(exchange);
086    
087                // now lets dispatch
088                if (log.isDebugEnabled()) {
089                    log.debug(">>>> " + endpoint + " " + exchange);
090                }
091                producer.process(exchange);
092                return exchange;
093            }
094            catch (Exception e) {
095                throw new RuntimeCamelException(e);
096            }
097        }
098    
099        protected void doStop() throws Exception {
100            ServiceHelper.stopServices(producers.values());
101        }
102    
103        protected void doStart() throws Exception {
104        }
105    }