001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.processor;
019    
020    import org.apache.camel.Endpoint;
021    import org.apache.camel.Exchange;
022    import org.apache.camel.Processor;
023    import org.apache.camel.Producer;
024    import org.apache.camel.impl.ServiceSupport;
025    
026    import java.util.ArrayList;
027    import java.util.Collection;
028    
029    /**
030     * Implements the Multicast pattern to send a message exchange to a number of endpoints, each endpoint receiving a copy of
031     * the message exchange.
032     *
033     * @version $Revision: 534145 $
034     */
035    public class MulticastProcessor extends ServiceSupport implements Processor {
036        private Collection<Producer> producers;
037    
038        /**
039         * A helper method to convert a list of endpoints into a list of processors
040         */
041        public static <E extends Exchange> Collection<Producer> toProducers(Collection<Endpoint> endpoints) throws Exception {
042            Collection<Producer> answer = new ArrayList<Producer>();
043            for (Endpoint endpoint : endpoints) {
044                answer.add(endpoint.createProducer());
045            }
046            return answer;
047        }
048    
049        public MulticastProcessor(Collection<Endpoint> endpoints) throws Exception {
050            this.producers = toProducers(endpoints);
051        }
052    
053        @Override
054        public String toString() {
055            return "Multicast" + getEndpoints();
056        }
057    
058        public void process(Exchange exchange) throws Exception {
059            for (Producer producer : producers) {
060                Exchange copy = copyExchangeStrategy(producer, exchange);
061                producer.process(copy);
062            }
063        }
064    
065        protected void doStop() throws Exception {
066            for (Producer producer : producers) {
067                producer.stop();
068            }
069        }
070    
071        protected void doStart() throws Exception {
072            for (Producer producer : producers) {
073                producer.start();
074            }
075        }
076    
077        /**
078         * Returns the producers to multicast to
079         */
080        public Collection<Producer> getProducers() {
081            return producers;
082        }
083    
084        /**
085         * Returns the list of endpoints
086         */
087        public Collection<Endpoint> getEndpoints() {
088            Collection<Endpoint> answer = new ArrayList<Endpoint>();
089            for (Producer producer : producers) {
090                answer.add(producer.getEndpoint());
091            }
092            return answer;
093        }
094    
095        /**
096         * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the
097         * {@link Pipeline} will not clone the exchange
098         *
099         * @param producer the producer that will send the exchange
100         * @param exchange @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
101         */
102        protected Exchange copyExchangeStrategy(Producer producer, Exchange exchange) {
103            return producer.createExchange(exchange);
104        }
105    }