001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.concurrent.ArrayBlockingQueue;
024    import java.util.concurrent.Executor;
025    import java.util.concurrent.RejectedExecutionException;
026    import java.util.concurrent.RejectedExecutionHandler;
027    import java.util.concurrent.ThreadPoolExecutor;
028    import java.util.concurrent.TimeUnit;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    
031    import org.apache.camel.AsyncCallback;
032    import org.apache.camel.Endpoint;
033    import org.apache.camel.Exchange;
034    import org.apache.camel.Processor;
035    import org.apache.camel.impl.ServiceSupport;
036    import org.apache.camel.processor.aggregate.AggregationStrategy;
037    import org.apache.camel.util.ExchangeHelper;
038    import org.apache.camel.util.ServiceHelper;
039    import org.apache.camel.util.concurrent.AtomicExchange;
040    import org.apache.camel.util.concurrent.CountingLatch;
041    
042    import static org.apache.camel.util.ObjectHelper.notNull;
043    
044    /**
045     * Implements the Multicast pattern to send a message exchange to a number of
046     * endpoints, each endpoint receiving a copy of the message exchange.
047     *
048     * @see Pipeline
049     * @version $Revision: 727377 $
050     */
051    public class MulticastProcessor extends ServiceSupport implements Processor {
052        static class ProcessorExchangePair {
053            private final Processor processor;
054            private final Exchange exchange;
055    
056            public ProcessorExchangePair(Processor processor, Exchange exchange) {
057                this.processor = processor;
058                this.exchange = exchange;
059            }
060    
061            public Processor getProcessor() {
062                return processor;
063            }
064    
065            public Exchange getExchange() {
066                return exchange;
067            }
068        }
069    
070        private Collection<Processor> processors;
071        private AggregationStrategy aggregationStrategy;
072        private boolean isParallelProcessing;
073        private Executor executor;
074        private final boolean streaming;
075        private final AtomicBoolean shutdown = new AtomicBoolean(true);
076    
077        public MulticastProcessor(Collection<Processor> processors) {
078            this(processors, null);
079        }
080    
081        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
082            this(processors, aggregationStrategy, false, null);
083        }
084        
085        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor executor) {
086            this(processors, aggregationStrategy, parallelProcessing, executor, false);
087        }
088    
089        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor executor, boolean streaming) {
090            notNull(processors, "processors");
091            this.processors = processors;
092            this.aggregationStrategy = aggregationStrategy;
093            this.isParallelProcessing = parallelProcessing;
094            if (isParallelProcessing) {
095                if (executor != null) {
096                    this.executor = executor;
097                } else { 
098                    // setup default Executor
099                    this.executor = new ThreadPoolExecutor(processors.size(), processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size()));
100                }
101            }
102            this.streaming = streaming;
103        }
104    
105        /**
106         * A helper method to convert a list of endpoints into a list of processors
107         */
108        public static <E extends Exchange> Collection<Processor> toProducers(Collection<Endpoint> endpoints)
109            throws Exception {
110            Collection<Processor> answer = new ArrayList<Processor>();
111            for (Endpoint endpoint : endpoints) {
112                answer.add(endpoint.createProducer());
113            }
114            return answer;
115        }
116    
117        @Override
118        public String toString() {
119            return "Multicast" + getProcessors();
120        }
121    
122        class ProcessCall implements Runnable {
123            private final Exchange exchange;
124            private final AsyncCallback callback;
125            private final Processor processor;
126    
127            public ProcessCall(Exchange exchange, Processor processor, AsyncCallback callback) {
128                this.exchange = exchange;
129                this.callback = callback;
130                this.processor = processor;
131            }
132    
133            public void run() {
134                if (shutdown.get()) {
135                    exchange.setException(new RejectedExecutionException());
136                    callback.done(false);
137                } else {
138                    try {
139                        processor.process(exchange);
140                    } catch (Exception ex) {
141                        exchange.setException(ex);
142                    }
143                    callback.done(false);
144                }
145            }
146        }
147    
148        public void process(Exchange exchange) throws Exception {
149            final AtomicExchange result = new AtomicExchange();
150    
151            Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
152            
153            // Parallel Processing the producer
154            if (isParallelProcessing) {
155                List<Exchange> exchanges = new LinkedList<Exchange>();
156                final CountingLatch completedExchanges = new CountingLatch();
157                int i = 0;
158                for (ProcessorExchangePair pair : pairs) {
159                    Processor producer = pair.getProcessor();
160                    final Exchange subExchange = pair.getExchange();
161                    updateNewExchange(subExchange, i, pairs);
162                    exchanges.add(subExchange);
163                    completedExchanges.increment(); 
164                    ProcessCall call = new ProcessCall(subExchange, producer, new AsyncCallback() {
165                        public void done(boolean doneSynchronously) {
166                            if (streaming && aggregationStrategy != null) {
167                                doAggregate(result, subExchange);
168                            }
169                            completedExchanges.decrement();
170                        }
171    
172                    });
173                    executor.execute(call);
174                    i++;
175                }
176                completedExchanges.await();
177                if (!streaming && aggregationStrategy != null) {
178                    for (Exchange resultExchange : exchanges) {
179                        doAggregate(result, resultExchange);
180                    }
181                }
182    
183            } else {
184                // we call the producer one by one sequentially
185                int i = 0;
186                for (ProcessorExchangePair pair : pairs) {
187                    Processor producer = pair.getProcessor();
188                    Exchange subExchange = pair.getExchange();
189                    updateNewExchange(subExchange, i, pairs);
190                    try {
191                        producer.process(subExchange);
192                    } catch (Exception exception) {
193                        subExchange.setException(exception);
194                    }
195                    doAggregate(result, subExchange);
196                    i++;
197                }
198            }
199            if (result.get() != null) {
200                ExchangeHelper.copyResults(exchange, result.get());
201            }
202        }
203    
204        /**
205         * Aggregate the {@link Exchange} with the current result
206         *
207         * @param result the current result
208         * @param exchange the exchange to be added to the result
209         */
210        protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) {
211            if (aggregationStrategy != null) {
212                if (result.get() == null) {
213                    result.set(exchange);
214                } else {
215                    result.set(aggregationStrategy.aggregate(result.get(), exchange));
216                }
217            }
218        }
219    
220        protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> allPairs) {
221            // No updates needed
222        }
223    
224        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
225            List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
226            Processor[] processorsArray = processors.toArray(new Processor[processors.size()]);
227            for (int i = 0; i < processorsArray.length; i++) {
228                result.add(new ProcessorExchangePair(processorsArray[i], exchange.copy()));
229            }
230            return result;
231        }
232    
233        protected void doStop() throws Exception {
234            shutdown.set(true);
235            if (executor != null && executor instanceof ThreadPoolExecutor) {
236                ((ThreadPoolExecutor)executor).shutdown();
237                ((ThreadPoolExecutor)executor).awaitTermination(0, TimeUnit.SECONDS);
238            }
239            ServiceHelper.stopServices(processors);
240        }
241    
242        protected void doStart() throws Exception {
243            shutdown.set(false);
244            if (executor != null && executor instanceof ThreadPoolExecutor) {
245                ((ThreadPoolExecutor)executor).setRejectedExecutionHandler(new RejectedExecutionHandler() {
246                    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
247                        ProcessCall call = (ProcessCall)runnable;
248                        call.exchange.setException(new RejectedExecutionException());
249                        call.callback.done(false);
250                    }
251                });
252            }
253            ServiceHelper.startServices(processors);
254        }
255        
256        /**
257         * Is the multicast processor working in streaming mode?
258         * 
259         * In streaming mode:
260         * <ul>
261         * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
262         * <li>for parallel processing, we start aggregating responses as they get send back to the processor;
263         * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li>
264         * </ul>
265         */
266        public boolean isStreaming() {
267            return streaming;
268        }
269    
270        /**
271         * Returns the producers to multicast to
272         */
273        public Collection<Processor> getProcessors() {
274            return processors;
275        }
276    
277        public AggregationStrategy getAggregationStrategy() {
278            return aggregationStrategy;
279        }
280    
281        public Executor getExecutor() {
282            return executor;
283        }
284    
285        public boolean isParallelProcessing() {
286            return isParallelProcessing;
287        }
288    }