001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.processor; 019 020 import org.apache.camel.Endpoint; 021 import org.apache.camel.Exchange; 022 import org.apache.camel.Processor; 023 import org.apache.camel.Producer; 024 025 import java.util.Collection; 026 027 /** 028 * Creates a Pipeline pattern where the output of the previous step is sent as input to the next step when working 029 * with request/response message exchanges. 030 * 031 * @version $Revision: 534145 $ 032 */ 033 public class Pipeline extends MulticastProcessor implements Processor { 034 public Pipeline(Collection<Endpoint> endpoints) throws Exception { 035 super(endpoints); 036 } 037 038 public void process(Exchange exchange) throws Exception { 039 Exchange nextExchange = exchange; 040 boolean first = true; 041 for (Producer producer : getProducers()) { 042 if (first) { 043 first = false; 044 } 045 else { 046 nextExchange = createNextExchange(producer, nextExchange); 047 } 048 producer.process(nextExchange); 049 } 050 } 051 052 /** 053 * Strategy method to create the next exchange from the 054 * 055 * @param producer the producer used to send to the endpoint 056 * @param previousExchange the previous exchange 057 * @return a new exchange 058 */ 059 protected Exchange createNextExchange(Producer producer, Exchange previousExchange) { 060 Exchange answer = producer.createExchange(previousExchange); 061 062 // now lets set the input of the next exchange to the output of the previous message if it is not null 063 Object output = previousExchange.getOut().getBody(); 064 if (output != null) { 065 answer.getIn().setBody(output); 066 } 067 return answer; 068 } 069 070 /** 071 * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the 072 * {@link Pipeline} will not clone the exchange 073 * 074 * @param exchange 075 * @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned. 076 */ 077 protected Exchange copyExchangeStrategy(Exchange exchange) { 078 return exchange.copy(); 079 } 080 081 @Override 082 public String toString() { 083 return "Pipeline" + getEndpoints(); 084 } 085 }