Coverage Report - org.apache.camel.processor.Pipeline
 
Classes in this File Line Coverage Branch Coverage Complexity
Pipeline
90% 
100% 
0
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.processor;
 18  
 
 19  
 import java.util.Collection;
 20  
 import java.util.List;
 21  
 
 22  
 import org.apache.camel.Exchange;
 23  
 import org.apache.camel.Processor;
 24  
 import org.apache.camel.Message;
 25  
 import org.apache.commons.logging.Log;
 26  
 import org.apache.commons.logging.LogFactory;
 27  
 
 28  
 /**
 29  
  * Creates a Pipeline pattern where the output of the previous step is sent as
 30  
  * input to the next step, reusing the same message exchanges
 31  
  * 
 32  
  * @version $Revision: 563931 $
 33  
  */
 34  
 public class Pipeline extends MulticastProcessor implements Processor {
 35  3
     private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
 36  
 
 37  
     public Pipeline(Collection<Processor> processors) {
 38  45
         super(processors);
 39  45
     }
 40  
     
 41  
     public static Processor newInstance(List<Processor> processors) {
 42  258
         if (processors.isEmpty()) {
 43  0
             return null;
 44  258
         } else if (processors.size() == 1) {
 45  228
             return processors.get(0);
 46  
         }
 47  30
         return new Pipeline(processors);
 48  
     }
 49  
 
 50  
     public void process(Exchange exchange) throws Exception {
 51  39
         Exchange nextExchange = exchange;
 52  39
         boolean first = true;
 53  39
         for (Processor producer : getProcessors()) {
 54  78
             if (first) {
 55  39
                 first = false;
 56  39
             } else {
 57  39
                 nextExchange = createNextExchange(producer, nextExchange);
 58  
             }
 59  78
             producer.process(nextExchange);
 60  75
         }
 61  36
     }
 62  
 
 63  
     /**
 64  
      * Strategy method to create the next exchange from the previous exchange
 65  
      * 
 66  
      * @param producer the producer used to send to the endpoint
 67  
      * @param previousExchange the previous exchange
 68  
      * @return a new exchange
 69  
      */
 70  
     protected Exchange createNextExchange(Processor producer, Exchange previousExchange) {
 71  39
         Exchange answer = copyExchangeStrategy(previousExchange);
 72  
 
 73  
         // now let's set the input of the next exchange to the output of the
 74  
         // previous message if it is not null
 75  39
         Object output = previousExchange.getOut().getBody();
 76  39
         Message in = answer.getIn();
 77  39
         if (output != null) {
 78  9
             in.setBody(output);
 79  9
         }
 80  
         else {
 81  30
             Object previousInBody = previousExchange.getIn().getBody();
 82  30
             if (in.getBody() == null && previousInBody != null) {
 83  0
                 LOG.warn("Bad exchange implementation; the copy() method did not copy across the in body: " + previousExchange
 84  
                         + " of type: " + previousExchange.getClass());
 85  0
                 in.setBody(previousInBody);
 86  
             }
 87  
         }
 88  39
         return answer;
 89  
     }
 90  
 
 91  
     /**
 92  
      * Strategy method to copy the exchange before sending to another endpoint.
 93  
      * This {@link Pipeline} implementation always copies the exchange.
 94  
      * 
 95  
      * @param exchange the exchange to copy
 96  
      * @return the exchange produced by calling {@link Exchange#copy()} on the
 97  
      *         given exchange
 98  
      */
 99  
     protected Exchange copyExchangeStrategy(Exchange exchange) {
 100  39
         return exchange.copy();
 101  
     }
 102  
 
 103  
     @Override
 104  
     public String toString() {
 105  42
         return "Pipeline" + getProcessors();
 106  
     }
 107  
 }