001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Endpoint;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.ExchangePattern;
029    import org.apache.camel.Message;
030    import org.apache.camel.spi.Synchronization;
031    import org.apache.camel.spi.UnitOfWork;
032    import org.apache.camel.util.ExchangeHelper;
033    import org.apache.camel.util.ObjectHelper;
034    import org.apache.camel.util.UuidGenerator;
035    
036    /**
037     * A default implementation of {@link Exchange}
038     *
039     * @version $Revision: 777808 $
040     */
041    public class DefaultExchange implements Exchange {
042    
043        private static final UuidGenerator DEFAULT_ID_GENERATOR = new UuidGenerator();
044        protected final CamelContext context;
045        private Map<String, Object> properties;
046        private Message in;
047        private Message out;
048        private Message fault;
049        private Exception exception;
050        private String exchangeId;
051        private UnitOfWork unitOfWork;
052        private ExchangePattern pattern;
053        private Endpoint fromEndpoint;
054        private List<Synchronization> onCompletions;
055    
056        public DefaultExchange(CamelContext context) {
057            this(context, ExchangePattern.InOnly);
058        }
059    
060        public DefaultExchange(CamelContext context, ExchangePattern pattern) {
061            this.context = context;
062            this.pattern = pattern;
063        }
064    
065        public DefaultExchange(Exchange parent) {
066            this(parent.getContext(), parent.getPattern());
067            this.unitOfWork = parent.getUnitOfWork();
068            this.fromEndpoint = parent.getFromEndpoint();
069        }
070    
071        public DefaultExchange(Endpoint fromEndpoint) {
072            this(fromEndpoint, ExchangePattern.InOnly);
073        }
074        
075        public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
076            this.context = fromEndpoint.getCamelContext();
077            this.fromEndpoint = fromEndpoint;
078            this.pattern = pattern;
079        }
080    
081        @Override
082        public String toString() {
083            return "Exchange[" + in + "]";
084        }
085    
086        public Exchange copy() {
087            Exchange exchange = newInstance();
088            exchange.copyFrom(this);
089            return exchange;
090        }
091    
092        public Exchange newCopy(boolean handoverOnCompletion) {
093            Exchange copy = copy();
094            // do not share the unit of work
095            copy.setUnitOfWork(null);
096            // handover on completeion to the copy if we got any
097            if (handoverOnCompletion && unitOfWork != null) {
098                unitOfWork.handoverSynchronization(copy);
099            }
100            return copy;
101        }
102    
103        public void copyFrom(Exchange exchange) {
104            if (exchange == this) {
105                return;
106            }
107            setProperties(safeCopy(exchange.getProperties()));
108    
109            // this can cause strangeness if we copy, say, a FileMessage onto an FtpExchange with overloaded getExchange() methods etc.
110            safeCopy(getIn(), exchange.getIn());
111            if (exchange.hasOut()) {
112                safeCopy(getOut(), exchange.getOut());
113            }
114            if (exchange.hasFault()) {
115                safeCopy(getFault(), exchange.getFault());
116            }
117            setException(exchange.getException());
118    
119            unitOfWork = exchange.getUnitOfWork();
120            pattern = exchange.getPattern();
121            setFromEndpoint(exchange.getFromEndpoint());
122        }
123    
124        private static void safeCopy(Message message, Message that) {
125            if (message != null) {
126                message.copyFrom(that);
127            }
128        }
129    
130        private static Map<String, Object> safeCopy(Map<String, Object> properties) {
131            if (properties == null) {
132                return null;
133            }
134            return new ConcurrentHashMap<String, Object>(properties);
135        }
136    
137        public Exchange newInstance() {
138            return new DefaultExchange(this);
139        }
140    
141        public CamelContext getContext() {
142            return context;
143        }
144    
145        public Object getProperty(String name) {
146            if (properties != null) {
147                return properties.get(name);
148            }
149            return null;
150        }
151    
152        public <T> T getProperty(String name, Class<T> type) {
153            Object value = getProperty(name);
154            return ExchangeHelper.convertToType(this, type, value);
155        }
156    
157        public void setProperty(String name, Object value) {
158            if (value != null) {
159                // avoid the NullPointException
160                getProperties().put(name, value);
161            } else {
162                // if the value is null, we just remove the key from the map
163                if (name != null) {
164                    getProperties().remove(name);
165                }
166            }
167        }
168    
169        public Object removeProperty(String name) {
170            return getProperties().remove(name);
171        }
172    
173        public Map<String, Object> getProperties() {
174            if (properties == null) {
175                properties = new ConcurrentHashMap<String, Object>();
176            }
177            return properties;
178        }
179    
180        public void setProperties(Map<String, Object> properties) {
181            this.properties = properties;
182        }
183    
184        public Message getIn() {
185            if (in == null) {
186                in = createInMessage();
187                configureMessage(in);
188            }
189            return in;
190        }
191    
192        public void setIn(Message in) {
193            this.in = in;
194            configureMessage(in);
195        }
196    
197        public Message getOut() {
198            if (out == null) {
199                out = createOutMessage();
200                configureMessage(out);
201            }
202            return out;
203        }
204    
205        public boolean hasOut() {
206            return out != null;
207        }
208    
209        public Message getOut(boolean lazyCreate) {
210            if (out == null && lazyCreate) {
211                out = createOutMessage();
212                configureMessage(out);
213            }
214            return out;
215        }
216    
217        public void setOut(Message out) {
218            this.out = out;
219            configureMessage(out);
220        }
221    
222        public Exception getException() {
223            return exception;
224        }
225    
226        public <T> T getException(Class<T> type) {
227            if (exception == null) {
228                return null;
229            }
230    
231            Iterator<Throwable> it = ObjectHelper.createExceptionIterator(exception);
232            while (it.hasNext()) {
233                Throwable e = it.next();
234                if (type.isInstance(e)) {
235                    return type.cast(e);
236                }
237            }
238            // not found
239            return null;
240        }
241    
242        public void setException(Exception exception) {
243            this.exception = exception;
244        }
245    
246        public ExchangePattern getPattern() {
247            return pattern;
248        }
249    
250        public void setPattern(ExchangePattern pattern) {
251            this.pattern = pattern;
252        }
253    
254        public Endpoint getFromEndpoint() {
255            return fromEndpoint;
256        }
257    
258        public void setFromEndpoint(Endpoint fromEndpoint) {
259            this.fromEndpoint = fromEndpoint;
260        }
261    
262        public Message getFault() {
263            if (fault == null) {
264                fault = createFaultMessage();
265                configureMessage(fault);
266            }
267            return fault;
268        }
269    
270        public boolean hasFault() {
271            return fault != null;
272        }
273    
274        public Message getFault(boolean lazyCreate) {
275            if (fault == null && lazyCreate) {
276                fault = createFaultMessage();
277                configureMessage(fault);
278            }
279            return fault;
280        }
281    
282        public void setFault(Message fault) {
283            this.fault = fault;
284            configureMessage(fault);
285        }
286    
287        public void removeFault() {
288            this.fault = null;
289        }
290    
291        public String getExchangeId() {
292            if (exchangeId == null) {
293                exchangeId = createExchangeId();
294            }
295            return exchangeId;
296        }
297    
298        public void setExchangeId(String id) {
299            this.exchangeId = id;
300        }
301    
302        public boolean isFailed() {
303            if (hasFault()) {
304                Object faultBody = getFault().getBody();
305                if (faultBody != null) {
306                    return true;
307                }
308            }
309            return getException() != null;
310        }
311    
312        public boolean isTransacted() {
313            Boolean transacted = getProperty(TRANSACTED, Boolean.class);
314            return transacted != null && transacted;
315        }
316    
317        public boolean isRollbackOnly() {
318            Boolean rollback = getProperty(ROLLBACK_ONLY, Boolean.class);
319            return rollback != null && rollback;
320        }
321    
322        public UnitOfWork getUnitOfWork() {
323            return unitOfWork;
324        }
325    
326        public void setUnitOfWork(UnitOfWork unitOfWork) {
327            this.unitOfWork = unitOfWork;
328            if (this.onCompletions != null) {
329                // now an unit of work has been assigned so add the on completions
330                // we might have registered already
331                for (Synchronization onCompletion : this.onCompletions) {
332                    this.unitOfWork.addSynchronization(onCompletion);
333                }
334                // cleanup the temporary on completion list as they now have been registered
335                // on the unit of work
336                this.onCompletions.clear();
337                this.onCompletions = null;
338            }
339        }
340    
341        public void addOnCompletion(Synchronization onCompletion) {
342            if (this.unitOfWork == null) {
343                // unit of work not yet registered so we store the on completion temporary
344                // until the unit of work is assigned to this exchange by the UnitOfWorkProcessor
345                if (this.onCompletions == null) {
346                    this.onCompletions = new ArrayList<Synchronization>();
347                }
348                this.onCompletions.add(onCompletion);
349            } else {
350                this.getUnitOfWork().addSynchronization(onCompletion);
351            }
352        }
353    
354        /**
355         * Factory method used to lazily create the IN message
356         */
357        protected Message createInMessage() {
358            return new DefaultMessage();
359        }
360    
361        /**
362         * Factory method to lazily create the OUT message
363         */
364        protected Message createOutMessage() {
365            return new DefaultMessage();
366        }
367    
368        /**
369         * Factory method to lazily create the FAULT message
370         */
371        protected Message createFaultMessage() {
372            return new DefaultMessage();
373        }
374    
375        /**
376         * Configures the message after it has been set on the exchange
377         */
378        protected void configureMessage(Message message) {
379            if (message instanceof MessageSupport) {
380                MessageSupport messageSupport = (MessageSupport)message;
381                messageSupport.setExchange(this);
382            }
383        }
384    
385        protected String createExchangeId() {
386            String answer = null;
387            if (in != null) {
388                answer = in.createExchangeId();
389            }
390            if (answer == null) {
391                answer = DefaultExchange.DEFAULT_ID_GENERATOR.generateId();
392            }
393            return answer;
394        }
395    
396    }