/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Creates a Pipeline pattern where the output of the previous step is sent as
 * input to the next step.
 * <p/>
 * The first processor receives the exchange passed to {@link #process(Exchange)};
 * every following processor receives a new exchange built by
 * {@link #createNextExchange(Exchange)}, which keeps the same exchange id and
 * properties. When the loop ends the results of the last exchange are copied
 * back to the exchange that was passed in.
 *
 * @version $Revision: 772076 $
 */
public class Pipeline extends MulticastProcessor implements Processor {
    private static final Log LOG = LogFactory.getLog(Pipeline.class);

    /**
     * Creates a pipeline over the given processors.
     *
     * @param processors the processors to run, handed to the super class
     */
    public Pipeline(Collection<Processor> processors) {
        super(processors);
    }

    /**
     * Factory method that avoids creating a pipeline when it is not needed.
     *
     * @param processors the processors to chain
     * @return <tt>null</tt> if the list is empty, the single processor itself
     *         if the list has exactly one element, otherwise a new {@link Pipeline}
     */
    public static Processor newInstance(List<Processor> processors) {
        if (processors.isEmpty()) {
            return null;
        } else if (processors.size() == 1) {
            return processors.get(0);
        }
        return new Pipeline(processors);
    }

    /**
     * Runs the exchange through each processor in turn.
     * <p/>
     * An exception thrown by a processor is not propagated; it is stored on the
     * exchange being processed and the loop is left. The loop is also left when
     * the exchange reports that it failed, when it carries the
     * {@link Exchange#EXCEPTION_HANDLED} property set to <tt>true</tt>, or when
     * {@link #continueRouting(Iterator, Exchange)} returns <tt>false</tt>.
     *
     * @param exchange the exchange to route; receives the results of the last step
     * @throws Exception declared by the {@link Processor} contract
     */
    public void process(Exchange exchange) throws Exception {
        Iterator<Processor> processors = getProcessors().iterator();
        Exchange nextExchange = exchange;
        boolean first = true;

        while (continueRouting(processors, nextExchange)) {
            if (first) {
                // the first processor works directly on the incoming exchange
                first = false;
            } else {
                // prepare for next run
                nextExchange = createNextExchange(nextExchange);
            }

            // get the next processor
            Processor processor = processors.next();

            // process the next exchange
            try {
                if (LOG.isTraceEnabled()) {
                    // this does the actual processing so log at trace level
                    LOG.trace("Processing exchangeId: " + nextExchange.getExchangeId() + " >>> " + nextExchange);
                }
                processor.process(nextExchange);
            } catch (Exception e) {
                // keep the failure on the exchange so it is detected by the check below
                nextExchange.setException(e);
            }

            // check for error if so we should break out
            boolean exceptionHandled = hasExceptionBeenHandled(nextExchange);
            if (nextExchange.isFailed() || exceptionHandled) {
                // The Exchange.EXCEPTION_HANDLED property is only set if satisfactory handling was done
                // by the error handler. It's still an exception, the exchange still failed.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
                              + " exception: " + nextExchange.getException() + " fault: "
                              + (nextExchange.hasFault() ? nextExchange.getFault() : null)
                              + (exceptionHandled ? " handled by the error handler" : ""));
                }
                break;
            }
        }

        if (LOG.isTraceEnabled()) {
            // logging nextExchange as it contains the exchange that might have altered the payload and since
            // we are logging the completion it will be confusing if we log the original instead
            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
            LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
        }

        // copy results back to the original exchange
        ExchangeHelper.copyResults(exchange, nextExchange);
    }

    /**
     * Tests whether the exchange carries the {@link Exchange#EXCEPTION_HANDLED}
     * property with the value {@link Boolean#TRUE}.
     *
     * @param nextExchange the exchange to inspect
     * @return <tt>true</tt> only if the property is present and equal to {@link Boolean#TRUE}
     */
    private static boolean hasExceptionBeenHandled(Exchange nextExchange) {
        return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.EXCEPTION_HANDLED));
    }

    /**
     * Strategy method to create the next exchange from the previous exchange.
     * <p/>
     * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
     *
     * @param previousExchange the previous exchange
     * @return a new exchange
     */
    protected Exchange createNextExchange(Exchange previousExchange) {
        Exchange answer = previousExchange.newInstance();
        // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
        // before processing the next step in the pipeline, so we have a snapshot of the exchange
        // just before. This snapshot is used if Camel should do redeliveries (re try) using
        // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
        // exchange being routed.
        answer.setExchangeId(previousExchange.getExchangeId());

        answer.getProperties().putAll(previousExchange.getProperties());

        // the input of the next exchange is the output of the previous exchange
        // if it has one, otherwise the previous input is carried forward
        Message in = answer.getIn();
        if (previousExchange.hasOut()) {
            in.copyFrom(previousExchange.getOut());
        } else {
            in.copyFrom(previousExchange.getIn());
        }
        return answer;
    }

    /**
     * Decides whether the pipeline should run another processor.
     * <p/>
     * Routing stops when the exchange carries the {@link Exchange#ROUTE_STOP}
     * property with a value that converts to <tt>true</tt>. In every other case
     * routing continues only while the iterator has more processors, so a stop
     * property that converts to <tt>false</tt> can no longer drive the caller
     * past the end of the iterator.
     *
     * @param it       the iterator over the remaining processors
     * @param exchange the exchange about to be routed
     * @return <tt>true</tt> if there is a next processor and routing has not been stopped
     */
    protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
        Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
        if (stop != null) {
            // NOTE(review): a null conversion result is treated as "do not stop" instead of
            // failing on unboxing - confirm against the TypeConverter contract in use
            Boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
            if (Boolean.TRUE.equals(doStop)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Exchange is marked to stop routing: " + exchange);
                }
                return false;
            }
        }

        // continue only if there are more processors to route to
        return it.hasNext();
    }

    @Override
    public String toString() {
        return "Pipeline" + getProcessors();
    }

}