001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 022 import org.apache.camel.Endpoint; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.ExchangePattern; 025 import org.apache.camel.FailedToCreateProducerException; 026 import org.apache.camel.Processor; 027 import org.apache.camel.Producer; 028 import org.apache.camel.ProducerCallback; 029 import org.apache.camel.ServicePoolAware; 030 import org.apache.camel.spi.ServicePool; 031 import org.apache.camel.util.ServiceHelper; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; 035 036 /** 037 * Cache containing created {@link Producer}. 038 * 039 * @version $Revision: 779042 $ 040 */ 041 public class ProducerCache extends ServiceSupport { 042 private static final transient Log LOG = LogFactory.getLog(ProducerCache.class); 043 044 private final Map<String, Producer> producers = new HashMap<String, Producer>(); 045 private final ServicePool<Endpoint, Producer> pool; 046 047 // TODO: Have easy configuration of pooling in Camel 048 049 public ProducerCache(ServicePool<Endpoint, Producer> producerServicePool) { 050 this.pool = producerServicePool; 051 } 052 053 public Producer getProducer(Endpoint endpoint) { 054 // As the producer is returned outside this method we do not want to return pooled producers 055 // so we pass in false to the method. if we returned pooled producers then the user had 056 // to remember to return it back in the pool. 057 // See method doInProducer that is safe template pattern where we handle the lifecycle and 058 // thus safely can use pooled producers there 059 return doGetProducer(endpoint, false); 060 } 061 062 /** 063 * Sends the exchange to the given endpoint 064 * 065 * @param endpoint the endpoint to send the exchange to 066 * @param exchange the exchange to send 067 */ 068 public void send(Endpoint endpoint, Exchange exchange) { 069 try { 070 sendExchange(endpoint, null, null, exchange); 071 } catch (Exception e) { 072 throw wrapRuntimeCamelException(e); 073 } 074 } 075 076 /** 077 * Sends an exchange to an endpoint using a supplied 078 * {@link Processor} to populate the exchange 079 * 080 * @param endpoint the endpoint to send the exchange to 081 * @param processor the transformer used to populate the new exchange 082 * @return the exchange 083 */ 084 public Exchange send(Endpoint endpoint, Processor processor) { 085 try { 086 return sendExchange(endpoint, null, processor, null); 087 } catch (Exception e) { 088 throw wrapRuntimeCamelException(e); 089 } 090 } 091 092 /** 093 * Sends an exchange to an endpoint using a supplied 094 * {@link Processor} to populate the exchange 095 * 096 * @param endpoint the endpoint to send the exchange to 097 * @param pattern the message {@link ExchangePattern} such as 098 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 099 * @param processor the transformer used to populate the new exchange 100 * @return the exchange 101 */ 102 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { 103 try { 104 return sendExchange(endpoint, pattern, processor, null); 105 } catch (Exception e) { 106 throw wrapRuntimeCamelException(e); 107 } 108 } 109 110 111 /** 112 * Sends an exchange to an endpoint using a supplied callback 113 * 114 * @param endpoint the endpoint to send the exchange to 115 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 116 * @param pattern the exchange pattern, can be <tt>null</tt> 117 * @param callback the callback 118 * @return the response from the callback 119 * @throws Exception if an internal processing error has occurred. 120 */ 121 public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) throws Exception { 122 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 123 Producer producer = doGetProducer(endpoint, true); 124 125 if (producer == null) { 126 if (isStopped()) { 127 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 128 return null; 129 } else { 130 throw new IllegalStateException("No producer, this processor has not been started: " + this); 131 } 132 } 133 134 try { 135 // invoke the callback 136 return callback.doInProducer(producer, exchange, pattern); 137 } finally { 138 if (producer instanceof ServicePoolAware) { 139 // release back to the pool 140 pool.release(endpoint, producer); 141 } else if (!producer.isSingleton()) { 142 // stop non singleton producers as we should not leak resources 143 producer.stop(); 144 } 145 } 146 } 147 148 protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, 149 final Processor processor, Exchange exchange) throws Exception { 150 return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() { 151 public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception { 152 if (exchange == null) { 153 exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange(); 154 } 155 156 if (processor != null) { 157 // lets populate using the processor callback 158 processor.process(exchange); 159 } 160 161 // now lets dispatch 162 if (LOG.isDebugEnabled()) { 163 LOG.debug(">>>> " + endpoint + " " + exchange); 164 } 165 producer.process(exchange); 166 return exchange; 167 } 168 }); 169 } 170 171 protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) { 172 String key = endpoint.getEndpointUri(); 173 Producer answer = producers.get(key); 174 if (pooled && answer == null) { 175 // try acquire from connection pool 176 answer = pool.acquire(endpoint); 177 } 178 179 if (answer == null) { 180 // create a new producer 181 try { 182 answer = endpoint.createProducer(); 183 answer.start(); 184 } catch (Exception e) { 185 throw new FailedToCreateProducerException(endpoint, e); 186 } 187 188 // add producer to cache or pool if applicable 189 if (pooled && answer instanceof ServicePoolAware) { 190 if (LOG.isDebugEnabled()) { 191 LOG.debug("Adding to producer service pool with key: " + endpoint + " for producer: " + answer); 192 } 193 answer = pool.addAndAcquire(endpoint, answer); 194 } else if (answer.isSingleton()) { 195 if (LOG.isDebugEnabled()) { 196 LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer); 197 } 198 producers.put(key, answer); 199 } 200 } 201 202 return answer; 203 } 204 205 protected void doStop() throws Exception { 206 ServiceHelper.stopServices(producers.values()); 207 producers.clear(); 208 ServiceHelper.stopServices(pool); 209 } 210 211 protected void doStart() throws Exception { 212 ServiceHelper.startServices(pool); 213 } 214 }