001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.ExecutionException;
022    import java.util.concurrent.Future;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.TimeoutException;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.CamelExecutionException;
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.ExchangePattern;
031    import org.apache.camel.InvalidPayloadException;
032    import org.apache.camel.Message;
033    import org.apache.camel.NoSuchBeanException;
034    import org.apache.camel.NoSuchEndpointException;
035    import org.apache.camel.NoSuchHeaderException;
036    import org.apache.camel.NoSuchPropertyException;
037    import org.apache.camel.NoTypeConversionAvailableException;
038    import org.apache.camel.TypeConverter;
039    
040    /**
041     * Some helper methods for working with {@link Exchange} objects
042     *
043     * @version $Revision: 774683 $
044     */
045    public final class ExchangeHelper {
046    
047        /**
048         * Utility classes should not have a public constructor.
049         */
050        private ExchangeHelper() {
051        }
052    
053        /**
054         * Extracts the exchange property of the given name and type; if it is not present then the
055         * default value will be used
056         *
057         * @param exchange the message exchange
058         * @param propertyName the name of the property on the exchange
059         * @param type the expected type of the property
060         * @param defaultValue the default value to be used if the property name does not exist or could not be
061         * converted to the given type
062         * @return the property value as the given type or the defaultValue if it could not be found or converted
063         */
064        public static <T> T getExchangeProperty(Exchange exchange, String propertyName, Class<T> type, T defaultValue) {
065            T answer = exchange.getProperty(propertyName, type);
066            if (answer == null) {
067                return defaultValue;
068            }
069            return answer;
070        }
071    
072    
073        /**
074         * Attempts to resolve the endpoint for the given value
075         *
076         * @param exchange the message exchange being processed
077         * @param value the value which can be an {@link Endpoint} or an object
078         *                which provides a String representation of an endpoint via
079         *                {@link #toString()}
080         *
081         * @return the endpoint
082         * @throws NoSuchEndpointException if the endpoint cannot be resolved
083         */
084        public static Endpoint resolveEndpoint(Exchange exchange, Object value)
085            throws NoSuchEndpointException {
086            Endpoint endpoint;
087            if (value instanceof Endpoint) {
088                endpoint = (Endpoint)value;
089            } else {
090                String uri = value.toString();
091                endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
092            }
093            return endpoint;
094        }
095    
096        public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type)
097            throws NoSuchPropertyException {
098            T result = exchange.getProperty(propertyName, type);
099            if (result != null) {
100                return result;
101            }
102            throw new NoSuchPropertyException(exchange, propertyName, type);
103        }
104    
105        public static <T> T getMandatoryHeader(Exchange exchange, String propertyName, Class<T> type)
106            throws NoSuchHeaderException {
107            T answer = exchange.getIn().getHeader(propertyName, type);
108            if (answer == null) {
109                throw new NoSuchHeaderException(exchange, propertyName, type);
110            }
111            return answer;
112        }
113    
114        /**
115         * Returns the mandatory inbound message body of the correct type or throws
116         * an exception if it is not present
117         */
118        public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
119            return exchange.getIn().getMandatoryBody();
120        }
121    
122        /**
123         * Returns the mandatory inbound message body of the correct type or throws
124         * an exception if it is not present
125         */
126        public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
127            return exchange.getIn().getMandatoryBody(type);
128        }
129    
130        /**
131         * Returns the mandatory outbound message body of the correct type or throws
132         * an exception if it is not present
133         */
134        public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
135            return exchange.getOut().getMandatoryBody();
136        }
137    
138        /**
139         * Returns the mandatory outbound message body of the correct type or throws
140         * an exception if it is not present
141         */
142        public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
143            return exchange.getOut().getMandatoryBody(type);
144        }
145    
146        /**
147         * Converts the value to the given expected type or throws an exception
148         */
149        public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value) throws NoTypeConversionAvailableException {
150            CamelContext camelContext = exchange.getContext();
151            TypeConverter converter = camelContext.getTypeConverter();
152            if (converter != null) {
153                return converter.mandatoryConvertTo(type, exchange, value);
154            }
155            throw new NoTypeConversionAvailableException(value, type);
156        }
157    
158        /**
159         * Converts the value to the given expected type returning null if it could
160         * not be converted
161         */
162        public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) {
163            CamelContext camelContext = exchange.getContext();
164            TypeConverter converter = camelContext.getTypeConverter();
165            if (converter != null) {
166                return converter.convertTo(type, exchange, value);
167            }
168            return null;
169        }
170    
171        /**
172         * Copies the results of a message exchange from the source exchange to the result exchange
173         * which will copy the out and fault message contents and the exception
174         *
175         * @param result the result exchange which will have the output and error state added
176         * @param source the source exchange which is not modified
177         */
178        public static void copyResults(Exchange result, Exchange source) {
179    
180            // --------------------------------------------------------------------
181            //  TODO: merge logic with that of copyResultsPreservePattern()
182            // --------------------------------------------------------------------
183            
184            if (result != source) {
185                result.setException(source.getException());
186                if (source.hasFault()) {
187                    result.getFault().copyFrom(source.getFault());
188                }
189    
190                if (source.hasOut()) {
191                    result.getOut().copyFrom(source.getOut());
192                } else if (result.getPattern() == ExchangePattern.InOptionalOut) {
193                    // special case where the result is InOptionalOut and with no OUT response
194                    // so we should return null to indicate this fact
195                    result.setOut(null);
196                } else {
197                    // no results so lets copy the last input
198                    // as the final processor on a pipeline might not
199                    // have created any OUT; such as a mock:endpoint
200                    // so lets assume the last IN is the OUT
201                    if (result.getPattern().isOutCapable()) {
202                        // only set OUT if its OUT capable
203                        result.getOut().copyFrom(source.getIn());
204                    } else {
205                        // if not replace IN instead to keep the MEP
206                        result.getIn().copyFrom(source.getIn());
207                    }
208                }
209                result.getProperties().clear();
210                result.getProperties().putAll(source.getProperties());
211            }
212        }
213    
214        /**
215         * Copies the <code>source</code> exchange to <code>target</code> exchange
216         * preserving the {@link ExchangePattern} of <code>target</code>.  
217         * 
218         * @param source source exchange.
219         * @param result target exchange.
220         */
221        public static void copyResultsPreservePattern(Exchange result, Exchange source) {
222    
223            // --------------------------------------------------------------------
224            //  TODO: merge logic with that of copyResults()
225            // --------------------------------------------------------------------
226            
227            if (source == result) {
228                // no need to copy
229                return;
230            }
231            
232            // copy in message
233            result.getIn().copyFrom(source.getIn());
234        
235            // copy out message
236            if (source.hasOut()) {
237                // exchange pattern sensitive
238                getResultMessage(result).copyFrom(source.getOut());
239            }
240            
241            // copy fault message
242            if (source.hasFault()) {
243                result.getFault().copyFrom(source.getFault());
244            }
245            
246            // copy exception
247            result.setException(source.getException());
248            
249            // copy properties
250            result.getProperties().clear();
251            result.getProperties().putAll(source.getProperties());
252        }
253    
254        /**
255         * Returns the message where to write results in an
256         * exchange-pattern-sensitive way.
257         * 
258         * @param exchange
259         *            message exchange.
260         * @return result message.
261         */
262        public static Message getResultMessage(Exchange exchange) {
263            if (exchange.getPattern().isOutCapable()) {
264                return exchange.getOut();
265            } else {
266                return exchange.getIn();
267            }
268        }
269    
270        /**
271         * Returns true if the given exchange pattern (if defined) can support IN messagea
272         *
273         * @param exchange the exchange to interrogate
274         * @return true if the exchange is defined as an {@link ExchangePattern} which supports
275         * IN messages
276         */
277        public static boolean isInCapable(Exchange exchange) {
278            ExchangePattern pattern = exchange.getPattern();
279            return pattern != null && pattern.isInCapable();
280        }
281    
282        /**
283         * Returns true if the given exchange pattern (if defined) can support OUT messagea
284         *
285         * @param exchange the exchange to interrogate
286         * @return true if the exchange is defined as an {@link ExchangePattern} which supports
287         * OUT messages
288         */
289        public static boolean isOutCapable(Exchange exchange) {
290            ExchangePattern pattern = exchange.getPattern();
291            return pattern != null && pattern.isOutCapable();
292        }
293    
294        /**
295         * Creates a new instance of the given type from the injector
296         */
297        public static <T> T newInstance(Exchange exchange, Class<T> type) {
298            return exchange.getContext().getInjector().newInstance(type);
299        }
300    
301        /**
302         * Creates a Map of the variables which are made available to a script or template
303         *
304         * @param exchange the exchange to make available
305         * @return a Map populated with the require dvariables
306         */
307        public static Map createVariableMap(Exchange exchange) {
308            Map answer = new HashMap();
309            populateVariableMap(exchange, answer);
310            return answer;
311        }
312    
313        /**
314         * Populates the Map with the variables which are made available to a script or template
315         *
316         * @param exchange the exchange to make available
317         * @param map      the map to populate
318         */
319        @SuppressWarnings("unchecked")
320        public static void populateVariableMap(Exchange exchange, Map map) {
321            map.put("exchange", exchange);
322            Message in = exchange.getIn();
323            map.put("in", in);
324            map.put("request", in);
325            map.put("headers", in.getHeaders());
326            map.put("body", in.getBody());
327            if (isOutCapable(exchange)) {
328                Message out = exchange.getOut();
329                map.put("out", out);
330                map.put("response", out);
331            }
332            map.put("camelContext", exchange.getContext());
333        }
334    
335        /**
336         * Returns the MIME content type on the input message or null if one is not defined
337         */
338        public static String getContentType(Exchange exchange) {
339            return MessageHelper.getContentType(exchange.getIn());        
340        }
341    
342        /**
343         * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
344         */
345        public static Object lookupMandatoryBean(Exchange exchange, String name) {
346            Object value = lookupBean(exchange, name);
347            if (value == null) {
348                throw new NoSuchBeanException(name);
349            }
350            return value;
351        }
352    
353        /**
354         * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
355         */
356        public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
357            T value = lookupBean(exchange, name, type);
358            if (value == null) {
359                throw new NoSuchBeanException(name);
360            }
361            return value;
362        }
363    
364        /**
365         * Performs a lookup in the registry of the bean name
366         */
367        public static Object lookupBean(Exchange exchange, String name) {
368            return exchange.getContext().getRegistry().lookup(name);
369        }
370    
371        /**
372         * Performs a lookup in the registry of the bean name and type
373         */
374        public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
375            return exchange.getContext().getRegistry().lookup(name, type);
376        }
377    
378        /**
379         * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
380         * or null if none could be found
381         */
382        public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
383            for (Exchange exchange : exchanges) {
384                String id = exchange.getExchangeId();
385                if (id != null && id.equals(exchangeId)) {
386                    return exchange;
387                }
388            }
389            return null;
390        }
391    
392        public static boolean isFailureHandled(Exchange exchange) {
393            Boolean handled = exchange.getProperty(Exchange.FAILURE_HANDLED, Boolean.class);
394            return handled != null && handled;
395        }
396    
397        public static void setFailureHandled(Exchange exchange) {
398            exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
399            // clear exception since its failure handled
400            exchange.setException(null);
401        }
402    
403        /**
404         * Extracts the body from the given exchange.
405         * <p/>
406         * If the exchange pattern is provided it will try to honor it and retrive the body
407         * from either IN or OUT according to the pattern.
408         *
409         * @param exchange   the exchange
410         * @param pattern    exchange pattern if given, can be <tt>null</tt>
411         * @return the result body, can be <tt>null</tt>.
412         * @throws CamelExecutionException if the processing of the exchange failed
413         */
414        public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
415            Object answer = null;
416            if (exchange != null) {
417                // rethrow if there was an exception during execution
418                if (exchange.getException() != null) {
419                    throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
420                }
421    
422                // result could have a fault message
423                if (hasFaultMessage(exchange)) {
424                    return exchange.getFault().getBody();
425                }
426    
427                // okay no fault then return the response according to the pattern
428                // try to honor pattern if provided
429                boolean notOut = pattern != null && !pattern.isOutCapable();
430                boolean hasOut = exchange.hasOut();
431                if (hasOut && !notOut) {
432                    // we have a response in out and the pattern is out capable
433                    answer = exchange.getOut().getBody();
434                } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
435                    // special case where the result is InOptionalOut and with no OUT response
436                    // so we should return null to indicate this fact
437                    answer = null;
438                } else {
439                    // use IN as the response
440                    answer = exchange.getIn().getBody();
441                }
442            }
443            return answer;
444        }
445    
446        /**
447         * Tests whether the exchange has a fault message set and that its not null.
448         *
449         * @param exchange  the exchange
450         * @return <tt>true</tt> if fault message exists
451         */
452        public static boolean hasFaultMessage(Exchange exchange) {
453            if (exchange.hasFault()) {
454                Object faultBody = exchange.getFault().getBody();
455                if (faultBody != null) {
456                    return true;
457                }
458            }
459            return false;
460        }
461    
462        /**
463         * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
464         * <p/>
465         * Will wait until the future task is complete.
466         *
467         * @param context the camel context
468         * @param future the future handle
469         * @param type the expected body response type
470         * @return the result body, can be <tt>null</tt>.
471         * @throws CamelExecutionException if the processing of the exchange failed
472         */
473        public static <T> T extractFutureBody(CamelContext context, Future future, Class<T> type) {
474            try {
475                return doExtractFutureBody(context, future.get(), type);
476            } catch (InterruptedException e) {
477                throw ObjectHelper.wrapRuntimeCamelException(e);
478            } catch (ExecutionException e) {
479                // execution failed due to an exception so rethrow the cause
480                throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
481            } finally {
482                // its harmless to cancel if task is already completed
483                // and in any case we do not want to get hold of the task a 2nd time
484                // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
485                future.cancel(true);
486            }
487        }
488    
489        /**
490         * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
491         * <p/>
492         * Will wait for the future task to complete, but waiting at most the timeout value.
493         *
494         * @param context the camel context
495         * @param future the future handle
496         * @param timeout timeout value
497         * @param unit    timeout unit
498         * @param type the expected body response type
499         * @return the result body, can be <tt>null</tt>.
500         * @throws CamelExecutionException if the processing of the exchange failed
501         * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
502         */
503        public static <T> T extractFutureBody(CamelContext context, Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
504            try {
505                if (timeout > 0) {
506                    return doExtractFutureBody(context, future.get(timeout, unit), type);
507                } else {
508                    return doExtractFutureBody(context, future.get(), type);
509                }
510            } catch (InterruptedException e) {
511                throw ObjectHelper.wrapRuntimeCamelException(e);
512            } catch (ExecutionException e) {
513                // execution failed due to an exception so rethrow the cause
514                throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
515            } finally {
516                // its harmless to cancel if task is already completed
517                // and in any case we do not want to get hold of the task a 2nd time
518                // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
519                future.cancel(true);
520            }
521        }
522    
523        private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
524            if (result == null) {
525                return null;
526            }
527            if (type.isAssignableFrom(result.getClass())) {
528                return type.cast(result);
529            }
530            if (result instanceof Exchange) {
531                Exchange exchange = (Exchange) result;
532                Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
533                return context.getTypeConverter().convertTo(type, answer);
534            }
535            return context.getTypeConverter().convertTo(type, result);
536        }
537    
538    }