001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.List;
020    import java.util.concurrent.ExecutorService;
021    
022    import javax.xml.bind.annotation.XmlAccessType;
023    import javax.xml.bind.annotation.XmlAccessorType;
024    import javax.xml.bind.annotation.XmlAttribute;
025    import javax.xml.bind.annotation.XmlRootElement;
026    import javax.xml.bind.annotation.XmlTransient;
027    
028    import org.apache.camel.Processor;
029    import org.apache.camel.processor.MulticastProcessor;
030    import org.apache.camel.processor.aggregate.AggregationStrategy;
031    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
032    import org.apache.camel.spi.RouteContext;
033    
034    /**
035     * Represents an XML <multicast/> element
036     *
037     * @version $Revision: 772172 $
038     */
039    @XmlRootElement(name = "multicast")
040    @XmlAccessorType(XmlAccessType.FIELD)
041    public class MulticastDefinition extends OutputDefinition<ProcessorDefinition> {
042        @XmlAttribute(required = false)
043        private Boolean parallelProcessing;
044        @XmlAttribute(required = false)
045        private String strategyRef;
046        @XmlTransient
047        private ExecutorService executorService;
048        @XmlAttribute(required = false)
049        private String executorServiceRef;
050        @XmlAttribute(required = false)
051        private Boolean streaming;
052        @XmlTransient
053        private AggregationStrategy aggregationStrategy;
054    
055    
056        @Override
057        public String toString() {
058            return "Multicast[" + getOutputs() + "]";
059        }
060    
061        @Override
062        public String getShortName() {
063            return "multicast";
064        }
065    
066        @Override
067        public Processor createProcessor(RouteContext routeContext) throws Exception {
068            return createOutputsProcessor(routeContext);
069        }
070    
071        // Fluent API
072        // -------------------------------------------------------------------------
073    
074        /**
075         * Set the multicasting aggregationStrategy
076         *
077         * @return the builder
078         */
079        public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
080            setAggregationStrategy(aggregationStrategy);
081            return this;
082        }
083        
084        /**
085         * Uses the {@link java.util.concurrent.ExecutorService} to do the multicasting work
086         *     
087         * @return the builder
088         */
089        public MulticastDefinition parallelProcessing() {
090            setParallelProcessing(true);
091            return this;
092        }
093        
094        /**
095         * Aggregates the responses as the are done (e.g. out of order sequence)
096         *
097         * @return the builder
098         */
099        public MulticastDefinition streaming() {
100            setStreaming(true);
101            return this;
102        }
103           
104        /**
105         * Setting the executor service for executing the multicasting action.
106         *
107         * @return the builder
108         */
109        public MulticastDefinition executorService(ExecutorService executorService) {
110            setExecutorService(executorService);
111            return this;
112        }    
113            
114        protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) {
115            if (strategyRef != null) {
116                aggregationStrategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
117            }
118            if (aggregationStrategy == null) {
119                // default to use latest aggregation strategy
120                aggregationStrategy = new UseLatestAggregationStrategy();
121            }
122            if (executorServiceRef != null) {
123                executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
124            }
125            return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executorService, isStreaming());
126        }
127    
128        public AggregationStrategy getAggregationStrategy() {
129            return aggregationStrategy;
130        }
131    
132        public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) {
133            this.aggregationStrategy = aggregationStrategy;
134            return this;
135        }
136    
137        public boolean isParallelProcessing() {
138            return parallelProcessing != null ? parallelProcessing : false;
139        }
140    
141        public void setParallelProcessing(boolean parallelProcessing) {
142            this.parallelProcessing = parallelProcessing;        
143        }
144    
145        public boolean isStreaming() {
146            return streaming != null ? streaming : false;
147        }
148    
149        public void setStreaming(boolean streaming) {
150            this.streaming = streaming;
151        }
152    
153        public ExecutorService getExecutorService() {
154            return executorService;
155        }
156    
157        public void setExecutorService(ExecutorService executorService) {
158            this.executorService = executorService;
159        }
160    
161    }