001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel; 019 020 import org.apache.camel.impl.ServiceSupport; 021 import org.apache.camel.util.ProducerCache; 022 023 /** 024 * A Client object for working with Camel and invoking {@link Endpoint} instances with {@link Exchange} instances 025 * 026 * @version $Revision: 535136 $ 027 */ 028 public class CamelClient<E extends Exchange> extends ServiceSupport { 029 private CamelContext context; 030 private ProducerCache<E> producerCache = new ProducerCache<E>(); 031 032 public CamelClient(CamelContext context) { 033 this.context = context; 034 } 035 036 /** 037 * Sends the exchange to the given endpoint 038 * 039 * @param endpointUri the endpoint URI to send the exchange to 040 * @param exchange the exchange to send 041 */ 042 public E send(String endpointUri, E exchange) { 043 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 044 send(endpoint, exchange); 045 return exchange; 046 } 047 048 /** 049 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange 050 * 051 * @param endpointUri the endpoint URI to send the exchange to 052 * @param processor the transformer used to populate the new exchange 053 */ 054 public E send(String endpointUri, Processor processor) { 055 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 056 return send(endpoint, processor); 057 } 058 059 /** 060 * Sends the exchange to the given endpoint 061 * 062 * @param endpoint the endpoint to send the exchange to 063 * @param exchange the exchange to send 064 */ 065 public E send(Endpoint<E> endpoint, E exchange) { 066 E convertedExchange = endpoint.toExchangeType(exchange); 067 producerCache.send(endpoint, convertedExchange); 068 return exchange; 069 } 070 071 /** 072 * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange 073 * 074 * @param endpoint the endpoint to send the exchange to 075 * @param processor the transformer used to populate the new exchange 076 */ 077 public E send(Endpoint<E> endpoint, Processor processor) { 078 return producerCache.send(endpoint, processor); 079 } 080 081 /** 082 * Send the body to an endpoint 083 * 084 * @param endpointUri 085 * @param body = the payload 086 * @return the result 087 */ 088 public Object sendBody(String endpointUri, final Object body) { 089 E result = send(endpointUri, new Processor() { 090 public void process(Exchange exchange) { 091 Message in = exchange.getIn(); 092 in.setBody(body); 093 } 094 }); 095 return extractResultBody(result); 096 } 097 098 /** 099 * Sends the body to an endpoint with a specified header and header value 100 * 101 * @param endpointUri the endpoint URI to send to 102 * @param body the payload send 103 * @param header the header name 104 * @param headerValue the header value 105 * @return the result 106 */ 107 public Object sendBody(String endpointUri, final Object body, final String header, final Object headerValue) { 108 E result = send(endpointUri, new Processor() { 109 public void process(Exchange exchange) { 110 Message in = exchange.getIn(); 111 in.setHeader(header, headerValue); 112 in.setBody(body); 113 } 114 }); 115 return extractResultBody(result); 116 } 117 118 public Producer<E> getProducer(Endpoint<E> endpoint) { 119 return producerCache.getProducer(endpoint); 120 } 121 122 public CamelContext getContext() { 123 return context; 124 } 125 126 protected Endpoint resolveMandatoryEndpoint(String endpointUri) { 127 Endpoint endpoint = context.getEndpoint(endpointUri); 128 if (endpoint == null) { 129 throw new NoSuchEndpointException(endpointUri); 130 } 131 return endpoint; 132 } 133 134 protected void doStart() throws Exception { 135 producerCache.start(); 136 } 137 138 protected void doStop() throws Exception { 139 producerCache.stop(); 140 } 141 142 protected Object extractResultBody(E result) { 143 return result != null ? result.getOut().getBody() : null; 144 } 145 }