001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.io.Serializable;
020    import java.util.LinkedHashMap;
021    import java.util.Map;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.commons.logging.Log;
025    import org.apache.commons.logging.LogFactory;
026    
027    /**
028     * Holder object for sending an exchange over a remote wire as a serialized object.
029     * This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint.
030     * <p/>
031     * As opposed to normal usage where only the body part of the exchange is transfered over the wire,
032     * this holder object serializes the following fields over the wire:
033     * <ul>
034     * <li>in body</li>
035     * <li>out body</li>
036     * <li>in headers</li>
037     * <li>out headers</li>
038     * <li>fault body </li>
039     * <li>fault headers</li>
040     * <li>exchange properties</li>
041     * <li>exception</li>
042     * </ul>
043     * Any object that is not serializable will be skipped and Camel will log this at WARN level.
044     *
045     * @version $Revision: 769448 $
046     */
047    public class DefaultExchangeHolder implements Serializable {
048    
049        private static final long serialVersionUID = 1L;
050        private static final transient Log LOG = LogFactory.getLog(DefaultExchangeHolder.class);
051    
052        private Object inBody;
053        private Object outBody;
054        private Object faultBody;
055        private final Map<String, Object> inHeaders = new LinkedHashMap<String, Object>();
056        private final Map<String, Object> outHeaders = new LinkedHashMap<String, Object>();
057        private final Map<String, Object> properties = new LinkedHashMap<String, Object>();
058        private final Map<String, Object> faultHeaders = new LinkedHashMap<String, Object>();
059        private Exception exception;
060    
061        /**
062         * Creates a payload object with the information from the given exchange.
063         * Only marshal the Serializable object
064         *
065         * @param exchange the exchange
066         * @return the holder object with information copied form the exchange
067         */
068        public static DefaultExchangeHolder marshal(Exchange exchange) {
069            DefaultExchangeHolder payload = new DefaultExchangeHolder();
070    
071            payload.inBody = checkSerializableObject("in body", exchange, exchange.getIn().getBody());
072            payload.inHeaders.putAll(checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders()));
073            if (exchange.hasOut()) {
074                payload.outBody = checkSerializableObject("out body", exchange, exchange.getOut().getBody());
075                payload.outHeaders.putAll(checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders()));
076            }
077            if (exchange.hasFault()) {
078                payload.faultBody = checkSerializableObject("fault body", exchange, exchange.getFault().getBody());
079                payload.faultHeaders.putAll(checkMapSerializableObjects("fault headers", exchange, exchange.getFault().getHeaders()));
080            }
081            payload.properties.putAll(checkMapSerializableObjects("exchange properties", exchange, exchange.getProperties()));
082            payload.exception = exchange.getException();
083    
084            return payload;
085        }
086    
087        /**
088         * Transfers the information from the payload to the exchange.
089         *
090         * @param exchange the exchange to set values from the payload
091         * @param payload  the payload with the values
092         */
093        public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
094            exchange.getIn().setBody(payload.inBody);
095            exchange.getIn().setHeaders(payload.inHeaders);
096            if (payload.outBody != null) {
097                exchange.getOut().setBody(payload.outBody);
098                exchange.getOut().setHeaders(payload.outHeaders);
099            }
100            if (payload.faultBody != null) {
101                exchange.getFault().setBody(payload.faultBody);
102                exchange.getFault().setHeaders(payload.faultHeaders);
103            }
104            for (String key : payload.properties.keySet()) {
105                exchange.setProperty(key, payload.properties.get(key));
106            }
107            exchange.setException(payload.exception);
108        }
109    
110        public String toString() {
111            StringBuilder sb = new StringBuilder("DefaultExchangeHolder[");
112            sb.append("inBody=").append(inBody).append(", outBody=").append(outBody);
113            sb.append(", inHeaders=").append(inHeaders).append(", outHeaders=").append(outHeaders);
114            sb.append(", faultBody=").append(faultBody).append(", faultHeaders=").append(faultHeaders);
115            sb.append(", properties=").append(properties).append(", exception=").append(exception);
116            return sb.append(']').toString();
117        }
118    
119        private static Object checkSerializableObject(String type, Exchange exchange, Object object) {
120            if (object == null) {
121                return null;
122            }
123    
124            Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, object);
125            if (converted != null) {
126                return converted;
127            } else {
128                LOG.warn(type + " containig object: " + object + " of type: " + object.getClass().getCanonicalName() + " cannot be serialized, it will be excluded by the holder");
129                return null;
130            }
131        }
132    
133        private static Map<String, Object> checkMapSerializableObjects(String type, Exchange exchange, Map<String, Object> map) {
134            if (map == null) {
135                return null;
136            }
137    
138            Map<String, Object> result = new LinkedHashMap<String, Object>();
139            for (Map.Entry<String, Object> entry : map.entrySet()) {
140                Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, entry.getValue());
141                if (converted != null) {
142                    result.put(entry.getKey(), converted);
143                } else {
144                    LOG.warn(type + " containing object: " + entry.getValue() + " with key: " + entry.getKey()
145                            + " cannot be serialized, it will be excluded by the holder");
146                }
147            }
148    
149            return result;
150        }
151    
152    }