001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Arrays;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.HashSet;
024    import java.util.LinkedList;
025    import java.util.List;
026    import java.util.Set;
027    import java.util.concurrent.ThreadPoolExecutor;
028    
029    import javax.xml.bind.annotation.XmlAccessType;
030    import javax.xml.bind.annotation.XmlAccessorType;
031    import javax.xml.bind.annotation.XmlAttribute;
032    import javax.xml.bind.annotation.XmlTransient;
033    
034    import org.apache.camel.CamelContext;
035    import org.apache.camel.CamelException;
036    import org.apache.camel.Endpoint;
037    import org.apache.camel.Exchange;
038    import org.apache.camel.Expression;
039    import org.apache.camel.Predicate;
040    import org.apache.camel.Processor;
041    import org.apache.camel.Route;
042    import org.apache.camel.RuntimeCamelException;
043    import org.apache.camel.builder.DataFormatClause;
044    import org.apache.camel.builder.DeadLetterChannelBuilder;
045    import org.apache.camel.builder.ErrorHandlerBuilder;
046    import org.apache.camel.builder.ErrorHandlerBuilderRef;
047    import org.apache.camel.builder.ExpressionClause;
048    import org.apache.camel.builder.NoErrorHandlerBuilder;
049    import org.apache.camel.builder.ProcessorBuilder;
050    import org.apache.camel.impl.DefaultCamelContext;
051    import org.apache.camel.model.dataformat.DataFormatType;
052    import org.apache.camel.model.language.ExpressionType;
053    import org.apache.camel.model.language.LanguageExpression;
054    import org.apache.camel.processor.ConvertBodyProcessor;
055    import org.apache.camel.processor.DelegateProcessor;
056    import org.apache.camel.processor.Pipeline;
057    import org.apache.camel.processor.aggregate.AggregationCollection;
058    import org.apache.camel.processor.aggregate.AggregationStrategy;
059    import org.apache.camel.processor.idempotent.MessageIdRepository;
060    import org.apache.camel.spi.DataFormat;
061    import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
062    import org.apache.camel.spi.InterceptStrategy;
063    import org.apache.camel.spi.Policy;
064    import org.apache.camel.spi.RouteContext;
065    import org.apache.commons.logging.Log;
066    import org.apache.commons.logging.LogFactory;
067    
068    /**
069     * Base class for processor types that most XML types extend.
070     *
071     * @version $Revision: 673837 $
072     */
073    @XmlAccessorType(XmlAccessType.PROPERTY)
074    public abstract class ProcessorType<Type extends ProcessorType> extends OptionalIdentifiedType<Type> implements Block {
    // Name of the default trace logging category (presumably consumed by tracing code outside this view).
    public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
    // Commons-logging logger for this class.
    private static final transient Log LOG = LogFactory.getLog(ProcessorType.class);
    private ErrorHandlerBuilder errorHandlerBuilder;
    private Boolean inheritErrorHandlerFlag;
    private NodeFactory nodeFactory;
    // Currently open blocks; end() pops one while this list is non-empty.
    private LinkedList<Block> blocks = new LinkedList<Block>();
    // Enclosing node; end() returns it when no block is open and fails if it is null (root node).
    private ProcessorType<? extends ProcessorType> parent;
    private List<InterceptorType> interceptors = new ArrayList<InterceptorType>();
    private String errorHandlerRef;
084    
    // else to use an optional attribute in JAXB2
    /**
     * Returns the outputs (child nodes) of this node.
     * <p/>
     * {@link #createOutputsProcessor(RouteContext)} reads this list to build
     * the processor for the children.
     *
     * @return the list of outputs of this node
     */
    public abstract List<ProcessorType<?>> getOutputs();
087    
088    
089        public Processor createProcessor(RouteContext routeContext) throws Exception {
090            throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
091        }
092    
093        public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
094            Collection<ProcessorType<?>> outputs = getOutputs();
095            return createOutputsProcessor(routeContext, outputs);
096        }
097    
098        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
099            Processor processor = makeProcessor(routeContext);
100            if (!routeContext.isRouteAdded()) {
101                routeContext.addEventDrivenProcessor(processor);
102            }
103        }
104    
105        /**
106         * Wraps the child processor in whatever necessary interceptors and error
107         * handlers
108         */
109        public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
110            processor = wrapProcessorInInterceptors(routeContext, processor);
111            return wrapInErrorHandler(routeContext, processor);
112        }
113    
114        // Fluent API
115        // -------------------------------------------------------------------------
116    
117        /**
118         * Sends the exchange to the given endpoint URI
119         */
120        public Type to(String uri) {
121            addOutput(new ToType(uri));
122            return (Type) this;
123        }
124    
125        /**
126         * Sends the exchange to the given endpoint
127         */
128        public Type to(Endpoint endpoint) {
129            addOutput(new ToType(endpoint));
130            return (Type) this;
131        }
132    
133        /**
134         * Sends the exchange to a list of endpoints using the
135         * {@link MulticastProcessor} pattern
136         */
137        public Type to(String... uris) {
138            for (String uri : uris) {
139                addOutput(new ToType(uri));
140            }
141            return (Type) this;
142        }
143    
144        /**
145         * Sends the exchange to a list of endpoints using the
146         * {@link MulticastProcessor} pattern
147         */
148        public Type to(Endpoint... endpoints) {
149            for (Endpoint endpoint : endpoints) {
150                addOutput(new ToType(endpoint));
151            }
152            return (Type) this;
153        }
154    
155        /**
156         * Sends the exchange to a list of endpoint using the
157         * {@link MulticastProcessor} pattern
158         */
159        public Type to(Collection<Endpoint> endpoints) {
160            for (Endpoint endpoint : endpoints) {
161                addOutput(new ToType(endpoint));
162            }
163            return (Type) this;
164        }
165    
166        /**
167         * Multicasts messages to all its child outputs; so that each processor and
168         * destination gets a copy of the original message to avoid the processors
169         * interfering with each other.
170         */
171        public MulticastType multicast() {
172            MulticastType answer = new MulticastType();
173            addOutput(answer);
174            return answer;
175        }
176    
177        /**
178         * Multicasts messages to all its child outputs; so that each processor and
179         * destination gets a copy of the original message to avoid the processors
180         * interfering with each other.
181         * @param aggregationStrategy the strategy used to aggregate responses for
182         *          every part
183         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
184         * @return the multicast type
185         */
186        public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
187            MulticastType answer = new MulticastType();
188            addOutput(answer);
189            answer.setAggregationStrategy(aggregationStrategy);
190            answer.setParallelProcessing(parallelProcessing);
191            return answer;
192        }
193    
194        /**
195         * Multicasts messages to all its child outputs; so that each processor and
196         * destination gets a copy of the original message to avoid the processors
197         * interfering with each other.
198         * @param aggregationStrategy the strategy used to aggregate responses for
199         *          every part
200         * @return the multicast type
201         */
202        public MulticastType multicast(AggregationStrategy aggregationStrategy) {
203            MulticastType answer = new MulticastType();
204            addOutput(answer);
205            answer.setAggregationStrategy(aggregationStrategy);
206            return answer;
207        }
208    
209        /**
210         * Creates a {@link Pipeline} of the list of endpoints so that the message
211         * will get processed by each endpoint in turn and for request/response the
212         * output of one endpoint will be the input of the next endpoint
213         */
214        public Type pipeline(String... uris) {
215            // TODO pipeline v mulicast
216            return to(uris);
217        }
218    
219        /**
220         * Creates a {@link Pipeline} of the list of endpoints so that the message
221         * will get processed by each endpoint in turn and for request/response the
222         * output of one endpoint will be the input of the next endpoint
223         */
224        public Type pipeline(Endpoint... endpoints) {
225            // TODO pipeline v mulicast
226            return to(endpoints);
227        }
228    
229        /**
230         * Creates a {@link Pipeline} of the list of endpoints so that the message
231         * will get processed by each endpoint in turn and for request/response the
232         * output of one endpoint will be the input of the next endpoint
233         */
234        public Type pipeline(Collection<Endpoint> endpoints) {
235            // TODO pipeline v mulicast
236            return to(endpoints);
237        }
238    
239        /**
240         * Ends the current block
241         */
242        public ProcessorType<? extends ProcessorType> end() {
243            if (blocks.isEmpty()) {
244                if (parent == null) {
245                    throw new IllegalArgumentException("Root node with no active block");
246                }
247                return parent;
248            }
249            popBlock();
250            return this;
251        }
252    
253        /**
254         * Causes subsequent processors to be called asynchronously
255         *
256         * @param coreSize the number of threads that will be used to process
257         *                 messages in subsequent processors.
258         * @return a ThreadType builder that can be used to further configure the
259         *         the thread pool.
260         */
261        public ThreadType thread(int coreSize) {
262            ThreadType answer = new ThreadType(coreSize);
263            addOutput(answer);
264            return answer;
265        }
266    
267        /**
268         * Causes subsequent processors to be called asynchronously
269         *
270         * @param executor the executor that will be used to process
271         *                 messages in subsequent processors.
272         * @return a ThreadType builder that can be used to further configure the
273         *         the thread pool.
274         */
275        public ProcessorType<Type> thread(ThreadPoolExecutor executor) {
276            ThreadType answer = new ThreadType(executor);
277            addOutput(answer);
278            return this;
279        }
280    
281        /**
282         * Creates an {@link IdempotentConsumer} to avoid duplicate messages
283         */
284        public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression,
285                MessageIdRepository messageIdRepository) {
286            IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository);
287            addOutput(answer);
288            return answer;
289        }
290    
291        /**
292         * Creates an {@link IdempotentConsumer} to avoid duplicate messages
293         *
294         * @return the builder used to create the expression
295         */
296        public ExpressionClause<IdempotentConsumerType> idempotentConsumer(MessageIdRepository messageIdRepository) {
297            IdempotentConsumerType answer = new IdempotentConsumerType();
298            answer.setMessageIdRepository(messageIdRepository);
299            addOutput(answer);
300            return ExpressionClause.createAndSetExpression(answer);
301        }
302    
303        /**
304         * Creates a predicate expression which only if it is true then the
305         * exchange is forwarded to the destination
306         *
307         * @return the clause used to create the filter expression
308         */
309        public ExpressionClause<FilterType> filter() {
310            FilterType filter = new FilterType();
311            addOutput(filter);
312            return ExpressionClause.createAndSetExpression(filter);
313        }
314    
315        /**
316         * Creates a predicate which is applied and only if it is true then the
317         * exchange is forwarded to the destination
318         *
319         * @return the builder for a predicate
320         */
321        public FilterType filter(Predicate predicate) {
322            FilterType filter = new FilterType(predicate);
323            addOutput(filter);
324            return filter;
325        }
326    
327        public FilterType filter(ExpressionType expression) {
328            FilterType filter = getNodeFactory().createFilter();
329            filter.setExpression(expression);
330            addOutput(filter);
331            return filter;
332        }
333    
334        public FilterType filter(String language, String expression) {
335            return filter(new LanguageExpression(language, expression));
336        }
337    
338        public LoadBalanceType loadBalance() {
339            LoadBalanceType answer = new LoadBalanceType();
340            addOutput(answer);
341            return answer;
342        }
343    
344    
345        /**
346         * Creates a choice of one or more predicates with an otherwise clause
347         *
348         * @return the builder for a choice expression
349         */
350        public ChoiceType choice() {
351            ChoiceType answer = new ChoiceType();
352            addOutput(answer);
353            return answer;
354        }
355    
356        /**
357         * Creates a try/catch block
358         *
359         * @return the builder for a tryBlock expression
360         */
361        public TryType tryBlock() {
362            TryType answer = new TryType();
363            addOutput(answer);
364            return answer;
365        }
366    
367        /**
368         * Creates a dynamic <a
369         * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
370         * List</a> pattern.
371         *
372         * @param receipients is the builder of the expression used in the
373         *                    {@link RecipientList} to decide the destinations
374         */
375        public Type recipientList(Expression receipients) {
376            RecipientListType answer = new RecipientListType(receipients);
377            addOutput(answer);
378            return (Type) this;
379        }
380    
381        /**
382         * Creates a dynamic <a
383         * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
384         * List</a> pattern.
385         *
386         * @return the expression clause for the expression used in the
387         *                    {@link RecipientList} to decide the destinations
388         */
389        public ExpressionClause<ProcessorType<Type>> recipientList() {
390            RecipientListType answer = new RecipientListType();
391            addOutput(answer);
392            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
393            answer.setExpression(clause);
394            return clause;
395        }
396    
397        /**
398         * Creates a <a
399         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
400         * Slip</a> pattern.
401         *
402         * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
403         * class will look in for the list of URIs to route the message to.
404         * @param uriDelimiter is the delimiter that will be used to split up
405         * the list of URIs in the routing slip.
406         */
407        public Type routingSlip(String header, String uriDelimiter) {
408            RoutingSlipType answer = new RoutingSlipType(header, uriDelimiter);
409            addOutput(answer);
410            return (Type) this;
411        }
412    
413        /**
414         * Creates a <a
415         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
416         * Slip</a> pattern.
417         *
418         * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
419         * class will look in for the list of URIs to route the message to. The list of URIs
420         * will be split based on the default delimiter
421         * {@link RoutingSlipType#DEFAULT_DELIMITER}.
422         */
423        public Type routingSlip(String header) {
424            RoutingSlipType answer = new RoutingSlipType(header);
425            addOutput(answer);
426            return (Type) this;
427        }
428    
429        /**
430         * Creates a <a
431         * href="http://activemq.apache.org/camel/routing-slip.html">Routing
432         * Slip</a> pattern with the default header {@link RoutingSlipType#ROUTING_SLIP_HEADER}.
433         * The list of URIs in the header will be split based on the default delimiter
434         * {@link RoutingSlipType#DEFAULT_DELIMITER}.
435         */
436        public Type routingSlip() {
437            RoutingSlipType answer = new RoutingSlipType();
438            addOutput(answer);
439            return (Type) this;
440        }
441    
442        /**
443         * Creates the <a
444         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
445         * pattern where an expression is evaluated to iterate through each of the
446         * parts of a message and then each part is then send to some endpoint.
447         * This splitter responds with the latest message returned from destination
448         * endpoint.
449         *
450         * @param receipients the expression on which to split
451         * @return the builder
452         */
453        public SplitterType splitter(Expression receipients) {
454            SplitterType answer = new SplitterType(receipients);
455            addOutput(answer);
456            return answer;
457        }
458    
459        /**
460         * Creates the <a
461         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
462         * pattern where an expression is evaluated to iterate through each of the
463         * parts of a message and then each part is then send to some endpoint.
464         * This splitter responds with the latest message returned from destination
465         * endpoint.
466         *
467         * @return the expression clause for the expression on which to split
468         */
469        public ExpressionClause<SplitterType> splitter() {
470            SplitterType answer = new SplitterType();
471            addOutput(answer);
472            return ExpressionClause.createAndSetExpression(answer);
473        }
474    
475        /**
476         * Creates the <a
477         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
478         * pattern where an expression is evaluated to iterate through each of the
479         * parts of a message and then each part is then send to some endpoint.
480         * Answer from the splitter is produced using given {@link AggregationStrategy}
481         * @param partsExpression the expression on which to split
482         * @param aggregationStrategy the strategy used to aggregate responses for
483         *          every part
484         * @return the builder
485         */
486        public SplitterType splitter(Expression partsExpression, AggregationStrategy aggregationStrategy) {
487            SplitterType answer = new SplitterType(partsExpression);
488            addOutput(answer);
489            answer.setAggregationStrategy(aggregationStrategy);
490            return answer;
491        }
492    
493        /**
494         * Creates the <a
495         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
496         * pattern where an expression is evaluated to iterate through each of the
497         * parts of a message and then each part is then send to some endpoint.
498         * Answer from the splitter is produced using given {@link AggregationStrategy}
499         * @param aggregationStrategy the strategy used to aggregate responses for
500         *          every part
501         * @return the expression clause for the expression on which to split
502         */
503        public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy) {
504            SplitterType answer = new SplitterType();
505            addOutput(answer);
506            answer.setAggregationStrategy(aggregationStrategy);
507            return ExpressionClause.createAndSetExpression(answer);
508        }
509    
510        /**
511         * Creates the <a
512         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
513         * pattern where an expression is evaluated to iterate through each of the
514         * parts of a message and then each part is then send to some endpoint.
515         * This splitter responds with the latest message returned from destination
516         * endpoint.
517         *
518         * @param receipients the expression on which to split
519         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
520         * @return the builder
521         */
522        public SplitterType splitter(Expression receipients, boolean parallelProcessing) {
523            SplitterType answer = new SplitterType(receipients);
524            addOutput(answer);
525            answer.setParallelProcessing(parallelProcessing);
526            return answer;
527        }
528    
529        /**
530         * Creates the <a
531         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
532         * pattern where an expression is evaluated to iterate through each of the
533         * parts of a message and then each part is then send to some endpoint.
534         * This splitter responds with the latest message returned from destination
535         * endpoint.
536         *
537         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
538         * @return the expression clause for the expression on which to split
539         */
540        public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) {
541            SplitterType answer = new SplitterType();
542            addOutput(answer);
543            answer.setParallelProcessing(parallelProcessing);
544            return ExpressionClause.createAndSetExpression(answer);
545        }
546    
547        /**
548         * Creates the <a
549         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
550         * pattern where an expression is evaluated to iterate through each of the
551         * parts of a message and then each part is then send to some endpoint.
552         * Answer from the splitter is produced using given {@link AggregationStrategy}
553         * @param partsExpression the expression on which to split
554         * @param aggregationStrategy the strategy used to aggregate responses for
555         *          every part
556         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
557         * @return the builder
558         */
559        public SplitterType splitter(Expression partsExpression,
560                AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
561            SplitterType answer = new SplitterType(partsExpression);
562            addOutput(answer);
563            answer.setAggregationStrategy(aggregationStrategy);
564            answer.setParallelProcessing(parallelProcessing);
565            return answer;
566        }
567    
568        /**
569         * Creates the <a
570         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
571         * pattern where an expression is evaluated to iterate through each of the
572         * parts of a message and then each part is then send to some endpoint.
573         * Answer from the splitter is produced using given {@link AggregationStrategy}
574         * @param aggregationStrategy the strategy used to aggregate responses for
575         *          every part
576         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
577         * @return the expression clause for the expression on which to split
578         */
579        public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
580            SplitterType answer = new SplitterType();
581            addOutput(answer);
582            answer.setAggregationStrategy(aggregationStrategy);
583            answer.setParallelProcessing(parallelProcessing);
584            return ExpressionClause.createAndSetExpression(answer);
585        }
586    
587    
588        /**
589         * Creates the <a
590         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
591         * pattern where a list of expressions are evaluated to be able to compare
592         * the message exchanges to reorder them. e.g. you may wish to sort by some
593         * headers
594         *
595         * @return the expression clause for the expressions on which to compare messages in order
596         */
597        public ExpressionClause<ResequencerType> resequencer() {
598            ResequencerType answer = new ResequencerType();
599            addOutput(answer);
600            ExpressionClause<ResequencerType> clause = new ExpressionClause<ResequencerType>(answer);
601            answer.expression(clause);
602            return clause;
603        }
604    
605        /**
606         * Creates the <a
607         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
608         * pattern where an expression is evaluated to be able to compare the
609         * message exchanges to reorder them. e.g. you may wish to sort by some
610         * header
611         *
612         * @param expression the expression on which to compare messages in order
613         * @return the builder
614         */
615        public ResequencerType resequencer(Expression<Exchange> expression) {
616            return resequencer(Collections.<Expression>singletonList(expression));
617        }
618    
619        /**
620         * Creates the <a
621         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
622         * pattern where a list of expressions are evaluated to be able to compare
623         * the message exchanges to reorder them. e.g. you may wish to sort by some
624         * headers
625         *
626         * @param expressions the expressions on which to compare messages in order
627         * @return the builder
628         */
629        public ResequencerType resequencer(List<Expression> expressions) {
630            ResequencerType answer = new ResequencerType(expressions);
631            addOutput(answer);
632            return answer;
633        }
634    
635        /**
636         * Creates the <a
637         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
638         * pattern where a list of expressions are evaluated to be able to compare
639         * the message exchanges to reorder them. e.g. you may wish to sort by some
640         * headers
641         *
642         * @param expressions the expressions on which to compare messages in order
643         * @return the builder
644         */
645        public ResequencerType resequencer(Expression... expressions) {
646            List<Expression> list = new ArrayList<Expression>();
647            list.addAll(Arrays.asList(expressions));
648            return resequencer(list);
649        }
650    
651        /**
652         * Creates an <a
653         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
654         * pattern where a batch of messages are processed (up to a maximum amount
655         * or until some timeout is reached) and messages for the same correlation
656         * key are combined together using some kind of {@link AggregationStrategy}
657         * (by default the latest message is used) to compress many message exchanges
658         * into a smaller number of exchanges.
659         * <p/>
660         * A good example of this is stock market data; you may be receiving 30,000
661         * messages/second and you may want to throttle it right down so that multiple
662         * messages for the same stock are combined (or just the latest message is used
663         * and older prices are discarded). Another idea is to combine line item messages
664         * together into a single invoice message.
665         */
666        public ExpressionClause<AggregatorType> aggregator() {
667            AggregatorType answer = new AggregatorType();
668            addOutput(answer);
669            return ExpressionClause.createAndSetExpression(answer);
670        }
671    
672        /**
673         * Creates an <a
674         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
675         * pattern where a batch of messages are processed (up to a maximum amount
676         * or until some timeout is reached) and messages for the same correlation
677         * key are combined together using some kind of {@link AggregationStrategy}
678         * (by default the latest message is used) to compress many message exchanges
679         * into a smaller number of exchanges.
680         * <p/>
681         * A good example of this is stock market data; you may be receiving 30,000
682         * messages/second and you may want to throttle it right down so that multiple
683         * messages for the same stock are combined (or just the latest message is used
684         * and older prices are discarded). Another idea is to combine line item messages
685         * together into a single invoice message.
686         *
687         * @param aggregationStrategy the strategy used for the aggregation
688         */
689        public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) {
690            AggregatorType answer = new AggregatorType();
691            answer.setAggregationStrategy(aggregationStrategy);
692            addOutput(answer);
693            return ExpressionClause.createAndSetExpression(answer);
694        }
695    
696        /**
697         * Creates an <a
698         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
699         * pattern using a custom aggregation collection implementation.
700         *
701         * @param aggregationCollection the collection used to perform the aggregation
702         */
703        public ExpressionClause<AggregatorType> aggregator(AggregationCollection aggregationCollection) {
704            AggregatorType answer = new AggregatorType();
705            answer.setAggregationCollection(aggregationCollection);
706            addOutput(answer);
707            return ExpressionClause.createAndSetExpression(answer);
708        }
709    
710        /**
711         * Creates an <a
712         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
713         * pattern where a batch of messages are processed (up to a maximum amount
714         * or until some timeout is reached) and messages for the same correlation
715         * key are combined together using some kind of {@link AggregationStrategy}
716         * (by default the latest message is used) to compress many message exchanges
717         * into a smaller number of exchanges.
718         * <p/>
719         * A good example of this is stock market data; you may be receiving 30,000
720         * messages/second and you may want to throttle it right down so that multiple
721         * messages for the same stock are combined (or just the latest message is used
722         * and older prices are discarded). Another idea is to combine line item messages
723         * together into a single invoice message.
724         *
725         * @param correlationExpression the expression used to calculate the
726         *                              correlation key. For a JMS message this could be the
727         *                              expression <code>header("JMSDestination")</code> or
728         *                              <code>header("JMSCorrelationID")</code>
729         */
730        public AggregatorType aggregator(Expression correlationExpression) {
731            AggregatorType answer = new AggregatorType(correlationExpression);
732            addOutput(answer);
733            return answer;
734        }
735    
736        /**
737         * Creates an <a
738         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
739         * pattern where a batch of messages are processed (up to a maximum amount
740         * or until some timeout is reached) and messages for the same correlation
741         * key are combined together using some kind of {@link AggregationStrategy}
742         * (by default the latest message is used) to compress many message exchanges
743         * into a smaller number of exchanges.
744         * <p/>
745         * A good example of this is stock market data; you may be receiving 30,000
746         * messages/second and you may want to throttle it right down so that multiple
747         * messages for the same stock are combined (or just the latest message is used
748         * and older prices are discarded). Another idea is to combine line item messages
749         * together into a single invoice message.
750         *
     * @param correlationExpression the expression used to calculate the
     *                              correlation key. For a JMS message this could be the
     *                              expression <code>header("JMSDestination")</code> or
     *                              <code>header("JMSCorrelationID")</code>
     * @param aggregationStrategy   the strategy used for the aggregation
     */
756        public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
757            AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
758            addOutput(answer);
759            return answer;
760        }
761    
762        /**
763         * Creates the <a
764         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
765         * where an expression is used to calculate the time which the message will
766         * be dispatched on
767         *
768         * @param processAtExpression an expression to calculate the time at which
769         *                            the messages should be processed
770         * @return the builder
771         */
772        public DelayerType delayer(Expression<Exchange> processAtExpression) {
773            return delayer(processAtExpression, 0L);
774        }
775    
776        /**
777         * Creates the <a
778         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
779         * where an expression is used to calculate the time which the message will
780         * be dispatched on
781         *
782         * @param processAtExpression an expression to calculate the time at which
783         *                            the messages should be processed
784         * @param delay               the delay in milliseconds which is added to the
785         *                            processAtExpression to determine the time the message
786         *                            should be processed
787         * @return the builder
788         */
789        public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) {
790            DelayerType answer = new DelayerType(processAtExpression, delay);
791            addOutput(answer);
792            return answer;
793        }
794    
795        /**
796         * Creates the <a
797         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
798         * where an expression is used to calculate the time which the message will
799         * be dispatched on
800         * @return the expression clause to create the expression
801         */
802        public ExpressionClause<DelayerType> delayer() {
803            DelayerType answer = new DelayerType();
804            addOutput(answer);
805            return ExpressionClause.createAndSetExpression(answer);
806        }
807    
808        /**
809         * Creates the <a
810         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
811         * where a fixed amount of milliseconds are used to delay processing of a
812         * message exchange
813         *
814         * @param delay the default delay in milliseconds
815         * @return the builder
816         */
817        public DelayerType delayer(long delay) {
818            return delayer(null, delay);
819        }
820    
821        /**
     * Creates the <a
     * href="http://activemq.apache.org/camel/throttler.html">Throttler</a> pattern
     * where the flow of message exchanges is limited by a maximum request count
     *
     * @param maximumRequestCount the maximum number of requests used to configure
     *                            the throttler
     * @return the builder
828         */
829        public ThrottlerType throttler(long maximumRequestCount) {
830            ThrottlerType answer = new ThrottlerType(maximumRequestCount);
831            addOutput(answer);
832            return answer;
833        }
834    
835    
836        public Type throwFault(Throwable fault) {
837            ThrowFaultType answer = new ThrowFaultType();
838            answer.setFault(fault);
839            addOutput(answer);
840            return (Type) this;
841        }
842    
843        public Type throwFault(String message) {
844            return throwFault(new CamelException(message));
845        }
846    
847        /**
848         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
849         */
850        public Type interceptor(String ref) {
851            InterceptorRef interceptor = new InterceptorRef(ref);
852            intercept(interceptor);
853            return (Type) this;
854        }
855    
856        /**
857         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
858         */
859        public Type intercept(DelegateProcessor interceptor) {
860            intercept(new InterceptorRef(interceptor));
861            //lastInterceptor = interceptor;
862            return (Type) this;
863        }
864    
865        /**
866         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
867         */
868        public InterceptType intercept() {
869            InterceptType answer = new InterceptType();
870            addOutput(answer);
871            return answer;
872        }
873    
874        /**
875         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
876         */
    public void intercept(InterceptorType interceptor) {
        // Register the interceptor as an output of this node ...
        addOutput(interceptor);
        // ... then push it onto this node's block stack via pushBlock
        pushBlock(interceptor);
    }
881    
882        /**
     * Adds an interceptor around the whole of this node's processing
     *
     * @param interceptor the interceptor to add to this node's list of interceptors
886         */
887        public void addInterceptor(InterceptorType interceptor) {
888            interceptors.add(interceptor);
889        }
890    
891        /**
     * Adds an interceptor around the whole of this node's processing
     *
     * @param interceptor the delegate processor to wrap and add as an interceptor
895         */
896        public void addInterceptor(DelegateProcessor interceptor) {
897            addInterceptor(new InterceptorRef(interceptor));
898        }
899    
900        protected void pushBlock(Block block) {
901            blocks.add(block);
902        }
903    
904        protected Block popBlock() {
905            return blocks.isEmpty() ? null : blocks.removeLast();
906        }
907    
908        public Type proceed() {
909            ProceedType proceed = null;
910            ProcessorType currentProcessor = this;
911    
912            if (currentProcessor instanceof InterceptType) {
913                proceed = ((InterceptType) currentProcessor).getProceed();
914                LOG.info("proceed() is the implied and hence not needed for an intercept()");
915            }
916            if (proceed == null) {
917                for (ProcessorType node = parent; node != null; node = node.getParent()) {
918                    if (node instanceof InterceptType) {
919                        InterceptType intercept = (InterceptType)node;
920                        proceed = intercept.getProceed();
921                        break;
922                    }
923                }
924    
925                if (proceed == null) {
926                    throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block");
927                }
928    
929            }
930    
931            addOutput(proceed);
932            return (Type) this;
933        }
934    
935        public Type stop() {
936            ProcessorType currentProcessor = this;
937    
938            if (currentProcessor instanceof InterceptType) {
939                ((InterceptType) currentProcessor).stopIntercept();
940            } else {
941                ProcessorType node;
942                for (node = parent; node != null; node = node.getParent()) {
943                    if (node instanceof InterceptType) {
944                        ((InterceptType) node).stopIntercept();
945                        break;
946                    }
947                }
948                if (node == null) {
949                    throw new IllegalArgumentException("Cannot use stop() without being within an intercept() block");
950                }
951            }
952    
953            return (Type) this;
954        }
955    
956        public ExceptionType exception(Class exceptionType) {
957            ExceptionType answer = new ExceptionType(exceptionType);
958            addOutput(answer);
959            return answer;
960        }
961    
962        /**
963         * Apply an interceptor route if the predicate is true
964         */
965        public ChoiceType intercept(Predicate predicate) {
966            InterceptType answer = new InterceptType();
967            addOutput(answer);
968            return answer.when(predicate);
969        }
970    
971        public Type interceptors(String... refs) {
972            for (String ref : refs) {
973                interceptor(ref);
974            }
975            return (Type) this;
976        }
977    
978        /**
979         * Trace logs the exchange before it goes to the next processing step using
980         * the {@link #DEFAULT_TRACE_CATEGORY} logging category.
981         */
982        public Type trace() {
983            return trace(DEFAULT_TRACE_CATEGORY);
984        }
985    
986        /**
987         * Trace logs the exchange before it goes to the next processing step using
988         * the specified logging category.
989         *
     * @param category the logging category trace messages will be sent to.
991         */
992        public Type trace(String category) {
993            final Log log = LogFactory.getLog(category);
994            return intercept(new DelegateProcessor() {
995                @Override
996                public void process(Exchange exchange) throws Exception {
997                    log.trace(exchange);
998                    processNext(exchange);
999                }
1000            });
1001        }
1002    
1003        public PolicyRef policies() {
1004            PolicyRef answer = new PolicyRef();
1005            addOutput(answer);
1006            return answer;
1007        }
1008    
1009        public PolicyRef policy(Policy policy) {
1010            PolicyRef answer = new PolicyRef(policy);
1011            addOutput(answer);
1012            return answer;
1013        }
1014    
1015        /**
1016         * Forces handling of faults as exceptions
1017         *
1018         * @return the current builder with the fault handler configured
1019         */
1020        public Type handleFault() {
1021            intercept(new HandleFaultType());
1022            return (Type) this;
1023        }
1024    
1025        /**
1026         * Installs the given error handler builder
1027         *
1028         * @param errorHandlerBuilder the error handler to be used by default for
1029         *                            all child routes
1030         * @return the current builder with the error handler configured
1031         */
    public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
        // Goes through the setter rather than touching the field directly
        setErrorHandlerBuilder(errorHandlerBuilder);
        // Return this node cast to Type so further builder calls can be chained
        return (Type) this;
    }
1036    
1037        /**
1038         * Configures whether or not the error handler is inherited by every
1039         * processing node (or just the top most one)
1040         *
1041         * @param condition the flag as to whether error handlers should be
1042         *                  inherited or not
1043         * @return the current builder
1044         */
1045        public Type inheritErrorHandler(boolean condition) {
1046            setInheritErrorHandlerFlag(condition);
1047            return (Type) this;
1048        }
1049    
1050        // Transformers
1051        // -------------------------------------------------------------------------
1052    
1053        /**
1054         * Adds the custom processor to this destination which could be a final
1055         * destination, or could be a transformation in a pipeline
1056         */
1057        public Type process(Processor processor) {
1058            ProcessorRef answer = new ProcessorRef(processor);
1059            addOutput(answer);
1060            return (Type) this;
1061        }
1062    
1063        /**
1064         * Adds the custom processor reference to this destination which could be a final
1065         * destination, or could be a transformation in a pipeline
1066         */
1067        public Type processRef(String ref) {
1068            ProcessorRef answer = new ProcessorRef();
1069            answer.setRef(ref);
1070            addOutput(answer);
1071            return (Type) this;
1072        }
1073    
1074        /**
1075         * Adds a bean which is invoked which could be a final destination, or could
1076         * be a transformation in a pipeline
1077         */
1078        public Type bean(Object bean) {
1079            BeanRef answer = new BeanRef();
1080            answer.setBean(bean);
1081            addOutput(answer);
1082            return (Type) this;
1083        }
1084    
1085        /**
1086         * Adds a bean and method which is invoked which could be a final
1087         * destination, or could be a transformation in a pipeline
1088         */
1089        public Type bean(Object bean, String method) {
1090            BeanRef answer = new BeanRef();
1091            answer.setBean(bean);
1092            answer.setMethod(method);
1093            addOutput(answer);
1094            return (Type) this;
1095        }
1096    
1097        /**
1098         * Adds a bean by type which is invoked which could be a final destination, or could
1099         * be a transformation in a pipeline
1100         */
1101        public Type bean(Class beanType) {
1102            BeanRef answer = new BeanRef();
1103            answer.setBeanType(beanType);
1104            addOutput(answer);
1105            return (Type) this;
1106        }
1107    
1108        /**
1109         * Adds a bean type and method which is invoked which could be a final
1110         * destination, or could be a transformation in a pipeline
1111         */
1112        public Type bean(Class beanType, String method) {
1113            BeanRef answer = new BeanRef();
1114            answer.setBeanType(beanType);
1115            answer.setMethod(method);
1116            addOutput(answer);
1117            return (Type) this;
1118        }
1119    
1120        /**
1121         * Adds a bean which is invoked which could be a final destination, or could
1122         * be a transformation in a pipeline
1123         */
1124        public Type beanRef(String ref) {
1125            BeanRef answer = new BeanRef(ref);
1126            addOutput(answer);
1127            return (Type) this;
1128        }
1129    
1130        /**
1131         * Adds a bean and method which is invoked which could be a final
1132         * destination, or could be a transformation in a pipeline
1133         */
1134        public Type beanRef(String ref, String method) {
1135            BeanRef answer = new BeanRef(ref, method);
1136            addOutput(answer);
1137            return (Type) this;
1138        }
1139    
1140        /**
1141         * Adds a processor which sets the body on the IN message
1142         */
1143        public ExpressionClause<ProcessorType<Type>> setBody() {
1144            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1145            SetBodyType answer = new SetBodyType(clause);
1146            addOutput(answer);
1147            return clause;
1148        }
1149    
1150        /**
1151         * Adds a processor which sets the body on the IN message
1152         */
1153        public Type setBody(Expression expression) {
1154            SetBodyType answer = new SetBodyType(expression);
1155            addOutput(answer);
1156            return (Type) this;
1157        }
1158    
1159        /**
1160         * Adds a processor which sets the body on the OUT message
1161         *
1162         * @deprecated Please use {@link #transform(Expression)} instead. Will be removed in Camel 2.0.
1163         */
1164        @Deprecated
1165        public Type setOutBody(Expression expression) {
1166            return transform(expression);
1167        }
1168    
1169        /**
1170         * Adds a processor which sets the body on the OUT message
1171         *
1172         * @deprecated Please use {@link #transform()} instead. Will be removed in Camel 2.0.
1173         */
1174        @Deprecated
1175        public ExpressionClause<ProcessorType<Type>> setOutBody() {
1176            return transform();
1177        }
1178    
1179        /**
1180         * Adds a processor which sets the body on the OUT message
1181         */
1182        public Type transform(Expression expression) {
1183            TransformType answer = new TransformType(expression);
1184            addOutput(answer);
1185            return (Type) this;
1186        }
1187    
1188        /**
1189         * Adds a processor which sets the body on the OUT message
1190         */
1191        public ExpressionClause<ProcessorType<Type>> transform() {
1192            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1193            TransformType answer = new TransformType(clause);
1194            addOutput(answer);
1195            return clause;
1196        }
1197    
1198        /**
1199         * Adds a processor which sets the body on the FAULT message
1200         */
1201        public Type setFaultBody(Expression expression) {
1202            return process(ProcessorBuilder.setFaultBody(expression));
1203        }
1204    
1205        /**
1206         * Adds a processor which sets the header on the IN message
1207         */
1208        public ExpressionClause<ProcessorType<Type>> setHeader(String name) {
1209            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1210            SetHeaderType answer = new SetHeaderType(name, clause);
1211            addOutput(answer);
1212            return clause;
1213        }
1214    
1215        /**
1216         * Adds a processor which sets the header on the IN message
1217         */
1218        public Type setHeader(String name, Expression expression) {
1219            SetHeaderType answer = new SetHeaderType(name, expression);
1220            addOutput(answer);
1221            return (Type) this;
1222        }
1223    
1224        /**
1225         * Adds a processor which sets the header on the IN message to the given value
1226         */
1227        public Type setHeader(String name, String value) {
1228            SetHeaderType answer = new SetHeaderType(name, value);
1229            addOutput(answer);
1230            return (Type) this;
1231        }
1232    
1233        /**
1234         * Adds a processor which sets the header on the OUT message
1235         */
1236        public ExpressionClause<ProcessorType<Type>> setOutHeader(String name) {
1237            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1238            process(ProcessorBuilder.setOutHeader(name, clause));
1239            return clause;
1240        }
1241    
1242        /**
1243         * Adds a processor which sets the header on the OUT message
1244         */
1245        public Type setOutHeader(String name, Expression expression) {
1246            return process(ProcessorBuilder.setOutHeader(name, expression));
1247        }
1248    
1249        /**
1250         * Adds a processor which sets the header on the OUT message
1251         */
1252        public Type setOutHeader(String name, String value) {
1253            return (Type) setOutHeader(name).constant(value);
1254        }
1255    
1256        /**
1257         * Adds a processor which sets the header on the FAULT message
1258         */
1259        public Type setFaultHeader(String name, Expression expression) {
1260            return process(ProcessorBuilder.setFaultHeader(name, expression));
1261        }
1262    
1263        /**
1264         * Adds a processor which sets the exchange property
1265         */
1266        public Type setProperty(String name, Expression expression) {
1267            return process(ProcessorBuilder.setProperty(name, expression));
1268        }
1269    
1270    
1271        /**
1272         * Adds a processor which sets the exchange property
1273         */
1274        public ExpressionClause<ProcessorType<Type>> setProperty(String name) {
1275            ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1276            process(ProcessorBuilder.setProperty(name, clause));
1277            return clause;
1278        }
1279    
1280        /**
1281         * Adds a processor which removes the header on the IN message
1282         */
1283        public Type removeHeader(String name) {
1284            return process(ProcessorBuilder.removeHeader(name));
1285        }
1286    
1287        /**
1288         * Adds a processor which removes the header on the OUT message
1289         */
1290        public Type removeOutHeader(String name) {
1291            return process(ProcessorBuilder.removeOutHeader(name));
1292        }
1293    
1294        /**
1295         * Adds a processor which removes the header on the FAULT message
1296         */
1297        public Type removeFaultHeader(String name) {
1298            return process(ProcessorBuilder.removeFaultHeader(name));
1299        }
1300    
1301        /**
1302         * Adds a processor which removes the exchange property
1303         */
1304        public Type removeProperty(String name) {
1305            return process(ProcessorBuilder.removeProperty(name));
1306        }
1307    
1308        /**
1309         * Converts the IN message body to the specified type
1310         */
1311        public Type convertBodyTo(Class type) {
1312            addOutput(new ConvertBodyType(type));
1313            return (Type) this;
1314        }
1315    
1316        /**
1317         * Converts the OUT message body to the specified type
1318         *
1319         * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0.
1320         */
1321        @Deprecated
1322        public Type convertOutBodyTo(Class type) {
1323            return process(new ConvertBodyProcessor(type));
1324        }
1325    
1326        /**
1327         * Converts the FAULT message body to the specified type
1328         *
1329         * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0.
1330         */
1331        @Deprecated
1332        public Type convertFaultBodyTo(Class type) {
1333            return process(new ConvertBodyProcessor(type));
1334        }
1335    
1336        // DataFormat support
1337        // -------------------------------------------------------------------------
1338    
1339        /**
1340         * Unmarshals the in body using a {@link DataFormat} expression to define
1341         * the format of the input message and the output will be set on the out message body.
1342         *
1343         * @return the expression to create the {@link DataFormat}
1344         */
1345        public DataFormatClause<ProcessorType<Type>> unmarshal() {
1346            return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Unmarshal);
1347        }
1348    
1349        /**
1350         * Unmarshals the in body using the specified {@link DataFormat}
1351         * and sets the output on the out message body.
1352         *
1353         * @return this object
1354         */
1355        public Type unmarshal(DataFormatType dataFormatType) {
1356            addOutput(new UnmarshalType(dataFormatType));
1357            return (Type) this;
1358        }
1359    
1360        /**
1361         * Unmarshals the in body using the specified {@link DataFormat}
1362         * and sets the output on the out message body.
1363         *
1364         * @return this object
1365         */
1366        public Type unmarshal(DataFormat dataFormat) {
1367            return unmarshal(new DataFormatType(dataFormat));
1368        }
1369    
1370        /**
1371         * Unmarshals the in body using the specified {@link DataFormat}
1372         * reference in the {@link Registry} and sets the output on the out message body.
1373         *
1374         * @return this object
1375         */
1376        public Type unmarshal(String dataTypeRef) {
1377            addOutput(new UnmarshalType(dataTypeRef));
1378            return (Type) this;
1379        }
1380    
1381        /**
1382         * Marshals the in body using a {@link DataFormat} expression to define
1383         * the format of the output which will be added to the out body.
1384         *
1385         * @return the expression to create the {@link DataFormat}
1386         */
1387        public DataFormatClause<ProcessorType<Type>> marshal() {
1388            return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Marshal);
1389        }
1390    
1391        /**
1392         * Marshals the in body using the specified {@link DataFormat}
1393         * and sets the output on the out message body.
1394         *
1395         * @return this object
1396         */
1397        public Type marshal(DataFormatType dataFormatType) {
1398            addOutput(new MarshalType(dataFormatType));
1399            return (Type) this;
1400        }
1401    
1402        /**
1403         * Marshals the in body using the specified {@link DataFormat}
1404         * and sets the output on the out message body.
1405         *
1406         * @return this object
1407         */
1408        public Type marshal(DataFormat dataFormat) {
1409            return marshal(new DataFormatType(dataFormat));
1410        }
1411    
1412        /**
     * Marshals the in body using the specified {@link DataFormat}
1414         * reference in the {@link Registry} and sets the output on the out message body.
1415         *
1416         * @return this object
1417         */
1418        public Type marshal(String dataTypeRef) {
1419            addOutput(new MarshalType(dataTypeRef));
1420            return (Type) this;
1421        }
1422    
1423        // Properties
1424        // -------------------------------------------------------------------------
1425        @XmlTransient
1426        public ProcessorType<? extends ProcessorType> getParent() {
1427            return parent;
1428        }
1429    
    public void setParent(ProcessorType<? extends ProcessorType> parent) {
        // Stores the given node as this node's parent; proceed() and stop() walk this link upwards
        this.parent = parent;
    }
1433    
1434        @XmlTransient
1435        public ErrorHandlerBuilder getErrorHandlerBuilder() {
1436            if (errorHandlerBuilder == null) {
1437                errorHandlerBuilder = createErrorHandlerBuilder();
1438            }
1439            return errorHandlerBuilder;
1440        }
1441    
1442        /**
1443         * Sets the error handler to use with processors created by this builder
1444         */
    public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
        // Overwrites whatever builder is currently held; passing null makes
        // getErrorHandlerBuilder() create a new one on its next call
        this.errorHandlerBuilder = errorHandlerBuilder;
    }
1448    
1449        /**
1450         * Sets the error handler if one is not already set
1451         */
1452        protected void setErrorHandlerBuilderIfNull(ErrorHandlerBuilder errorHandlerBuilder) {
1453            if (this.errorHandlerBuilder == null) {
1454                setErrorHandlerBuilder(errorHandlerBuilder);
1455            }
1456        }
1457    
1458        public String getErrorHandlerRef() {
1459            return errorHandlerRef;
1460        }
1461    
1462        /**
1463         * Sets the bean ref name of the error handler builder to use on this route
1464         */
1465        @XmlAttribute(required = false)
1466        public void setErrorHandlerRef(String errorHandlerRef) {
1467            this.errorHandlerRef = errorHandlerRef;
1468            setErrorHandlerBuilder(new ErrorHandlerBuilderRef(errorHandlerRef));
1469        }
1470    
1471        @XmlTransient
1472        public boolean isInheritErrorHandler() {
1473            return isInheritErrorHandler(getInheritErrorHandlerFlag());
1474        }
1475    
1476        /**
1477         * Lets default the inherit value to be true if there is none specified
1478         */
1479        public static boolean isInheritErrorHandler(Boolean value) {
1480            return value == null || value.booleanValue();
1481        }
1482    
1483        @XmlAttribute(name = "inheritErrorHandler", required = false)
1484        public Boolean getInheritErrorHandlerFlag() {
1485            return inheritErrorHandlerFlag;
1486        }
1487    
    /**
     * Sets the raw inherit error handler flag.
     *
     * @param inheritErrorHandlerFlag the flag value; {@link #isInheritErrorHandler(Boolean)}
     *                                treats <tt>null</tt> as true
     */
    public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) {
        this.inheritErrorHandlerFlag = inheritErrorHandlerFlag;
    }
1491    
1492        @XmlTransient
1493        public NodeFactory getNodeFactory() {
1494            if (nodeFactory == null) {
1495                nodeFactory = new NodeFactory();
1496            }
1497            return nodeFactory;
1498        }
1499    
    /**
     * Sets the node factory of this node. {@link #configureChild(ProcessorType)}
     * passes the factory of this node on to the child it configures.
     *
     * @param nodeFactory the node factory to use
     */
    public void setNodeFactory(NodeFactory nodeFactory) {
        this.nodeFactory = nodeFactory;
    }
1503    
1504        /**
1505         * Returns a label to describe this node such as the expression if some kind of expression node
1506         */
1507        public String getLabel() {
1508            return "";
1509        }
1510    
1511        // Implementation methods
1512        // -------------------------------------------------------------------------
1513    
1514        /**
1515         * Creates the processor and wraps it in any necessary interceptors and
1516         * error handlers
1517         */
1518        protected Processor makeProcessor(RouteContext routeContext) throws Exception {
1519            Processor processor = createProcessor(routeContext);
1520            return wrapProcessor(routeContext, processor);
1521        }
1522    
    /**
     * A strategy method which allows derived classes to wrap the child
     * processor in some kind of interceptor.
     * <p/>
     * The wrapping is done in two steps. First every non-null
     * {@link InterceptStrategy} is applied: those of the Camel context (only
     * when it is a {@link DefaultCamelContext}) followed by those of the route
     * context. Then the interceptors of the route, followed by the
     * interceptors of this node, are applied in reversed order. An interceptor
     * which already wraps a processor is not re-targeted; a warning is logged
     * instead.
     *
     * @param routeContext the route context providing the Camel context, the
     *                     intercept strategies and the route
     * @param target       the processor which can be wrapped
     * @return the original processor or a new wrapped interceptor
     * @throws RuntimeCamelException if the target is <tt>null</tt>
     * @throws Exception declared for the strategies and interceptor factories
     *                   called from here
     */
    protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
        // The target is required.
        if (target == null) {
            throw new RuntimeCamelException("target not provided.");
        }

        // Step 1: gather the intercept strategies, context level ones first,
        // and let each one wrap the result of the previous one
        List<InterceptStrategy> strategies = new ArrayList<InterceptStrategy>();
        CamelContext camelContext = routeContext.getCamelContext();
        if (camelContext instanceof DefaultCamelContext) {
            DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
            strategies.addAll(defaultCamelContext.getInterceptStrategies());
        }
        strategies.addAll(routeContext.getInterceptStrategies());
        for (InterceptStrategy strategy : strategies) {
            if (strategy != null) {
                target = strategy.wrapProcessorInInterceptors(this, target);
            }
        }

        // Step 2: the interceptors of the route plus the ones of this node.
        // At this point "interceptors" still refers to the field of this class,
        // the local variable with the same name is only declared further down.
        // NOTE(review): "list" is whatever getInterceptors() returns; if that is
        // the live list of the route, the addAll and reverse below modify the
        // route itself on every call - TODO confirm and copy the list if so
        List<InterceptorType> list = routeContext.getRoute().getInterceptors();
        if (interceptors != null) {
            list.addAll(interceptors);
        }
        // lets reverse the list so we apply the inner interceptors first
        Collections.reverse(list);
        // processors seen so far, seeded with the target, so that an interceptor
        // equal to one of them is skipped instead of being applied again
        Set<Processor> interceptors = new HashSet<Processor>();
        interceptors.add(target);
        for (InterceptorType interceptorType : list) {
            DelegateProcessor interceptor = interceptorType.createInterceptor(routeContext);
            if (!interceptors.contains(interceptor)) {
                interceptors.add(interceptor);
                if (interceptor.getProcessor() != null) {
                    // the interceptor already delegates to a processor, leave it as is
                    LOG.warn("Interceptor " + interceptor + " currently wraps target "
                            + interceptor.getProcessor()
                            + " is attempting to change target " + target
                            + " new wrapping has been denied.");
                } else {
                    // chain it in front: the interceptor becomes the new target
                    interceptor.setProcessor(target);
                    target = interceptor;
                }
            }
        }
        return target;
    }
1575    
1576        /**
1577         * A strategy method to allow newly created processors to be wrapped in an
1578         * error handler.
1579         */
1580        protected Processor wrapInErrorHandler(RouteContext routeContext, Processor target) throws Exception {
1581            // The target is required.
1582            if (target == null) {
1583                throw new RuntimeCamelException("target not provided.");
1584            }
1585    
1586            ErrorHandlerWrappingStrategy strategy = routeContext.getErrorHandlerWrappingStrategy();
1587    
1588            if (strategy != null) {
1589                return strategy.wrapProcessorInErrorHandler(routeContext, this, target);
1590            }
1591    
1592            return getErrorHandlerBuilder().createErrorHandler(routeContext, target);
1593        }
1594    
1595        protected ErrorHandlerBuilder createErrorHandlerBuilder() {
1596            if (errorHandlerRef != null) {
1597                return new ErrorHandlerBuilderRef(errorHandlerRef);
1598            }
1599            if (isInheritErrorHandler()) {
1600                return new DeadLetterChannelBuilder();
1601            } else {
1602                return new NoErrorHandlerBuilder();
1603            }
1604        }
1605    
1606        protected void configureChild(ProcessorType output) {
1607            output.setNodeFactory(getNodeFactory());
1608        }
1609    
1610        public void addOutput(ProcessorType processorType) {
1611            processorType.setParent(this);
1612            configureChild(processorType);
1613            if (blocks.isEmpty()) {
1614                getOutputs().add(processorType);
1615            } else {
1616                Block block = blocks.getLast();
1617                block.addOutput(processorType);
1618            }
1619        }
1620    
1621        /**
1622         * Creates a new instance of some kind of composite processor which defaults
1623         * to using a {@link Pipeline} but derived classes could change the
1624         * behaviour
1625         */
1626        protected Processor createCompositeProcessor(List<Processor> list) {
1627            // return new MulticastProcessor(list);
1628            return new Pipeline(list);
1629        }
1630    
1631        protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType<?>> outputs)
1632            throws Exception {
1633            List<Processor> list = new ArrayList<Processor>();
1634            for (ProcessorType output : outputs) {
1635                Processor processor = output.createProcessor(routeContext);
1636                processor = output.wrapProcessorInInterceptors(routeContext, processor);
1637    
1638                ProcessorType currentProcessor = this;
1639                if (!(currentProcessor instanceof ExceptionType || currentProcessor instanceof TryType)) {
1640                    processor = output.wrapInErrorHandler(routeContext, processor);
1641                }
1642    
1643                list.add(processor);
1644            }
1645            Processor processor = null;
1646            if (!list.isEmpty()) {
1647                if (list.size() == 1) {
1648                    processor = list.get(0);
1649                } else {
1650                    processor = createCompositeProcessor(list);
1651                }
1652            }
1653            return processor;
1654        }
1655    
1656        public void clearOutput() {
1657            getOutputs().clear();
1658            blocks.clear();
1659        }
1660    }