Coverage Report - org.apache.camel.processor.MulticastProcessor
 
Classes in this File Line Coverage Branch Coverage Complexity
MulticastProcessor
100% 
100% 
0
 
 1  
 /**
 2  
  *
 3  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 4  
  * contributor license agreements.  See the NOTICE file distributed with
 5  
  * this work for additional information regarding copyright ownership.
 6  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 7  
  * (the "License"); you may not use this file except in compliance with
 8  
  * the License.  You may obtain a copy of the License at
 9  
  *
 10  
  * http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.camel.processor;
 19  
 
 20  
 import org.apache.camel.Endpoint;
 21  
 import org.apache.camel.Exchange;
 22  
 import org.apache.camel.Processor;
 23  
 import org.apache.camel.Producer;
 24  
 import org.apache.camel.impl.ServiceSupport;
 25  
 
 26  
 import java.util.ArrayList;
 27  
 import java.util.Collection;
 28  
 
 29  
 /**
 30  
  * Implements the Multicast pattern to send a message exchange to a number of endpoints, each endpoint receiving a copy of
 31  
  * the message exchange.
 32  
  *
 33  
  * @version $Revision: 534145 $
 34  
  */
 35  
 public class MulticastProcessor extends ServiceSupport implements Processor {
 36  
     private Collection<Producer> producers;
 37  
 
 38  
     /**
 39  
      * A helper method to convert a list of endpoints into a list of processors
 40  
      */
 41  
     public static <E extends Exchange> Collection<Producer> toProducers(Collection<Endpoint> endpoints) throws Exception {
 42  3
         Collection<Producer> answer = new ArrayList<Producer>();
 43  3
         for (Endpoint endpoint : endpoints) {
 44  9
             answer.add(endpoint.createProducer());
 45  9
         }
 46  3
         return answer;
 47  
     }
 48  
 
 49  3
     public MulticastProcessor(Collection<Endpoint> endpoints) throws Exception {
 50  3
         this.producers = toProducers(endpoints);
 51  3
     }
 52  
 
 53  
     @Override
 54  
     public String toString() {
 55  5
         return "Multicast" + getEndpoints();
 56  
     }
 57  
 
 58  
     public void process(Exchange exchange) throws Exception {
 59  1
         for (Producer producer : producers) {
 60  3
             Exchange copy = copyExchangeStrategy(producer, exchange);
 61  3
             producer.process(copy);
 62  3
         }
 63  1
     }
 64  
 
 65  
     protected void doStop() throws Exception {
 66  2
         for (Producer producer : producers) {
 67  7
             producer.stop();
 68  7
         }
 69  2
     }
 70  
 
 71  
     protected void doStart() throws Exception {
 72  2
         for (Producer producer : producers) {
 73  7
             producer.start();
 74  7
         }
 75  2
     }
 76  
 
 77  
     /**
 78  
      * Returns the producers to multicast to
 79  
      */
 80  
     public Collection<Producer> getProducers() {
 81  1
         return producers;
 82  
     }
 83  
 
 84  
     /**
 85  
      * Returns the list of endpoints
 86  
      */
 87  
     public Collection<Endpoint> getEndpoints() {
 88  7
         Collection<Endpoint> answer = new ArrayList<Endpoint>();
 89  7
         for (Producer producer : producers) {
 90  17
             answer.add(producer.getEndpoint());
 91  17
         }
 92  7
         return answer;
 93  
     }
 94  
 
 95  
     /**
 96  
      * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the
 97  
      * {@link Pipeline} will not clone the exchange
 98  
      *
 99  
      * @param producer the producer that will send the exchange
 100  
      * @param exchange @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
 101  
      */
 102  
     protected Exchange copyExchangeStrategy(Producer producer, Exchange exchange) {
 103  3
         return producer.createExchange(exchange);
 104  
     }
 105  
 }