/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel;

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ProducerCache;

/**
 * A client helper object (named like Spring's TransactionTemplate and JmsTemplate
 * et al) for working with Camel and sending {@link Message} instances in an
 * {@link Exchange} to an {@link Endpoint}.
 * <p>
 * Endpoints looked up by URI are cached per template while
 * {@link #isUseEndpointCache()} is true; access to that cache is synchronized.
 *
 * @version $Revision: 563607 $
 */
public class CamelTemplate<E extends Exchange> extends ServiceSupport implements ProducerTemplate<E> {
    private final CamelContext context;
    private final ProducerCache<E> producerCache = new ProducerCache<E>();
    private boolean useEndpointCache = true;
    // Guarded by synchronized (endpointCache) in resolveMandatoryEndpoint.
    private final Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
    private Endpoint<E> defaultEndpoint;

    /**
     * Creates a template without a default endpoint.
     *
     * @param context the context used to resolve endpoint URIs
     */
    public CamelTemplate(CamelContext context) {
        this.context = context;
    }

    /**
     * Creates a template with a default endpoint.
     *
     * @param context the context used to resolve endpoint URIs
     * @param defaultEndpoint the endpoint used by the send methods that take no endpoint
     */
    public CamelTemplate(CamelContext context, Endpoint defaultEndpoint) {
        this(context);
        this.defaultEndpoint = narrow(defaultEndpoint);
    }

    /**
     * Sends the exchange to the given endpoint
     *
     * @param endpointUri the endpoint URI to send the exchange to
     * @param exchange the exchange to send
     * @return the exchange that was sent, as returned by {@link #send(Endpoint, Exchange)}
     */
    public E send(String endpointUri, E exchange) {
        Endpoint<E> endpoint = narrow(resolveMandatoryEndpoint(endpointUri));
        // Propagate the delegate's result rather than echoing the argument back.
        return send(endpoint, exchange);
    }

    /**
     * Sends an exchange to an endpoint using a supplied {@link Processor} to
     * populate the exchange
     *
     * @param endpointUri the endpoint URI to send the exchange to
     * @param processor the transformer used to populate the new exchange
     * @return the exchange returned by the producer cache
     */
    public E send(String endpointUri, Processor processor) {
        Endpoint<E> endpoint = narrow(resolveMandatoryEndpoint(endpointUri));
        return send(endpoint, processor);
    }

    /**
     * Sends the exchange to the given endpoint, after passing it through
     * {@code endpoint.toExchangeType(...)}.
     *
     * @param endpoint the endpoint to send the exchange to
     * @param exchange the exchange to send
     * @return the converted exchange that was actually handed to the producer cache
     */
    public E send(Endpoint<E> endpoint, E exchange) {
        E convertedExchange = endpoint.toExchangeType(exchange);
        producerCache.send(endpoint, convertedExchange);
        // Return the instance that was sent: if toExchangeType produced a different
        // object, the original argument was never seen by the producer.
        return convertedExchange;
    }

    /**
     * Sends an exchange to an endpoint using a supplied {@link Processor} to
     * populate the exchange
     *
     * @param endpoint the endpoint to send the exchange to
     * @param processor the transformer used to populate the new exchange
     * @return the exchange returned by the producer cache
     */
    public E send(Endpoint<E> endpoint, Processor processor) {
        return producerCache.send(endpoint, processor);
    }

    /**
     * Send the body to an endpoint
     *
     * @param endpoint the endpoint to send to
     * @param body the payload
     * @return the result body, see {@link #extractResultBody(Exchange)}
     */
    public Object sendBody(Endpoint<E> endpoint, final Object body) {
        E result = send(endpoint, new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setBody(body);
            }
        });
        return extractResultBody(result);
    }

    /**
     * Send the body to an endpoint
     *
     * @param endpointUri the endpoint URI to send to
     * @param body the payload
     * @return the result body, see {@link #extractResultBody(Exchange)}
     */
    public Object sendBody(String endpointUri, final Object body) {
        E result = send(endpointUri, new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setBody(body);
            }
        });
        return extractResultBody(result);
    }

    /**
     * Sends the body to an endpoint with a specified header and header value
     *
     * @param endpointUri the endpoint URI to send to
     * @param body the payload send
     * @param header the header name
     * @param headerValue the header value
     * @return the result
     */
    public Object sendBodyAndHeader(String endpointUri, final Object body, final String header,
                                    final Object headerValue) {
        return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
    }

    /**
     * Sends the body to an endpoint with a specified header and header value
     *
     * @param endpoint the Endpoint to send to
     * @param body the payload send
     * @param header the header name
     * @param headerValue the header value
     * @return the result
     */
    public Object sendBodyAndHeader(Endpoint endpoint, final Object body, final String header,
                                    final Object headerValue) {
        // The parameter stays raw for signature compatibility; narrow it once so the
        // overloaded send(...) call below is fully typed.
        E result = send(narrow(endpoint), new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setHeader(header, headerValue);
                in.setBody(body);
            }
        });
        return extractResultBody(result);
    }

    /**
     * Sends the body to an endpoint with the specified headers and header
     * values
     *
     * @param endpointUri the endpoint URI to send to
     * @param body the payload send
     * @param headers the headers to set on the in message; {@code null} means no headers
     * @return the result
     */
    public Object sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
        return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
    }

    /**
     * Sends the body to an endpoint with the specified headers and header
     * values
     *
     * @param endpoint the endpoint to send to
     * @param body the payload send
     * @param headers the headers to set on the in message; {@code null} means no headers
     * @return the result
     */
    public Object sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
        E result = send(narrow(endpoint), new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                // A null map is treated as "no headers" instead of failing inside the processor.
                if (headers != null) {
                    for (Map.Entry<String, Object> header : headers.entrySet()) {
                        in.setHeader(header.getKey(), header.getValue());
                    }
                }
                in.setBody(body);
            }
        });
        return extractResultBody(result);
    }

    // Methods using the default endpoint
    // -----------------------------------------------------------------------

    /**
     * Sends the body to the default endpoint and returns the result content
     *
     * @param body the body to send
     * @return the returned message body
     */
    public Object sendBody(Object body) {
        return sendBody(getMandatoryDefaultEndpoint(), body);
    }

    /**
     * Sends the exchange to the default endpoint
     *
     * @param exchange the exchange to send
     * @return the exchange that was sent
     */
    public E send(E exchange) {
        return send(getMandatoryDefaultEndpoint(), exchange);
    }

    /**
     * Sends an exchange to the default endpoint using a supplied
     * {@link Processor} to populate the exchange
     *
     * @param processor the transformer used to populate the new exchange
     * @return the exchange returned by the producer cache
     */
    public E send(Processor processor) {
        return send(getMandatoryDefaultEndpoint(), processor);
    }

    /**
     * Sends the body to the default endpoint with a specified header and header value
     *
     * @param body the payload send
     * @param header the header name
     * @param headerValue the header value
     * @return the result
     */
    public Object sendBodyAndHeader(Object body, String header, Object headerValue) {
        return sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
    }

    /**
     * Sends the body to the default endpoint with the specified headers
     *
     * @param body the payload send
     * @param headers the headers to set on the in message; {@code null} means no headers
     * @return the result
     */
    public Object sendBodyAndHeaders(Object body, Map<String, Object> headers) {
        return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
    }

    // Properties
    // -----------------------------------------------------------------------

    /**
     * Looks up the producer for the given endpoint in this template's producer cache.
     */
    public Producer<E> getProducer(Endpoint<E> endpoint) {
        return producerCache.getProducer(endpoint);
    }

    public CamelContext getContext() {
        return context;
    }

    public Endpoint<E> getDefaultEndpoint() {
        return defaultEndpoint;
    }

    public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
        this.defaultEndpoint = defaultEndpoint;
    }

    /**
     * Sets the default endpoint to use if none is specified
     */
    public void setDefaultEndpointUri(String endpointUri) {
        setDefaultEndpoint(getContext().getEndpoint(endpointUri));
    }

    public boolean isUseEndpointCache() {
        return useEndpointCache;
    }

    public void setUseEndpointCache(boolean useEndpointCache) {
        this.useEndpointCache = useEndpointCache;
    }

    // Implementation methods
    // -----------------------------------------------------------------------

    /**
     * Resolves an endpoint URI through the context, consulting and filling the
     * per-template cache when {@link #isUseEndpointCache()} is true.
     *
     * @param endpointUri the URI to resolve
     * @return the resolved endpoint, never null
     * @throws NoSuchEndpointException if the context returns no endpoint for the URI
     */
    protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
        Endpoint endpoint = null;

        if (isUseEndpointCache()) {
            synchronized (endpointCache) {
                endpoint = endpointCache.get(endpointUri);
                if (endpoint == null) {
                    endpoint = context.getEndpoint(endpointUri);
                    // Only successful lookups are cached, so a miss is retried next time.
                    if (endpoint != null) {
                        endpointCache.put(endpointUri, narrow(endpoint));
                    }
                }
            }
        } else {
            endpoint = context.getEndpoint(endpointUri);
        }
        if (endpoint == null) {
            throw new NoSuchEndpointException(endpointUri);
        }
        return endpoint;
    }

    /**
     * Returns the default endpoint after checking it with
     * {@code ObjectHelper.notNull(answer, "defaultEndpoint")}.
     * NOTE(review): presumably that check throws when no default endpoint has been
     * configured - confirm against ObjectHelper.
     */
    protected Endpoint<E> getMandatoryDefaultEndpoint() {
        Endpoint<E> answer = getDefaultEndpoint();
        ObjectHelper.notNull(answer, "defaultEndpoint");
        return answer;
    }

    /** Starts the producer cache. */
    protected void doStart() throws Exception {
        producerCache.start();
    }

    /** Stops the producer cache. */
    protected void doStop() throws Exception {
        producerCache.stop();
    }

    /**
     * Picks the body to hand back to the caller of a sendBody-style method.
     *
     * @param result the exchange returned by a send, may be null
     * @return the out message body, or the in message body when the out body is
     *         null, or null when {@code result} is null
     */
    protected Object extractResultBody(E result) {
        Object answer = null;
        if (result != null) {
            answer = result.getOut().getBody();
            if (answer == null) {
                answer = result.getIn().getBody();
            }
        }
        return answer;
    }

    /**
     * Views a raw endpoint as an {@code Endpoint<E>}. This is the single place where
     * the unchecked conversion happens; the exchange type of a raw endpoint cannot
     * be verified here, exactly as with the implicit conversions it replaces.
     */
    @SuppressWarnings("unchecked")
    private Endpoint<E> narrow(Endpoint endpoint) {
        return endpoint;
    }
}