001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.camel.Endpoint;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.FailedToCreateProducerException;
025    import org.apache.camel.Processor;
026    import org.apache.camel.Producer;
027    import org.apache.camel.RuntimeCamelException;
028    import org.apache.camel.impl.ServiceSupport;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * @version $Revision: 563607 $
034     */
035    public class ProducerCache<E extends Exchange> extends ServiceSupport {
036        private static final Log LOG = LogFactory.getLog(ProducerCache.class);
037    
038        private Map<String, Producer<E>> producers = new HashMap<String, Producer<E>>();
039    
040        public synchronized Producer<E> getProducer(Endpoint<E> endpoint) {
041            String key = endpoint.getEndpointUri();
042            Producer<E> answer = producers.get(key);
043            if (answer == null) {
044                try {
045                    answer = endpoint.createProducer();
046                    answer.start();
047                } catch (Exception e) {
048                    throw new FailedToCreateProducerException(endpoint, e);
049                }
050                producers.put(key, answer);
051            }
052            return answer;
053        }
054    
055        /**
056         * Sends the exchange to the given endpoint
057         * 
058         * @param endpoint the endpoint to send the exchange to
059         * @param exchange the exchange to send
060         */
061        public void send(Endpoint<E> endpoint, E exchange) {
062            try {
063                Producer<E> producer = getProducer(endpoint);
064                producer.process(exchange);
065            } catch (Exception e) {
066                throw new RuntimeCamelException(e);
067            }
068        }
069    
070        /**
071         * Sends an exchange to an endpoint using a supplied
072         * 
073         * @{link Processor} to populate the exchange
074         * 
075         * @param endpoint the endpoint to send the exchange to
076         * @param processor the transformer used to populate the new exchange
077         */
078        public E send(Endpoint<E> endpoint, Processor processor) {
079            try {
080                Producer<E> producer = getProducer(endpoint);
081                E exchange = producer.createExchange();
082    
083                // lets populate using the processor callback
084                processor.process(exchange);
085    
086                // now lets dispatch
087                if (LOG.isDebugEnabled()) {
088                    LOG.debug(">>>> " + endpoint + " " + exchange);
089                }
090                producer.process(exchange);
091                return exchange;
092            } catch (Exception e) {
093                throw new RuntimeCamelException(e);
094            }
095        }
096    
097        protected void doStop() throws Exception {
098            ServiceHelper.stopServices(producers.values());
099        }
100    
101        protected void doStart() throws Exception {
102        }
103    }