001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    import java.util.List;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Message;
025    import org.apache.camel.Processor;
026    import org.apache.camel.util.ExchangeHelper;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    /**
031     * Creates a Pipeline pattern where the output of the previous step is sent as
032     * input to the next step, reusing the same message exchanges
033     *
034     * @version $Revision: 772076 $
035     */
036    public class Pipeline extends MulticastProcessor implements Processor {
037        private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
038    
039        public Pipeline(Collection<Processor> processors) {
040            super(processors);
041        }
042    
043        public static Processor newInstance(List<Processor> processors) {
044            if (processors.isEmpty()) {
045                return null;
046            } else if (processors.size() == 1) {
047                return processors.get(0);
048            }
049            return new Pipeline(processors);
050        }
051    
052        public void process(Exchange exchange) throws Exception {
053            Iterator<Processor> processors = getProcessors().iterator();
054            Exchange nextExchange = exchange;
055            boolean first = true;
056    
057            while (continueRouting(processors, nextExchange)) {
058                if (first) {
059                    first = false;
060                } else {
061                    // prepare for next run
062                    nextExchange = createNextExchange(nextExchange);
063                }
064    
065                // get the next processor
066                Processor processor = processors.next();
067    
068                // process the next exchange
069                try {
070                    if (LOG.isTraceEnabled()) {
071                        // this does the actual processing so log at trace level
072                        LOG.trace("Processing exchangeId: " + nextExchange.getExchangeId() + " >>> " + nextExchange);
073                    }
074                    processor.process(nextExchange);
075                } catch (Exception e) {
076                    nextExchange.setException(e);
077                }
078    
079                // check for error if so we should break out
080                boolean exceptionHandled = hasExceptionBeenHandled(nextExchange);
081                if (nextExchange.isFailed() || exceptionHandled) {
082                    // The Exchange.EXCEPTION_HANDLED property is only set if satisfactory handling was done
083                    // by the error handler. It's still an exception, the exchange still failed.
084                    if (LOG.isDebugEnabled()) {
085                        LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
086                                  + " exception: " + nextExchange.getException() + " fault: "
087                                  + (nextExchange.hasFault() ? nextExchange.getFault() : null)
088                                  + (exceptionHandled ? " handled by the error handler" : ""));
089                    }
090                    break;
091                }
092            }
093    
094            if (LOG.isTraceEnabled()) {
095                // logging nextExchange as it contains the exchange that might have altered the payload and since
096                // we are logging the completion if will be confusing if we log the original instead
097                // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
098                LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
099            }
100    
101            // copy results back to the original exchange
102            ExchangeHelper.copyResults(exchange, nextExchange);
103        }
104    
105        private static boolean hasExceptionBeenHandled(Exchange nextExchange) {
106            return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.EXCEPTION_HANDLED));
107        }
108    
109        /**
110         * Strategy method to create the next exchange from the previous exchange.
111         * <p/>
112         * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
113         *
114         * @param previousExchange the previous exchange
115         * @return a new exchange
116         */
117        protected Exchange createNextExchange(Exchange previousExchange) {
118            Exchange answer = previousExchange.newInstance();
119            // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
120            // before processing the next step in the pipeline, so we have a snapshot of the exchange
121            // just before. This snapshot is used if Camel should do redeliveries (re try) using
122            // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
123            // exchange being routed.
124            answer.setExchangeId(previousExchange.getExchangeId());
125    
126            answer.getProperties().putAll(previousExchange.getProperties());
127    
128            // now lets set the input of the next exchange to the output of the
129            // previous message if it is not null
130            Message in = answer.getIn();
131            if (previousExchange.hasOut()) {
132                in.copyFrom(previousExchange.getOut());
133            } else {
134                in.copyFrom(previousExchange.getIn());
135            }
136            return answer;
137        }
138    
139        protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
140            Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
141            if (stop != null) {
142                boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
143                if (doStop) {
144                    if (LOG.isDebugEnabled()) {
145                        LOG.debug("Exchange is marked to stop routing: " + exchange);
146                    }
147                    return false;
148                } else {
149                    return true;
150                }
151            } else {
152                return it.hasNext();
153            }
154        }
155    
156        @Override
157        public String toString() {
158            return "Pipeline" + getProcessors();
159        }
160    
161    }