001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    import java.util.List;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.util.ExchangeHelper;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    
029    /**
030     * Creates a Pipeline pattern where the output of the previous step is sent as
031     * input to the next step, reusing the same message exchanges
032     *
033     * @version $Revision: 793373 $
034     */
035    public class Pipeline extends MulticastProcessor implements Processor, Traceable {
036        private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
037    
038        public Pipeline(Collection<Processor> processors) {
039            super(processors);
040        }
041    
042        public static Processor newInstance(List<Processor> processors) {
043            if (processors.isEmpty()) {
044                return null;
045            } else if (processors.size() == 1) {
046                return processors.get(0);
047            }
048            return new Pipeline(processors);
049        }
050    
051        public void process(Exchange exchange) throws Exception {
052            Iterator<Processor> processors = getProcessors().iterator();
053            Exchange nextExchange = exchange;
054            boolean first = true;
055    
056            while (continueRouting(processors, nextExchange)) {
057                if (first) {
058                    first = false;
059                } else {
060                    // prepare for next run
061                    nextExchange = createNextExchange(nextExchange);
062                }
063    
064                // get the next processor
065                Processor processor = processors.next();
066    
067                // process the next exchange
068                try {
069                    if (LOG.isTraceEnabled()) {
070                        // this does the actual processing so log at trace level
071                        LOG.trace("Processing exchangeId: " + nextExchange.getExchangeId() + " >>> " + nextExchange);
072                    }
073                    processor.process(nextExchange);
074                } catch (Exception e) {
075                    nextExchange.setException(e);
076                }
077    
078                // check for error if so we should break out
079                boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(nextExchange);
080                if (nextExchange.isFailed() || nextExchange.isRollbackOnly() ||  exceptionHandled) {
081                    // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory handling was done
082                    // by the error handler. It's still an exception, the exchange still failed.
083                    if (LOG.isDebugEnabled()) {
084                        StringBuilder sb = new StringBuilder();
085                        sb.append("Message exchange has failed so breaking out of pipeline: ").append(nextExchange);
086                        if (nextExchange.isRollbackOnly()) {
087                            sb.append(" Marked as rollback only.");
088                        }
089                        if (nextExchange.getException() != null) {
090                            sb.append(" Exception: ").append(nextExchange.getException());
091                        }
092                        if (nextExchange.hasFault()) {
093                            sb.append(" Fault: ").append(nextExchange.getFault());
094                        }
095                        if (exceptionHandled) {
096                            sb.append(" Handled by the error handler.");
097                        }
098                        LOG.debug(sb.toString());
099                    }
100                    break;
101                }
102            }
103    
104            if (LOG.isTraceEnabled()) {
105                // logging nextExchange as it contains the exchange that might have altered the payload and since
106                // we are logging the completion if will be confusing if we log the original instead
107                // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
108                LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
109            }
110    
111            // copy results back to the original exchange
112            ExchangeHelper.copyResults(exchange, nextExchange);
113        }
114    
115        private static boolean hasExceptionBeenHandledByErrorHandler(Exchange nextExchange) {
116            return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
117        }
118    
119        /**
120         * Strategy method to create the next exchange from the previous exchange.
121         * <p/>
122         * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
123         *
124         * @param previousExchange the previous exchange
125         * @return a new exchange
126         */
127        protected Exchange createNextExchange(Exchange previousExchange) {
128            Exchange answer = previousExchange.newInstance();
129            // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
130            // before processing the next step in the pipeline, so we have a snapshot of the exchange
131            // just before. This snapshot is used if Camel should do redeliveries (re try) using
132            // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
133            // exchange being routed.
134            answer.setExchangeId(previousExchange.getExchangeId());
135    
136            answer.getProperties().putAll(previousExchange.getProperties());
137    
138            // now lets set the input of the next exchange to the output of the
139            // previous message if it is not null
140            answer.setIn(previousExchange.hasOut() 
141                ? previousExchange.getOut().copy() : previousExchange.getIn().copy());
142            return answer;
143        }
144    
145        protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
146            Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
147            if (stop != null) {
148                boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
149                if (doStop) {
150                    if (LOG.isDebugEnabled()) {
151                        LOG.debug("Exchange is marked to stop routing: " + exchange);
152                    }
153                    return false;
154                }
155            }
156    
157            // continue if there are more processors to route
158            return it.hasNext();
159        }
160    
161        @Override
162        public String toString() {
163            return "Pipeline" + getProcessors();
164        }
165    
166        @Override
167        public String getTraceLabel() {
168            return "Pipeline";
169        }
170    }