001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.List;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Service;
025    import org.apache.camel.model.ProcessorDefinition;
026    import org.apache.camel.spi.Synchronization;
027    import org.apache.camel.spi.TraceableUnitOfWork;
028    import org.apache.camel.util.UuidGenerator;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * The default implementation of {@link org.apache.camel.spi.UnitOfWork}
034     *
035     * @version $Revision: 780261 $
036     */
037    public class DefaultUnitOfWork implements TraceableUnitOfWork, Service {
038        private static final transient Log LOG = LogFactory.getLog(DefaultUnitOfWork.class);
039        private static final UuidGenerator DEFAULT_ID_GENERATOR = new UuidGenerator();
040    
041        private String id;
042        private List<Synchronization> synchronizations;
043        private List<ProcessorDefinition> routeList;
044        private Object originalInBody;
045    
046        public DefaultUnitOfWork(Exchange exchange) {
047            this.originalInBody = exchange.getIn().getBody();
048        }
049    
050        public void start() throws Exception {
051            id = null;
052        }
053    
054        public void stop() throws Exception {
055            // need to clean up when we are stopping to not leak memory
056            if (synchronizations != null) {
057                synchronizations.clear();
058                synchronizations = null;
059            }
060            if (routeList != null) {
061                routeList.clear();
062                routeList = null;
063            }
064    
065            originalInBody = null;
066        }
067    
068        public synchronized void addSynchronization(Synchronization synchronization) {
069            if (synchronizations == null) {
070                synchronizations = new ArrayList<Synchronization>();
071            }
072            synchronizations.add(synchronization);
073        }
074    
075        public synchronized void removeSynchronization(Synchronization synchronization) {
076            if (synchronizations != null) {
077                synchronizations.remove(synchronization);
078            }
079        }
080    
081        public void handoverSynchronization(Exchange target) {
082            if (synchronizations == null || synchronizations.isEmpty()) {
083                return;
084            }
085    
086            for (Synchronization synchronization : synchronizations) {
087                target.addOnCompletion(synchronization);
088            }
089    
090            // clear this list as its handed over to the other exchange
091            this.synchronizations.clear();
092        }
093    
094        public void done(Exchange exchange) {
095            if (synchronizations != null && !synchronizations.isEmpty()) {
096                boolean failed = exchange.isFailed();
097                for (Synchronization synchronization : synchronizations) {
098                    try {
099                        if (failed) {
100                            synchronization.onFailure(exchange);
101                        } else {
102                            synchronization.onComplete(exchange);
103                        }
104                    } catch (Exception e) {
105                        // must catch exceptions to ensure all synchronizations have a chance to run
106                        LOG.error("Exception occured during onCompletion. This exception will be ignored: ", e);
107                    }
108                }
109            }
110        }
111    
112        public String getId() {
113            if (id == null) {
114                id = DEFAULT_ID_GENERATOR.generateId();
115            }
116            return id;
117        }
118    
119        public synchronized void addInterceptedNode(ProcessorDefinition node) {
120            if (routeList == null) {
121                routeList = new ArrayList<ProcessorDefinition>();
122            }
123            routeList.add(node);
124        }
125    
126        public synchronized ProcessorDefinition getLastInterceptedNode() {
127            if (routeList == null || routeList.isEmpty()) {
128                return null;
129            }
130            return routeList.get(routeList.size() - 1);
131        }
132    
133        public List<ProcessorDefinition> getInterceptedNodes() {
134            return Collections.unmodifiableList(routeList);
135        }
136    
137        public Object getOriginalInBody() {
138            return originalInBody;
139        }
140    }