Coverage Report - org.apache.camel.CamelTemplate
 
Classes in this File Line Coverage Branch Coverage Complexity
CamelTemplate
46% 
83% 
0
 
 1  
 /**
 2  
  *
 3  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 4  
  * contributor license agreements.  See the NOTICE file distributed with
 5  
  * this work for additional information regarding copyright ownership.
 6  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 7  
  * (the "License"); you may not use this file except in compliance with
 8  
  * the License.  You may obtain a copy of the License at
 9  
  *
 10  
  * http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.camel;
 19  
 
 20  
 import org.apache.camel.impl.ServiceSupport;
 21  
 import org.apache.camel.util.ObjectHelper;
 22  
 import org.apache.camel.util.ProducerCache;
 23  
 
 24  
 import java.util.HashMap;
 25  
 import java.util.Map;
 26  
 
 27  
 /**
 28  
  * A client helper object (named like Spring's TransactionTemplate & JmsTemplate et al)
 29  
  * for working with Camel and sending {@link Message} instances in an {@link Exchange}
 30  
  * to an {@link Endpoint}.
 31  
  *
 32  
  * @version $Revision: 537937 $
 33  
  */
 34  
 public class CamelTemplate<E extends Exchange> extends ServiceSupport {
 35  
     private CamelContext context;
 36  25
     private ProducerCache<E> producerCache = new ProducerCache<E>();
 37  25
     private boolean useEndpointCache = true;
 38  25
     private Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
 39  
     private Endpoint<E> defaultEndpoint;
 40  
 
 41  
 
 42  25
     public CamelTemplate(CamelContext context) {
 43  25
         this.context = context;
 44  25
     }
 45  
 
 46  
     public CamelTemplate(CamelContext context, Endpoint defaultEndpoint) {
 47  0
         this(context);
 48  0
         this.defaultEndpoint = defaultEndpoint;
 49  0
     }
 50  
 
 51  
     /**
 52  
      * Sends the exchange to the given endpoint
 53  
      *
 54  
      * @param endpointUri the endpoint URI to send the exchange to
 55  
      * @param exchange    the exchange to send
 56  
      */
 57  
     public E send(String endpointUri, E exchange) {
 58  1
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
 59  1
         send(endpoint, exchange);
 60  1
         return exchange;
 61  
     }
 62  
 
 63  
     /**
 64  
      * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
 65  
      *
 66  
      * @param endpointUri the endpoint URI to send the exchange to
 67  
      * @param processor   the transformer used to populate the new exchange
 68  
      */
 69  
     public E send(String endpointUri, Processor processor) {
 70  20
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
 71  20
         return send(endpoint, processor);
 72  
     }
 73  
 
 74  
     /**
 75  
      * Sends the exchange to the given endpoint
 76  
      *
 77  
      * @param endpoint the endpoint to send the exchange to
 78  
      * @param exchange the exchange to send
 79  
      */
 80  
     public E send(Endpoint<E> endpoint, E exchange) {
 81  1
         E convertedExchange = endpoint.toExchangeType(exchange);
 82  1
         producerCache.send(endpoint, convertedExchange);
 83  1
         return exchange;
 84  
     }
 85  
 
 86  
     /**
 87  
      * Sends an exchange to an endpoint using a supplied @{link Processor} to populate the exchange
 88  
      *
 89  
      * @param endpoint  the endpoint to send the exchange to
 90  
      * @param processor the transformer used to populate the new exchange
 91  
      */
 92  
     public E send(Endpoint<E> endpoint, Processor processor) {
 93  34
         return producerCache.send(endpoint, processor);
 94  
     }
 95  
 
 96  
     /**
 97  
      * Send the body to an endpoint
 98  
      *
 99  
      * @param endpoint
 100  
      * @param body     = the payload
 101  
      * @return the result
 102  
      */
 103  
     public Object sendBody(Endpoint<E> endpoint, final Object body) {
 104  0
         E result = send(endpoint, new Processor() {
 105  0
             public void process(Exchange exchange) {
 106  0
                 Message in = exchange.getIn();
 107  0
                 in.setBody(body);
 108  0
             }
 109  
         });
 110  0
         return extractResultBody(result);
 111  
     }
 112  
 
 113  
     /**
 114  
      * Send the body to an endpoint
 115  
      *
 116  
      * @param endpointUri
 117  
      * @param body        = the payload
 118  
      * @return the result
 119  
      */
 120  
     public Object sendBody(String endpointUri, final Object body) {
 121  4
         E result = send(endpointUri, new Processor() {
 122  4
             public void process(Exchange exchange) {
 123  4
                 Message in = exchange.getIn();
 124  4
                 in.setBody(body);
 125  4
             }
 126  
         });
 127  4
         return extractResultBody(result);
 128  
     }
 129  
 
 130  
     /**
 131  
      * Sends the body to an endpoint with a specified header and header value
 132  
      *
 133  
      * @param endpointUri the endpoint URI to send to
 134  
      * @param body        the payload send
 135  
      * @param header      the header name
 136  
      * @param headerValue the header value
 137  
      * @return the result
 138  
      */
 139  
     public Object sendBody(String endpointUri, final Object body, final String header, final Object headerValue) {
 140  0
         E result = send(endpointUri, new Processor() {
 141  0
             public void process(Exchange exchange) {
 142  0
                 Message in = exchange.getIn();
 143  0
                 in.setHeader(header, headerValue);
 144  0
                 in.setBody(body);
 145  0
             }
 146  
         });
 147  0
         return extractResultBody(result);
 148  
     }
 149  
     
 150  
     /**
 151  
      * Sends the body to an endpoint with the specified headers and header values
 152  
      *
 153  
      * @param endpointUri the endpoint URI to send to
 154  
      * @param body        the payload send
 155  
      * @return the result
 156  
      */
 157  
     public Object sendBody(String endpointUri, final Object body, final Map<String, Object> headers) {
 158  0
         E result = send(endpointUri, new Processor() {
 159  0
             public void process(Exchange exchange) {
 160  0
                 Message in = exchange.getIn();
 161  0
                 for (Map.Entry<String, Object> header : headers.entrySet()) {
 162  0
                     in.setHeader(header.getKey(), header.getValue());
 163  0
                                 }
 164  0
                 in.setBody(body);
 165  0
             }
 166  
         });
 167  0
         return extractResultBody(result);
 168  
     }
 169  
 
 170  
     // Methods using the default endpoint
 171  
     //-----------------------------------------------------------------------
 172  
 
 173  
     /**
 174  
      * Sends the body to the default endpoint and returns the result content
 175  
      *
 176  
      * @param body the body to send
 177  
      * @return the returned message body
 178  
      */
 179  
     public Object sendBody(Object body) {
 180  0
         return sendBody(getMandatoryDefaultEndpoint(), body);
 181  
     }
 182  
 
 183  
     /**
 184  
      * Sends the exchange to the default endpoint
 185  
      *
 186  
      * @param exchange the exchange to send
 187  
      */
 188  
     public E send(E exchange) {
 189  0
         return send(getMandatoryDefaultEndpoint(), exchange);
 190  
     }
 191  
 
 192  
     /**
 193  
      * Sends an exchange to the default endpoint
 194  
      * using a supplied @{link Processor} to populate the exchange
 195  
      *
 196  
      * @param processor the transformer used to populate the new exchange
 197  
      */
 198  
     public E send(Processor processor) {
 199  0
         return send(getMandatoryDefaultEndpoint(), processor);
 200  
     }
 201  
 
 202  
 
 203  
     // Properties
 204  
     //-----------------------------------------------------------------------
 205  
     public Producer<E> getProducer(Endpoint<E> endpoint) {
 206  0
         return producerCache.getProducer(endpoint);
 207  
     }
 208  
 
 209  
     public CamelContext getContext() {
 210  0
         return context;
 211  
     }
 212  
 
 213  
     public Endpoint<E> getDefaultEndpoint() {
 214  0
         return defaultEndpoint;
 215  
     }
 216  
 
 217  
     public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
 218  0
         this.defaultEndpoint = defaultEndpoint;
 219  0
     }
 220  
 
 221  
     /**
 222  
      * Sets the default endpoint to use if none is specified
 223  
      */
 224  
     public void setDefaultEndpointUri(String endpointUri) {
 225  0
         setDefaultEndpoint(getContext().getEndpoint(endpointUri));
 226  0
     }
 227  
 
 228  
     public boolean isUseEndpointCache() {
 229  21
         return useEndpointCache;
 230  
     }
 231  
 
 232  
     public void setUseEndpointCache(boolean useEndpointCache) {
 233  0
         this.useEndpointCache = useEndpointCache;
 234  0
     }
 235  
 
 236  
     // Implementation methods
 237  
     //-----------------------------------------------------------------------
 238  
 
 239  
     protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
 240  21
         Endpoint endpoint = null;
 241  
 
 242  21
         if (isUseEndpointCache()) {
 243  21
             synchronized (endpointCache) {
 244  21
                 endpoint = endpointCache.get(endpointUri);
 245  21
                 if (endpoint == null) {
 246  18
                     endpoint = context.getEndpoint(endpointUri);
 247  18
                     if (endpoint != null) {
 248  18
                         endpointCache.put(endpointUri, endpoint);
 249  
                     }
 250  
                 }
 251  21
             }
 252  21
         }
 253  
         else {
 254  0
             endpoint = context.getEndpoint(endpointUri);
 255  
         }
 256  21
         if (endpoint == null) {
 257  0
             throw new NoSuchEndpointException(endpointUri);
 258  
         }
 259  21
         return endpoint;
 260  
     }
 261  
 
 262  
     protected Endpoint<E> getMandatoryDefaultEndpoint() {
 263  0
         Endpoint<E> answer = getDefaultEndpoint();
 264  0
         ObjectHelper.notNull(answer, "defaultEndpoint");
 265  0
         return answer;
 266  
     }
 267  
 
 268  
     protected void doStart() throws Exception {
 269  0
         producerCache.start();
 270  0
     }
 271  
 
 272  
     protected void doStop() throws Exception {
 273  25
         producerCache.stop();
 274  25
     }
 275  
 
 276  
     protected Object extractResultBody(E result) {
 277  4
         return result != null ? result.getOut().getBody() : null;
 278  
     }
 279  
 }