001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.util; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import java.util.concurrent.ExecutionException; 022 import java.util.concurrent.Future; 023 import java.util.concurrent.TimeUnit; 024 import java.util.concurrent.TimeoutException; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.CamelExecutionException; 028 import org.apache.camel.Endpoint; 029 import org.apache.camel.Exchange; 030 import org.apache.camel.ExchangePattern; 031 import org.apache.camel.InvalidPayloadException; 032 import org.apache.camel.Message; 033 import org.apache.camel.NoSuchBeanException; 034 import org.apache.camel.NoSuchEndpointException; 035 import org.apache.camel.NoSuchHeaderException; 036 import org.apache.camel.NoSuchPropertyException; 037 import org.apache.camel.NoTypeConversionAvailableException; 038 import org.apache.camel.TypeConverter; 039 040 /** 041 * Some helper methods for working with {@link Exchange} objects 042 * 043 * @version $Revision: 774683 $ 044 */ 045 public final class ExchangeHelper { 046 047 /** 048 * Utility classes should not have a public constructor. 049 */ 050 private ExchangeHelper() { 051 } 052 053 /** 054 * Extracts the exchange property of the given name and type; if it is not present then the 055 * default value will be used 056 * 057 * @param exchange the message exchange 058 * @param propertyName the name of the property on the exchange 059 * @param type the expected type of the property 060 * @param defaultValue the default value to be used if the property name does not exist or could not be 061 * converted to the given type 062 * @return the property value as the given type or the defaultValue if it could not be found or converted 063 */ 064 public static <T> T getExchangeProperty(Exchange exchange, String propertyName, Class<T> type, T defaultValue) { 065 T answer = exchange.getProperty(propertyName, type); 066 if (answer == null) { 067 return defaultValue; 068 } 069 return answer; 070 } 071 072 073 /** 074 * Attempts to resolve the endpoint for the given value 075 * 076 * @param exchange the message exchange being processed 077 * @param value the value which can be an {@link Endpoint} or an object 078 * which provides a String representation of an endpoint via 079 * {@link #toString()} 080 * 081 * @return the endpoint 082 * @throws NoSuchEndpointException if the endpoint cannot be resolved 083 */ 084 public static Endpoint resolveEndpoint(Exchange exchange, Object value) 085 throws NoSuchEndpointException { 086 Endpoint endpoint; 087 if (value instanceof Endpoint) { 088 endpoint = (Endpoint)value; 089 } else { 090 String uri = value.toString(); 091 endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri); 092 } 093 return endpoint; 094 } 095 096 public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) 097 throws NoSuchPropertyException { 098 T result = exchange.getProperty(propertyName, type); 099 if (result != null) { 100 return result; 101 } 102 throw new NoSuchPropertyException(exchange, propertyName, type); 103 } 104 105 public static <T> T getMandatoryHeader(Exchange exchange, String propertyName, Class<T> type) 106 throws NoSuchHeaderException { 107 T answer = exchange.getIn().getHeader(propertyName, type); 108 if (answer == null) { 109 throw new NoSuchHeaderException(exchange, propertyName, type); 110 } 111 return answer; 112 } 113 114 /** 115 * Returns the mandatory inbound message body of the correct type or throws 116 * an exception if it is not present 117 */ 118 public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException { 119 return exchange.getIn().getMandatoryBody(); 120 } 121 122 /** 123 * Returns the mandatory inbound message body of the correct type or throws 124 * an exception if it is not present 125 */ 126 public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 127 return exchange.getIn().getMandatoryBody(type); 128 } 129 130 /** 131 * Returns the mandatory outbound message body of the correct type or throws 132 * an exception if it is not present 133 */ 134 public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException { 135 return exchange.getOut().getMandatoryBody(); 136 } 137 138 /** 139 * Returns the mandatory outbound message body of the correct type or throws 140 * an exception if it is not present 141 */ 142 public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 143 return exchange.getOut().getMandatoryBody(type); 144 } 145 146 /** 147 * Converts the value to the given expected type or throws an exception 148 */ 149 public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value) throws NoTypeConversionAvailableException { 150 CamelContext camelContext = exchange.getContext(); 151 TypeConverter converter = camelContext.getTypeConverter(); 152 if (converter != null) { 153 return converter.mandatoryConvertTo(type, exchange, value); 154 } 155 throw new NoTypeConversionAvailableException(value, type); 156 } 157 158 /** 159 * Converts the value to the given expected type returning null if it could 160 * not be converted 161 */ 162 public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) { 163 CamelContext camelContext = exchange.getContext(); 164 TypeConverter converter = camelContext.getTypeConverter(); 165 if (converter != null) { 166 return converter.convertTo(type, exchange, value); 167 } 168 return null; 169 } 170 171 /** 172 * Copies the results of a message exchange from the source exchange to the result exchange 173 * which will copy the out and fault message contents and the exception 174 * 175 * @param result the result exchange which will have the output and error state added 176 * @param source the source exchange which is not modified 177 */ 178 public static void copyResults(Exchange result, Exchange source) { 179 180 // -------------------------------------------------------------------- 181 // TODO: merge logic with that of copyResultsPreservePattern() 182 // -------------------------------------------------------------------- 183 184 if (result != source) { 185 result.setException(source.getException()); 186 if (source.hasFault()) { 187 result.getFault().copyFrom(source.getFault()); 188 } 189 190 if (source.hasOut()) { 191 result.getOut().copyFrom(source.getOut()); 192 } else if (result.getPattern() == ExchangePattern.InOptionalOut) { 193 // special case where the result is InOptionalOut and with no OUT response 194 // so we should return null to indicate this fact 195 result.setOut(null); 196 } else { 197 // no results so lets copy the last input 198 // as the final processor on a pipeline might not 199 // have created any OUT; such as a mock:endpoint 200 // so lets assume the last IN is the OUT 201 if (result.getPattern().isOutCapable()) { 202 // only set OUT if its OUT capable 203 result.getOut().copyFrom(source.getIn()); 204 } else { 205 // if not replace IN instead to keep the MEP 206 result.getIn().copyFrom(source.getIn()); 207 } 208 } 209 result.getProperties().clear(); 210 result.getProperties().putAll(source.getProperties()); 211 } 212 } 213 214 /** 215 * Copies the <code>source</code> exchange to <code>target</code> exchange 216 * preserving the {@link ExchangePattern} of <code>target</code>. 217 * 218 * @param source source exchange. 219 * @param result target exchange. 220 */ 221 public static void copyResultsPreservePattern(Exchange result, Exchange source) { 222 223 // -------------------------------------------------------------------- 224 // TODO: merge logic with that of copyResults() 225 // -------------------------------------------------------------------- 226 227 if (source == result) { 228 // no need to copy 229 return; 230 } 231 232 // copy in message 233 result.getIn().copyFrom(source.getIn()); 234 235 // copy out message 236 if (source.hasOut()) { 237 // exchange pattern sensitive 238 getResultMessage(result).copyFrom(source.getOut()); 239 } 240 241 // copy fault message 242 if (source.hasFault()) { 243 result.getFault().copyFrom(source.getFault()); 244 } 245 246 // copy exception 247 result.setException(source.getException()); 248 249 // copy properties 250 result.getProperties().clear(); 251 result.getProperties().putAll(source.getProperties()); 252 } 253 254 /** 255 * Returns the message where to write results in an 256 * exchange-pattern-sensitive way. 257 * 258 * @param exchange 259 * message exchange. 260 * @return result message. 261 */ 262 public static Message getResultMessage(Exchange exchange) { 263 if (exchange.getPattern().isOutCapable()) { 264 return exchange.getOut(); 265 } else { 266 return exchange.getIn(); 267 } 268 } 269 270 /** 271 * Returns true if the given exchange pattern (if defined) can support IN messagea 272 * 273 * @param exchange the exchange to interrogate 274 * @return true if the exchange is defined as an {@link ExchangePattern} which supports 275 * IN messages 276 */ 277 public static boolean isInCapable(Exchange exchange) { 278 ExchangePattern pattern = exchange.getPattern(); 279 return pattern != null && pattern.isInCapable(); 280 } 281 282 /** 283 * Returns true if the given exchange pattern (if defined) can support OUT messagea 284 * 285 * @param exchange the exchange to interrogate 286 * @return true if the exchange is defined as an {@link ExchangePattern} which supports 287 * OUT messages 288 */ 289 public static boolean isOutCapable(Exchange exchange) { 290 ExchangePattern pattern = exchange.getPattern(); 291 return pattern != null && pattern.isOutCapable(); 292 } 293 294 /** 295 * Creates a new instance of the given type from the injector 296 */ 297 public static <T> T newInstance(Exchange exchange, Class<T> type) { 298 return exchange.getContext().getInjector().newInstance(type); 299 } 300 301 /** 302 * Creates a Map of the variables which are made available to a script or template 303 * 304 * @param exchange the exchange to make available 305 * @return a Map populated with the require dvariables 306 */ 307 public static Map createVariableMap(Exchange exchange) { 308 Map answer = new HashMap(); 309 populateVariableMap(exchange, answer); 310 return answer; 311 } 312 313 /** 314 * Populates the Map with the variables which are made available to a script or template 315 * 316 * @param exchange the exchange to make available 317 * @param map the map to populate 318 */ 319 @SuppressWarnings("unchecked") 320 public static void populateVariableMap(Exchange exchange, Map map) { 321 map.put("exchange", exchange); 322 Message in = exchange.getIn(); 323 map.put("in", in); 324 map.put("request", in); 325 map.put("headers", in.getHeaders()); 326 map.put("body", in.getBody()); 327 if (isOutCapable(exchange)) { 328 Message out = exchange.getOut(); 329 map.put("out", out); 330 map.put("response", out); 331 } 332 map.put("camelContext", exchange.getContext()); 333 } 334 335 /** 336 * Returns the MIME content type on the input message or null if one is not defined 337 */ 338 public static String getContentType(Exchange exchange) { 339 return MessageHelper.getContentType(exchange.getIn()); 340 } 341 342 /** 343 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 344 */ 345 public static Object lookupMandatoryBean(Exchange exchange, String name) { 346 Object value = lookupBean(exchange, name); 347 if (value == null) { 348 throw new NoSuchBeanException(name); 349 } 350 return value; 351 } 352 353 /** 354 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 355 */ 356 public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) { 357 T value = lookupBean(exchange, name, type); 358 if (value == null) { 359 throw new NoSuchBeanException(name); 360 } 361 return value; 362 } 363 364 /** 365 * Performs a lookup in the registry of the bean name 366 */ 367 public static Object lookupBean(Exchange exchange, String name) { 368 return exchange.getContext().getRegistry().lookup(name); 369 } 370 371 /** 372 * Performs a lookup in the registry of the bean name and type 373 */ 374 public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) { 375 return exchange.getContext().getRegistry().lookup(name, type); 376 } 377 378 /** 379 * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given 380 * or null if none could be found 381 */ 382 public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) { 383 for (Exchange exchange : exchanges) { 384 String id = exchange.getExchangeId(); 385 if (id != null && id.equals(exchangeId)) { 386 return exchange; 387 } 388 } 389 return null; 390 } 391 392 public static boolean isFailureHandled(Exchange exchange) { 393 Boolean handled = exchange.getProperty(Exchange.FAILURE_HANDLED, Boolean.class); 394 return handled != null && handled; 395 } 396 397 public static void setFailureHandled(Exchange exchange) { 398 exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE); 399 // clear exception since its failure handled 400 exchange.setException(null); 401 } 402 403 /** 404 * Extracts the body from the given exchange. 405 * <p/> 406 * If the exchange pattern is provided it will try to honor it and retrive the body 407 * from either IN or OUT according to the pattern. 408 * 409 * @param exchange the exchange 410 * @param pattern exchange pattern if given, can be <tt>null</tt> 411 * @return the result body, can be <tt>null</tt>. 412 * @throws CamelExecutionException if the processing of the exchange failed 413 */ 414 public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) { 415 Object answer = null; 416 if (exchange != null) { 417 // rethrow if there was an exception during execution 418 if (exchange.getException() != null) { 419 throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException()); 420 } 421 422 // result could have a fault message 423 if (hasFaultMessage(exchange)) { 424 return exchange.getFault().getBody(); 425 } 426 427 // okay no fault then return the response according to the pattern 428 // try to honor pattern if provided 429 boolean notOut = pattern != null && !pattern.isOutCapable(); 430 boolean hasOut = exchange.hasOut(); 431 if (hasOut && !notOut) { 432 // we have a response in out and the pattern is out capable 433 answer = exchange.getOut().getBody(); 434 } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) { 435 // special case where the result is InOptionalOut and with no OUT response 436 // so we should return null to indicate this fact 437 answer = null; 438 } else { 439 // use IN as the response 440 answer = exchange.getIn().getBody(); 441 } 442 } 443 return answer; 444 } 445 446 /** 447 * Tests whether the exchange has a fault message set and that its not null. 448 * 449 * @param exchange the exchange 450 * @return <tt>true</tt> if fault message exists 451 */ 452 public static boolean hasFaultMessage(Exchange exchange) { 453 if (exchange.hasFault()) { 454 Object faultBody = exchange.getFault().getBody(); 455 if (faultBody != null) { 456 return true; 457 } 458 } 459 return false; 460 } 461 462 /** 463 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 464 * <p/> 465 * Will wait until the future task is complete. 466 * 467 * @param context the camel context 468 * @param future the future handle 469 * @param type the expected body response type 470 * @return the result body, can be <tt>null</tt>. 471 * @throws CamelExecutionException if the processing of the exchange failed 472 */ 473 public static <T> T extractFutureBody(CamelContext context, Future future, Class<T> type) { 474 try { 475 return doExtractFutureBody(context, future.get(), type); 476 } catch (InterruptedException e) { 477 throw ObjectHelper.wrapRuntimeCamelException(e); 478 } catch (ExecutionException e) { 479 // execution failed due to an exception so rethrow the cause 480 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 481 } finally { 482 // its harmless to cancel if task is already completed 483 // and in any case we do not want to get hold of the task a 2nd time 484 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 485 future.cancel(true); 486 } 487 } 488 489 /** 490 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 491 * <p/> 492 * Will wait for the future task to complete, but waiting at most the timeout value. 493 * 494 * @param context the camel context 495 * @param future the future handle 496 * @param timeout timeout value 497 * @param unit timeout unit 498 * @param type the expected body response type 499 * @return the result body, can be <tt>null</tt>. 500 * @throws CamelExecutionException if the processing of the exchange failed 501 * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered 502 */ 503 public static <T> T extractFutureBody(CamelContext context, Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { 504 try { 505 if (timeout > 0) { 506 return doExtractFutureBody(context, future.get(timeout, unit), type); 507 } else { 508 return doExtractFutureBody(context, future.get(), type); 509 } 510 } catch (InterruptedException e) { 511 throw ObjectHelper.wrapRuntimeCamelException(e); 512 } catch (ExecutionException e) { 513 // execution failed due to an exception so rethrow the cause 514 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 515 } finally { 516 // its harmless to cancel if task is already completed 517 // and in any case we do not want to get hold of the task a 2nd time 518 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 519 future.cancel(true); 520 } 521 } 522 523 private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) { 524 if (result == null) { 525 return null; 526 } 527 if (type.isAssignableFrom(result.getClass())) { 528 return type.cast(result); 529 } 530 if (result instanceof Exchange) { 531 Exchange exchange = (Exchange) result; 532 Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern()); 533 return context.getTypeConverter().convertTo(type, answer); 534 } 535 return context.getTypeConverter().convertTo(type, result); 536 } 537 538 }