001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import java.util.concurrent.Callable; 022 import java.util.concurrent.ExecutorService; 023 import java.util.concurrent.Future; 024 import java.util.concurrent.TimeUnit; 025 import java.util.concurrent.TimeoutException; 026 027 import org.apache.camel.CamelContext; 028 import org.apache.camel.Endpoint; 029 import org.apache.camel.Exchange; 030 import org.apache.camel.ExchangePattern; 031 import org.apache.camel.Message; 032 import org.apache.camel.NoSuchEndpointException; 033 import org.apache.camel.Processor; 034 import org.apache.camel.Producer; 035 import org.apache.camel.ProducerTemplate; 036 import org.apache.camel.util.CamelContextHelper; 037 import org.apache.camel.util.ExchangeHelper; 038 import org.apache.camel.util.ObjectHelper; 039 import org.apache.camel.util.concurrent.ExecutorServiceHelper; 040 041 /** 042 * A client helper object (named like Spring's TransactionTemplate & JmsTemplate 043 * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an 044 * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}. 045 * 046 * @version $Revision: 779038 $ 047 */ 048 public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate { 049 private static final int DEFAULT_THREADPOOL_SIZE = 5; 050 private final CamelContext context; 051 private final ProducerCache producerCache; 052 private Endpoint defaultEndpoint; 053 private ExecutorService executor; 054 055 public DefaultProducerTemplate(CamelContext context) { 056 this.context = context; 057 this.executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true); 058 this.producerCache = new ProducerCache(context.getProducerServicePool()); 059 } 060 061 public DefaultProducerTemplate(CamelContext context, ExecutorService executor) { 062 this.context = context; 063 this.executor = executor; 064 this.producerCache = new ProducerCache(context.getProducerServicePool()); 065 } 066 067 public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) { 068 this(context); 069 this.defaultEndpoint = defaultEndpoint; 070 } 071 072 public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) { 073 Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri); 074 return new DefaultProducerTemplate(camelContext, endpoint); 075 } 076 077 public Exchange send(String endpointUri, Exchange exchange) { 078 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 079 return send(endpoint, exchange); 080 } 081 082 public Exchange send(String endpointUri, Processor processor) { 083 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 084 return send(endpoint, processor); 085 } 086 087 public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) { 088 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 089 return send(endpoint, pattern, processor); 090 } 091 092 public Exchange send(Endpoint endpoint, Exchange exchange) { 093 producerCache.send(endpoint, exchange); 094 return exchange; 095 } 096 097 public Exchange send(Endpoint endpoint, Processor processor) { 098 return producerCache.send(endpoint, processor); 099 } 100 101 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { 102 return producerCache.send(endpoint, pattern, processor); 103 } 104 105 public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) { 106 Exchange result = send(endpoint, pattern, createSetBodyProcessor(body)); 107 return extractResultBody(result, pattern); 108 } 109 110 public void sendBody(Endpoint endpoint, Object body) { 111 Exchange result = send(endpoint, createSetBodyProcessor(body)); 112 // must invoke extract result body in case of exception to be rethrown 113 extractResultBody(result); 114 } 115 116 public void sendBody(String endpointUri, Object body) { 117 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 118 sendBody(endpoint, body); 119 } 120 121 public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) { 122 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 123 Object result = sendBody(endpoint, pattern, body); 124 if (pattern.isOutCapable()) { 125 return result; 126 } else { 127 // return null if not OUT capable 128 return null; 129 } 130 } 131 132 public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) { 133 sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue); 134 } 135 136 public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) { 137 Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue)); 138 // must invoke extract result body in case of exception to be rethrown 139 extractResultBody(result); 140 } 141 142 public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body, 143 final String header, final Object headerValue) { 144 Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue)); 145 Object result = extractResultBody(exchange, pattern); 146 if (pattern.isOutCapable()) { 147 return result; 148 } else { 149 // return null if not OUT capable 150 return null; 151 } 152 } 153 154 public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body, 155 final String header, final Object headerValue) { 156 Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue)); 157 Object result = extractResultBody(exchange, pattern); 158 if (pattern.isOutCapable()) { 159 return result; 160 } else { 161 // return null if not OUT capable 162 return null; 163 } 164 } 165 166 public void sendBodyAndProperty(String endpointUri, final Object body, 167 final String property, final Object propertyValue) { 168 sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue); 169 } 170 171 public void sendBodyAndProperty(Endpoint endpoint, final Object body, 172 final String property, final Object propertyValue) { 173 Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue)); 174 // must invoke extract result body in case of exception to be rethrown 175 extractResultBody(result); 176 } 177 178 public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body, 179 final String property, final Object propertyValue) { 180 Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue)); 181 Object result = extractResultBody(exchange, pattern); 182 if (pattern.isOutCapable()) { 183 return result; 184 } else { 185 // return null if not OUT capable 186 return null; 187 } 188 } 189 190 public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body, 191 final String property, final Object propertyValue) { 192 Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue)); 193 Object result = extractResultBody(exchange, pattern); 194 if (pattern.isOutCapable()) { 195 return result; 196 } else { 197 // return null if not OUT capable 198 return null; 199 } 200 } 201 202 public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) { 203 sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); 204 } 205 206 public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) { 207 Exchange result = send(endpoint, new Processor() { 208 public void process(Exchange exchange) { 209 Message in = exchange.getIn(); 210 for (Map.Entry<String, Object> header : headers.entrySet()) { 211 in.setHeader(header.getKey(), header.getValue()); 212 } 213 in.setBody(body); 214 } 215 }); 216 // must invoke extract result body in case of exception to be rethrown 217 extractResultBody(result); 218 } 219 220 public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) { 221 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers); 222 } 223 224 public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) { 225 Exchange exchange = send(endpoint, pattern, new Processor() { 226 public void process(Exchange exchange) throws Exception { 227 Message in = exchange.getIn(); 228 for (Map.Entry<String, Object> header : headers.entrySet()) { 229 in.setHeader(header.getKey(), header.getValue()); 230 } 231 in.setBody(body); 232 } 233 }); 234 Object result = extractResultBody(exchange, pattern); 235 if (pattern.isOutCapable()) { 236 return result; 237 } else { 238 // return null if not OUT capable 239 return null; 240 } 241 } 242 243 // Methods using an InOut ExchangePattern 244 // ----------------------------------------------------------------------- 245 246 public Exchange request(Endpoint endpoint, Processor processor) { 247 return send(endpoint, ExchangePattern.InOut, processor); 248 } 249 250 public Object requestBody(Object body) { 251 return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body); 252 } 253 254 public Object requestBody(Endpoint endpoint, Object body) { 255 return sendBody(endpoint, ExchangePattern.InOut, body); 256 } 257 258 public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) { 259 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue); 260 } 261 262 public Exchange request(String endpoint, Processor processor) { 263 return send(endpoint, ExchangePattern.InOut, processor); 264 } 265 266 public Object requestBody(String endpoint, Object body) { 267 return sendBody(endpoint, ExchangePattern.InOut, body); 268 } 269 270 public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) { 271 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue); 272 } 273 274 public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) { 275 return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); 276 } 277 278 public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) { 279 return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers); 280 } 281 282 public <T> T requestBody(Object body, Class<T> type) { 283 Object answer = requestBody(body); 284 return context.getTypeConverter().convertTo(type, answer); 285 } 286 287 public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) { 288 Object answer = requestBody(endpoint, body); 289 return context.getTypeConverter().convertTo(type, answer); 290 } 291 292 public <T> T requestBody(String endpointUri, Object body, Class<T> type) { 293 Object answer = requestBody(endpointUri, body); 294 return context.getTypeConverter().convertTo(type, answer); 295 } 296 297 public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) { 298 Object answer = requestBodyAndHeader(endpoint, body, header, headerValue); 299 return context.getTypeConverter().convertTo(type, answer); 300 } 301 302 public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) { 303 Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue); 304 return context.getTypeConverter().convertTo(type, answer); 305 } 306 307 public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) { 308 Object answer = requestBodyAndHeaders(endpointUri, body, headers); 309 return context.getTypeConverter().convertTo(type, answer); 310 } 311 312 public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) { 313 Object answer = requestBodyAndHeaders(endpoint, body, headers); 314 return context.getTypeConverter().convertTo(type, answer); 315 } 316 317 // Methods using the default endpoint 318 // ----------------------------------------------------------------------- 319 320 public void sendBody(Object body) { 321 sendBody(getMandatoryDefaultEndpoint(), body); 322 } 323 324 public Exchange send(Exchange exchange) { 325 return send(getMandatoryDefaultEndpoint(), exchange); 326 } 327 328 public Exchange send(Processor processor) { 329 return send(getMandatoryDefaultEndpoint(), processor); 330 } 331 332 public void sendBodyAndHeader(Object body, String header, Object headerValue) { 333 sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue); 334 } 335 336 public void sendBodyAndProperty(Object body, String property, Object propertyValue) { 337 sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue); 338 } 339 340 public void sendBodyAndHeaders(Object body, Map<String, Object> headers) { 341 sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers); 342 } 343 344 // Properties 345 // ----------------------------------------------------------------------- 346 public CamelContext getContext() { 347 return context; 348 } 349 350 public Endpoint getDefaultEndpoint() { 351 return defaultEndpoint; 352 } 353 354 public void setDefaultEndpoint(Endpoint defaultEndpoint) { 355 this.defaultEndpoint = defaultEndpoint; 356 } 357 358 /** 359 * Sets the default endpoint to use if none is specified 360 */ 361 public void setDefaultEndpointUri(String endpointUri) { 362 setDefaultEndpoint(getContext().getEndpoint(endpointUri)); 363 } 364 365 public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) { 366 return context.getEndpoint(endpointUri, expectedClass); 367 } 368 369 // Implementation methods 370 // ----------------------------------------------------------------------- 371 372 protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) { 373 return new Processor() { 374 public void process(Exchange exchange) { 375 Message in = exchange.getIn(); 376 in.setHeader(header, headerValue); 377 in.setBody(body); 378 } 379 }; 380 } 381 382 protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) { 383 return new Processor() { 384 public void process(Exchange exchange) { 385 exchange.setProperty(property, propertyValue); 386 387 Message in = exchange.getIn(); 388 in.setBody(body); 389 } 390 }; 391 } 392 393 protected Processor createSetBodyProcessor(final Object body) { 394 return new Processor() { 395 public void process(Exchange exchange) { 396 Message in = exchange.getIn(); 397 in.setBody(body); 398 } 399 }; 400 } 401 402 protected Endpoint resolveMandatoryEndpoint(String endpointUri) { 403 Endpoint endpoint = context.getEndpoint(endpointUri); 404 if (endpoint == null) { 405 throw new NoSuchEndpointException(endpointUri); 406 } 407 return endpoint; 408 } 409 410 protected Endpoint getMandatoryDefaultEndpoint() { 411 Endpoint answer = getDefaultEndpoint(); 412 ObjectHelper.notNull(answer, "defaultEndpoint"); 413 return answer; 414 } 415 416 protected void doStart() throws Exception { 417 producerCache.start(); 418 } 419 420 protected void doStop() throws Exception { 421 producerCache.stop(); 422 if (executor != null) { 423 executor.shutdown(); 424 } 425 } 426 427 protected Object extractResultBody(Exchange result) { 428 return extractResultBody(result, null); 429 } 430 431 protected Object extractResultBody(Exchange result, ExchangePattern pattern) { 432 return ExchangeHelper.extractResultBody(result, pattern); 433 } 434 435 public void setExecutorService(ExecutorService executorService) { 436 this.executor = executorService; 437 } 438 439 public Future<Exchange> asyncSend(final String uri, final Exchange exchange) { 440 Callable<Exchange> task = new Callable<Exchange>() { 441 public Exchange call() throws Exception { 442 return send(uri, exchange); 443 } 444 }; 445 446 return executor.submit(task); 447 } 448 449 public Future<Exchange> asyncSend(final String uri, final Processor processor) { 450 Callable<Exchange> task = new Callable<Exchange>() { 451 public Exchange call() throws Exception { 452 return send(uri, processor); 453 } 454 }; 455 456 return executor.submit(task); 457 } 458 459 public Future<Object> asyncSendBody(final String uri, final Object body) { 460 Callable<Object> task = new Callable<Object>() { 461 public Object call() throws Exception { 462 sendBody(uri, body); 463 // its InOnly, so no body to return 464 return null; 465 } 466 }; 467 468 return executor.submit(task); 469 } 470 471 public Future<Object> asyncRequestBody(final String uri, final Object body) { 472 Callable<Object> task = new Callable<Object>() { 473 public Object call() throws Exception { 474 return requestBody(uri, body); 475 } 476 }; 477 478 return executor.submit(task); 479 } 480 481 public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) { 482 Callable<T> task = new Callable<T>() { 483 public T call() throws Exception { 484 return requestBody(uri, body, type); 485 } 486 }; 487 488 return executor.submit(task); 489 } 490 491 public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) { 492 Callable<Object> task = new Callable<Object>() { 493 public Object call() throws Exception { 494 return requestBodyAndHeader(endpointUri, body, header, headerValue); 495 } 496 }; 497 498 return executor.submit(task); 499 } 500 501 public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) { 502 Callable<T> task = new Callable<T>() { 503 public T call() throws Exception { 504 return requestBodyAndHeader(endpointUri, body, header, headerValue, type); 505 } 506 }; 507 508 return executor.submit(task); 509 } 510 511 public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) { 512 Callable<Object> task = new Callable<Object>() { 513 public Object call() throws Exception { 514 return requestBodyAndHeaders(endpointUri, body, headers); 515 } 516 }; 517 518 return executor.submit(task); 519 } 520 521 public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) { 522 Callable<T> task = new Callable<T>() { 523 public T call() throws Exception { 524 return requestBodyAndHeaders(endpointUri, body, headers, type); 525 } 526 }; 527 528 return executor.submit(task); 529 } 530 531 public <T> T extractFutureBody(Future future, Class<T> type) { 532 return ExchangeHelper.extractFutureBody(context, future, type); 533 } 534 535 public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { 536 return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type); 537 } 538 539 }