001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.List;
021    
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Message;
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    
028    /**
029     * Creates a Pipeline pattern where the output of the previous step is sent as
030     * input to the next step, reusing the same message exchanges
031     * 
032     * @version $Revision: 563931 $
033     */
034    public class Pipeline extends MulticastProcessor implements Processor {
035        private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
036    
037        public Pipeline(Collection<Processor> processors) {
038            super(processors);
039        }
040        
041        public static Processor newInstance(List<Processor> processors) {
042            if (processors.isEmpty()) {
043                return null;
044            } else if (processors.size() == 1) {
045                return processors.get(0);
046            }
047            return new Pipeline(processors);
048        }
049    
050        public void process(Exchange exchange) throws Exception {
051            Exchange nextExchange = exchange;
052            boolean first = true;
053            for (Processor producer : getProcessors()) {
054                if (first) {
055                    first = false;
056                } else {
057                    nextExchange = createNextExchange(producer, nextExchange);
058                }
059                producer.process(nextExchange);
060            }
061        }
062    
063        /**
064         * Strategy method to create the next exchange from the
065         * 
066         * @param producer the producer used to send to the endpoint
067         * @param previousExchange the previous exchange
068         * @return a new exchange
069         */
070        protected Exchange createNextExchange(Processor producer, Exchange previousExchange) {
071            Exchange answer = copyExchangeStrategy(previousExchange);
072    
073            // now lets set the input of the next exchange to the output of the
074            // previous message if it is not null
075            Object output = previousExchange.getOut().getBody();
076            Message in = answer.getIn();
077            if (output != null) {
078                in.setBody(output);
079            }
080            else {
081                Object previousInBody = previousExchange.getIn().getBody();
082                if (in.getBody() == null && previousInBody != null) {
083                    LOG.warn("Bad exchange implementation; the copy() method did not copy across the in body: " + previousExchange
084                            + " of type: " + previousExchange.getClass());
085                    in.setBody(previousInBody);
086                }
087            }
088            return answer;
089        }
090    
091        /**
092         * Strategy method to copy the exchange before sending to another endpoint.
093         * Derived classes such as the {@link Pipeline} will not clone the exchange
094         * 
095         * @param exchange
096         * @return the current exchange if no copying is required such as for a
097         *         pipeline otherwise a new copy of the exchange is returned.
098         */
099        protected Exchange copyExchangeStrategy(Exchange exchange) {
100            return exchange.copy();
101        }
102    
103        @Override
104        public String toString() {
105            return "Pipeline" + getProcessors();
106        }
107    }