001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.Collection; 020 import java.util.List; 021 022 import org.apache.camel.Exchange; 023 import org.apache.camel.Processor; 024 import org.apache.camel.Message; 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 028 /** 029 * Creates a Pipeline pattern where the output of the previous step is sent as 030 * input to the next step, reusing the same message exchanges 031 * 032 * @version $Revision: 563931 $ 033 */ 034 public class Pipeline extends MulticastProcessor implements Processor { 035 private static final transient Log LOG = LogFactory.getLog(Pipeline.class); 036 037 public Pipeline(Collection<Processor> processors) { 038 super(processors); 039 } 040 041 public static Processor newInstance(List<Processor> processors) { 042 if (processors.isEmpty()) { 043 return null; 044 } else if (processors.size() == 1) { 045 return processors.get(0); 046 } 047 return new Pipeline(processors); 048 } 049 050 public void process(Exchange exchange) throws Exception { 051 Exchange nextExchange = exchange; 052 boolean first = true; 053 for (Processor producer : getProcessors()) { 054 if (first) { 055 first = false; 056 } else { 057 nextExchange = createNextExchange(producer, nextExchange); 058 } 059 producer.process(nextExchange); 060 } 061 } 062 063 /** 064 * Strategy method to create the next exchange from the 065 * 066 * @param producer the producer used to send to the endpoint 067 * @param previousExchange the previous exchange 068 * @return a new exchange 069 */ 070 protected Exchange createNextExchange(Processor producer, Exchange previousExchange) { 071 Exchange answer = copyExchangeStrategy(previousExchange); 072 073 // now lets set the input of the next exchange to the output of the 074 // previous message if it is not null 075 Object output = previousExchange.getOut().getBody(); 076 Message in = answer.getIn(); 077 if (output != null) { 078 in.setBody(output); 079 } 080 else { 081 Object previousInBody = previousExchange.getIn().getBody(); 082 if (in.getBody() == null && previousInBody != null) { 083 LOG.warn("Bad exchange implementation; the copy() method did not copy across the in body: " + previousExchange 084 + " of type: " + previousExchange.getClass()); 085 in.setBody(previousInBody); 086 } 087 } 088 return answer; 089 } 090 091 /** 092 * Strategy method to copy the exchange before sending to another endpoint. 093 * Derived classes such as the {@link Pipeline} will not clone the exchange 094 * 095 * @param exchange 096 * @return the current exchange if no copying is required such as for a 097 * pipeline otherwise a new copy of the exchange is returned. 098 */ 099 protected Exchange copyExchangeStrategy(Exchange exchange) { 100 return exchange.copy(); 101 } 102 103 @Override 104 public String toString() { 105 return "Pipeline" + getProcessors(); 106 } 107 }