001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.util; 019 020 import org.apache.camel.Endpoint; 021 import org.apache.camel.Exchange; 022 import org.apache.camel.FailedToCreateProducerException; 023 import org.apache.camel.Processor; 024 import org.apache.camel.Producer; 025 import org.apache.camel.RuntimeCamelException; 026 import org.apache.camel.impl.ServiceSupport; 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 030 import java.util.HashMap; 031 import java.util.Map; 032 033 /** 034 * @version $Revision: 537816 $ 035 */ 036 public class ProducerCache<E extends Exchange> extends ServiceSupport { 037 private static final Log log = LogFactory.getLog(ProducerCache.class); 038 039 private Map<String, Producer<E>> producers = new HashMap<String, Producer<E>>(); 040 041 public synchronized Producer<E> getProducer(Endpoint<E> endpoint) { 042 String key = endpoint.getEndpointUri(); 043 Producer<E> answer = producers.get(key); 044 if (answer == null) { 045 try { 046 answer = endpoint.createProducer(); 047 answer.start(); 048 } 049 catch (Exception e) { 050 throw new FailedToCreateProducerException(endpoint, e); 051 } 052 producers.put(key, answer); 053 } 054 return answer; 055 } 056 057 /** 058 * Sends the exchange to the given endpoint 059 * 060 * @param endpoint the endpoint to send the exchange to 061 * @param exchange the exchange to send 062 */ 063 public void send(Endpoint<E> endpoint, E exchange) { 064 try { 065 Producer<E> producer = getProducer(endpoint); 066 producer.process(exchange); 067 } 068 catch (Exception e) { 069 throw new RuntimeCamelException(e); 070 } 071 } 072 073 /** 074 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange 075 * 076 * @param endpoint the endpoint to send the exchange to 077 * @param processor the transformer used to populate the new exchange 078 */ 079 public E send(Endpoint<E> endpoint, Processor processor) { 080 try { 081 Producer<E> producer = getProducer(endpoint); 082 E exchange = producer.createExchange(); 083 084 // lets populate using the processor callback 085 processor.process(exchange); 086 087 // now lets dispatch 088 if (log.isDebugEnabled()) { 089 log.debug(">>>> " + endpoint + " " + exchange); 090 } 091 producer.process(exchange); 092 return exchange; 093 } 094 catch (Exception e) { 095 throw new RuntimeCamelException(e); 096 } 097 } 098 099 protected void doStop() throws Exception { 100 ServiceHelper.stopServices(producers.values()); 101 } 102 103 protected void doStart() throws Exception { 104 } 105 }