001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    
022    import org.apache.camel.Endpoint;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.impl.ServiceSupport;
026    import org.apache.camel.util.ServiceHelper;
027    
028    /**
029     * Implements the Multicast pattern to send a message exchange to a number of
030     * endpoints, each endpoint receiving a copy of the message exchange.
031     * 
032     * @see Pipeline
033     * @version $Revision: 563607 $
034     */
035    public class MulticastProcessor extends ServiceSupport implements Processor {
036        private Collection<Processor> processors;
037    
038        public MulticastProcessor(Collection<Processor> processors) {
039            this.processors = processors;
040        }
041    
042        /**
043         * A helper method to convert a list of endpoints into a list of processors
044         */
045        public static <E extends Exchange> Collection<Processor> toProducers(Collection<Endpoint> endpoints)
046            throws Exception {
047            Collection<Processor> answer = new ArrayList<Processor>();
048            for (Endpoint endpoint : endpoints) {
049                answer.add(endpoint.createProducer());
050            }
051            return answer;
052        }
053    
054        @Override
055        public String toString() {
056            return "Multicast" + getProcessors();
057        }
058    
059        public void process(Exchange exchange) throws Exception {
060            for (Processor producer : processors) {
061                Exchange copy = copyExchangeStrategy(producer, exchange);
062                producer.process(copy);
063            }
064        }
065    
066        protected void doStop() throws Exception {
067            ServiceHelper.stopServices(processors);
068        }
069    
070        protected void doStart() throws Exception {
071            ServiceHelper.startServices(processors);
072        }
073    
074        /**
075         * Returns the producers to multicast to
076         */
077        public Collection<Processor> getProcessors() {
078            return processors;
079        }
080    
081        /**
082         * Strategy method to copy the exchange before sending to another endpoint.
083         * Derived classes such as the {@link Pipeline} will not clone the exchange
084         * 
085         * @param processor the processor that will send the exchange
086         * @param exchange
087         * @return the current exchange if no copying is required such as for a
088         *         pipeline otherwise a new copy of the exchange is returned.
089         */
090        protected Exchange copyExchangeStrategy(Processor processor, Exchange exchange) {
091            return exchange.copy();
092        }
093    }