001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.camel.Endpoint;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.ExchangePattern;
025    import org.apache.camel.FailedToCreateProducerException;
026    import org.apache.camel.Processor;
027    import org.apache.camel.Producer;
028    import org.apache.camel.ProducerCallback;
029    import org.apache.camel.ServicePoolAware;
030    import org.apache.camel.spi.ServicePool;
031    import org.apache.camel.util.ServiceHelper;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
035    
036    /**
037     * Cache containing created {@link Producer}.
038     *
039     * @version $Revision: 779042 $
040     */
041    public class ProducerCache extends ServiceSupport {
042        private static final transient Log LOG = LogFactory.getLog(ProducerCache.class);
043    
044        private final Map<String, Producer> producers = new HashMap<String, Producer>();
045        private final ServicePool<Endpoint, Producer> pool;
046    
047        // TODO: Have easy configuration of pooling in Camel
048    
049        public ProducerCache(ServicePool<Endpoint, Producer> producerServicePool) {
050            this.pool = producerServicePool;
051        }
052    
053        public Producer getProducer(Endpoint endpoint) {
054            // As the producer is returned outside this method we do not want to return pooled producers
055            // so we pass in false to the method. if we returned pooled producers then the user had
056            // to remember to return it back in the pool.
057            // See method doInProducer that is safe template pattern where we handle the lifecycle and
058            // thus safely can use pooled producers there
059            return doGetProducer(endpoint, false);
060        }
061    
062        /**
063         * Sends the exchange to the given endpoint
064         *
065         * @param endpoint the endpoint to send the exchange to
066         * @param exchange the exchange to send
067         */
068        public void send(Endpoint endpoint, Exchange exchange) {
069            try {
070                sendExchange(endpoint, null, null, exchange);
071            } catch (Exception e) {
072                throw wrapRuntimeCamelException(e);
073            }
074        }
075    
076        /**
077         * Sends an exchange to an endpoint using a supplied
078         * {@link Processor} to populate the exchange
079         *
080         * @param endpoint the endpoint to send the exchange to
081         * @param processor the transformer used to populate the new exchange
082         * @return the exchange
083         */
084        public Exchange send(Endpoint endpoint, Processor processor) {
085            try {
086                return sendExchange(endpoint, null, processor, null);
087            } catch (Exception e) {
088                throw wrapRuntimeCamelException(e);
089            }
090        }
091    
092        /**
093         * Sends an exchange to an endpoint using a supplied
094         * {@link Processor} to populate the exchange
095         *
096         * @param endpoint the endpoint to send the exchange to
097         * @param pattern the message {@link ExchangePattern} such as
098         *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
099         * @param processor the transformer used to populate the new exchange
100         * @return the exchange
101         */
102        public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
103            try {
104                return sendExchange(endpoint, pattern, processor, null);
105            } catch (Exception e) {
106                throw wrapRuntimeCamelException(e);
107            }
108        }
109    
110    
111        /**
112         * Sends an exchange to an endpoint using a supplied callback
113         *
114         * @param endpoint  the endpoint to send the exchange to
115         * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
116         * @param pattern   the exchange pattern, can be <tt>null</tt>
117         * @param callback  the callback
118         * @return the response from the callback
119         * @throws Exception if an internal processing error has occurred.
120         */
121        public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) throws Exception {
122            // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
123            Producer producer = doGetProducer(endpoint, true);
124    
125            if (producer == null) {
126                if (isStopped()) {
127                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
128                    return null;
129                } else {
130                    throw new IllegalStateException("No producer, this processor has not been started: " + this);
131                }
132            }
133    
134            try {
135                // invoke the callback
136                return callback.doInProducer(producer, exchange, pattern);
137            } finally {
138                if (producer instanceof ServicePoolAware) {
139                    // release back to the pool
140                    pool.release(endpoint, producer);
141                } else if (!producer.isSingleton()) {
142                    // stop non singleton producers as we should not leak resources
143                    producer.stop();
144                }
145            }
146        }
147    
148        protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
149                                        final Processor processor, Exchange exchange) throws Exception {
150            return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
151                public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
152                    if (exchange == null) {
153                        exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange();
154                    }
155    
156                    if (processor != null) {
157                        // lets populate using the processor callback
158                        processor.process(exchange);
159                    }
160    
161                    // now lets dispatch
162                    if (LOG.isDebugEnabled()) {
163                        LOG.debug(">>>> " + endpoint + " " + exchange);
164                    }
165                    producer.process(exchange);
166                    return exchange;
167                }
168            });
169        }
170    
171        protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
172            String key = endpoint.getEndpointUri();
173            Producer answer = producers.get(key);
174            if (pooled && answer == null) {
175                // try acquire from connection pool
176                answer = pool.acquire(endpoint);
177            }
178    
179            if (answer == null) {
180                // create a new producer
181                try {
182                    answer = endpoint.createProducer();
183                    answer.start();
184                } catch (Exception e) {
185                    throw new FailedToCreateProducerException(endpoint, e);
186                }
187    
188                // add producer to cache or pool if applicable
189                if (pooled && answer instanceof ServicePoolAware) {
190                    if (LOG.isDebugEnabled()) {
191                        LOG.debug("Adding to producer service pool with key: " + endpoint + " for producer: " + answer);
192                    }
193                    answer = pool.addAndAcquire(endpoint, answer);
194                } else if (answer.isSingleton()) {
195                    if (LOG.isDebugEnabled()) {
196                        LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer);
197                    }
198                    producers.put(key, answer);
199                }
200            }
201    
202            return answer;
203        }
204    
205        protected void doStop() throws Exception {
206            ServiceHelper.stopServices(producers.values());
207            producers.clear();
208            ServiceHelper.stopServices(pool);
209        }
210    
211        protected void doStart() throws Exception {
212            ServiceHelper.startServices(pool);
213        }
214    }