001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.List;
022    import java.util.concurrent.Callable;
023    import java.util.concurrent.CompletionService;
024    import java.util.concurrent.ExecutionException;
025    import java.util.concurrent.ExecutorCompletionService;
026    import java.util.concurrent.ExecutorService;
027    import java.util.concurrent.Future;
028    import java.util.concurrent.TimeUnit;
029    
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Navigate;
032    import org.apache.camel.Processor;
033    import org.apache.camel.impl.ServiceSupport;
034    import org.apache.camel.processor.aggregate.AggregationStrategy;
035    import org.apache.camel.util.ExchangeHelper;
036    import org.apache.camel.util.ServiceHelper;
037    import org.apache.camel.util.concurrent.AtomicExchange;
038    import org.apache.camel.util.concurrent.ExecutorServiceHelper;
039    import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    import static org.apache.camel.util.ObjectHelper.notNull;
044    
045    /**
046     * Implements the Multicast pattern to send a message exchange to a number of
047     * endpoints, each endpoint receiving a copy of the message exchange.
048     *
049     * @see Pipeline
050     * @version $Revision: 783663 $
051     */
052    public class MulticastProcessor extends ServiceSupport implements Processor, Navigate {
053    
054        private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
055    
056        // TODO: Add option to stop if an exception was thrown during processing to break asap (future task cancel)
057    
058        /**
059         * Class that represent each step in the multicast route to do
060         */
061        static class ProcessorExchangePair {
062            private final Processor processor;
063            private final Exchange exchange;
064    
065            public ProcessorExchangePair(Processor processor, Exchange exchange) {
066                this.processor = processor;
067                this.exchange = exchange;
068            }
069    
070            public Processor getProcessor() {
071                return processor;
072            }
073    
074            public Exchange getExchange() {
075                return exchange;
076            }
077        }
078    
079        private final Collection<Processor> processors;
080        private final AggregationStrategy aggregationStrategy;
081        private final boolean isParallelProcessing;
082        private final boolean streaming;
083        private ExecutorService executorService;
084    
085        public MulticastProcessor(Collection<Processor> processors) {
086            this(processors, null);
087        }
088    
089        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
090            this(processors, aggregationStrategy, false, null, false);
091        }
092        
093        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean streaming) {
094            notNull(processors, "processors");
095            // TODO: end() does not work correctly with Splitter
096            this.processors = processors;
097            this.aggregationStrategy = aggregationStrategy;
098            this.isParallelProcessing = parallelProcessing;
099            this.executorService = executorService;
100            this.streaming = streaming;
101    
102            if (isParallelProcessing()) {
103                if (this.executorService == null) {
104                    // setup default executor as parallel processing requires an executor
105                    this.executorService = ExecutorServiceHelper.newScheduledThreadPool(5, "Multicast", true);
106                }
107            }
108        }
109    
110        @Override
111        public String toString() {
112            return "Multicast[" + getProcessors() + "]";
113        }
114    
115        public void process(Exchange exchange) throws Exception {
116            final AtomicExchange result = new AtomicExchange();
117            final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
118    
119            if (isParallelProcessing()) {
120                doProcessParallel(result, pairs, isStreaming());
121            } else {
122                doProcessSequntiel(result, pairs);
123            }
124    
125            if (result.get() != null) {
126                ExchangeHelper.copyResults(exchange, result.get());
127            }
128        }
129    
130        protected void doProcessParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs, boolean streaming) throws InterruptedException, ExecutionException {
131            CompletionService<Exchange> completion;
132            if (streaming) {
133                // execute tasks in paralle+streaming and aggregate in the order they are finished (out of order sequence)
134                completion = new ExecutorCompletionService<Exchange>(executorService);
135            } else {
136                // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
137                completion = new SubmitOrderedCompletionService<Exchange>(executorService);
138            }
139            int total = 0;
140    
141            for (ProcessorExchangePair pair : pairs) {
142                final Processor producer = pair.getProcessor();
143                final Exchange subExchange = pair.getExchange();
144                updateNewExchange(subExchange, total, pairs);
145    
146                completion.submit(new Callable<Exchange>() {
147                    public Exchange call() throws Exception {
148                        try {
149                            producer.process(subExchange);
150                        } catch (Exception e) {
151                            subExchange.setException(e);
152                        }
153                        if (LOG.isTraceEnabled()) {
154                            LOG.trace("Parallel processing complete for exchange: " + subExchange);
155                        }
156                        return subExchange;
157                    }
158                });
159    
160                total++;
161            }
162    
163            for (int i = 0; i < total; i++) {
164                Future<Exchange> future = completion.take();
165                Exchange subExchange = future.get();
166                if (aggregationStrategy != null) {
167                    doAggregate(result, subExchange);
168                }
169            }
170    
171            if (LOG.isDebugEnabled()) {
172                LOG.debug("Done parallel processing " + total + " exchanges");
173            }
174        }
175    
176        protected void doProcessSequntiel(AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws Exception {
177            int total = 0;
178    
179            for (ProcessorExchangePair pair : pairs) {
180                Processor producer = pair.getProcessor();
181                Exchange subExchange = pair.getExchange();
182                updateNewExchange(subExchange, total, pairs);
183    
184                // process it sequentially
185                producer.process(subExchange);
186                if (LOG.isTraceEnabled()) {
187                    LOG.trace("Sequientel processing complete for number " + total + " exchange: " + subExchange);
188                }
189    
190                if (aggregationStrategy != null) {
191                    doAggregate(result, subExchange);
192                }
193                total++;
194            }
195    
196            if (LOG.isDebugEnabled()) {
197                LOG.debug("Done sequientel processing " + total + " exchanges");
198            }
199        }
200    
201        /**
202         * Aggregate the {@link Exchange} with the current result
203         *
204         * @param result the current result
205         * @param exchange the exchange to be added to the result
206         */
207        protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) {
208            // only aggregate if the exchange is not filtered (eg by the FilterProcessor)
209            Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
210            if (aggregationStrategy != null && (filtered == null || !filtered)) {
211                result.set(aggregationStrategy.aggregate(result.get(), exchange));
212            } else {
213                if (LOG.isTraceEnabled()) {
214                    LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
215                }
216            }
217        }
218    
219        protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs) {
220            exchange.getIn().setHeader(Exchange.MULTICAST_INDEX, index);
221        }
222    
223        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
224            List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
225    
226            for (Processor processor : processors) {
227                Exchange copy = exchange.copy();
228                result.add(new ProcessorExchangePair(processor, copy));
229            }
230            return result;
231        }
232    
233        protected void doStop() throws Exception {
234            if (executorService != null) {
235                executorService.shutdown();
236                executorService.awaitTermination(0, TimeUnit.SECONDS);
237            }
238            ServiceHelper.stopServices(processors);
239        }
240    
241        protected void doStart() throws Exception {
242            ServiceHelper.startServices(processors);
243        }
244        
245        /**
246         * Is the multicast processor working in streaming mode?
247         * 
248         * In streaming mode:
249         * <ul>
250         * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
251         * <li>for parallel processing, we start aggregating responses as they get send back to the processor;
252         * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li>
253         * </ul>
254         */
255        public boolean isStreaming() {
256            return streaming;
257        }
258    
259        /**
260         * Returns the producers to multicast to
261         */
262        public Collection<Processor> getProcessors() {
263            return processors;
264        }
265    
266        public AggregationStrategy getAggregationStrategy() {
267            return aggregationStrategy;
268        }
269    
270        public boolean isParallelProcessing() {
271            return isParallelProcessing;
272        }
273    
274        public ExecutorService getExecutorService() {
275            return executorService;
276        }
277    
278        public void setExecutorService(ExecutorService executorService) {
279            this.executorService = executorService;
280        }
281    
282        public List<Processor> next() {
283            if (!hasNext()) {
284                return null;
285            }
286            return new ArrayList<Processor>(processors);
287        }
288    
289        public boolean hasNext() {
290            return processors != null && !processors.isEmpty();
291        }
292    }