001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.concurrent.Executor;
026    
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Expression;
029    import org.apache.camel.Message;
030    import org.apache.camel.Processor;
031    import org.apache.camel.processor.aggregate.AggregationStrategy;
032    import org.apache.camel.util.CollectionHelper;
033    import org.apache.camel.util.ObjectHelper;
034    
035    import static org.apache.camel.util.ObjectHelper.notNull;
036    
037    /**
038     * Implements a dynamic <a
039     * href="http://camel.apache.org/splitter.html">Splitter</a> pattern
040     * where an expression is evaluated to iterate through each of the parts of a
041     * message and then each part is then send to some endpoint.
042     *
043     * @version $Revision: 747062 $
044     */
045    public class Splitter extends MulticastProcessor implements Processor {
046        private final Expression expression;
047    
048        public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
049            this(expression, destination, aggregationStrategy, false, null, false);
050        }
051    
052        public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
053                boolean parallelProcessing, Executor executor, boolean streaming) {
054            super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, executor, streaming);
055    
056            this.expression = expression;
057            notNull(expression, "expression");
058            notNull(destination, "destination");
059        }
060    
061        @Override
062        public String toString() {
063            return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]";
064        }
065    
066        @Override
067        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
068            Object value = expression.evaluate(exchange);
069    
070            if (isStreaming()) {
071                return createProcessorExchangePairsIterable(exchange, value);
072            } else {
073                return createProcessorExchangePairsList(exchange, value);
074            }
075        }
076    
077        @SuppressWarnings("unchecked")
078        private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, Object value) {
079            final Iterator iterator = ObjectHelper.createIterator(value);
080            return new Iterable() {
081    
082                public Iterator iterator() {
083                    return new Iterator() {
084    
085                        public boolean hasNext() {
086                            return iterator.hasNext();
087                        }
088    
089                        public Object next() {
090                            Object part = iterator.next();
091                            Exchange newExchange = exchange.copy();
092                            Message in = newExchange.getIn();
093                            in.setBody(part);
094                            return new ProcessorExchangePair(getProcessors().iterator().next(), newExchange);
095                        }
096    
097                        public void remove() {
098                            throw new UnsupportedOperationException("Remove is not supported by this iterator");
099                        }
100                    };
101                }
102    
103            };
104        }
105    
106        private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {
107            List<ProcessorExchangePair> result;
108            Integer collectionSize = CollectionHelper.size(value);
109            if (collectionSize != null) {
110                result = new ArrayList<ProcessorExchangePair>(collectionSize);
111            } else {
112                result = new ArrayList<ProcessorExchangePair>();
113            }
114            Iterator iter = ObjectHelper.createIterator(value);
115            while (iter.hasNext()) {
116                Object part = iter.next();
117                Exchange newExchange = exchange.copy();
118                Message in = newExchange.getIn();
119                in.setBody(part);
120                result.add(new ProcessorExchangePair(getProcessors().iterator().next(), newExchange));
121            }
122            return result;
123        }
124    
125        @Override
126        protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs) {
127            exchange.getIn().setHeader(Exchange.SPLIT_INDEX, index);
128            if (allPairs instanceof Collection) {
129                exchange.getIn().setHeader(Exchange.SPLIT_SIZE, ((Collection) allPairs).size());
130            }
131        }
132    
133        public Expression getExpression() {
134            return expression;
135        }
136    }