001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Arrays;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.Comparator;
024    import java.util.HashSet;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Set;
028    import java.util.concurrent.Executor;
029    
030    import javax.xml.bind.annotation.XmlAccessType;
031    import javax.xml.bind.annotation.XmlAccessorType;
032    import javax.xml.bind.annotation.XmlAttribute;
033    import javax.xml.bind.annotation.XmlTransient;
034    
035    import org.apache.camel.CamelContext;
036    import org.apache.camel.CamelException;
037    import org.apache.camel.Endpoint;
038    import org.apache.camel.ExchangePattern;
039    import org.apache.camel.Expression;
040    import org.apache.camel.Predicate;
041    import org.apache.camel.Processor;
042    import org.apache.camel.Route;
043    import org.apache.camel.builder.DataFormatClause;
044    import org.apache.camel.builder.DeadLetterChannelBuilder;
045    import org.apache.camel.builder.ErrorHandlerBuilder;
046    import org.apache.camel.builder.ErrorHandlerBuilderRef;
047    import org.apache.camel.builder.ExpressionClause;
048    import org.apache.camel.builder.NoErrorHandlerBuilder;
049    import org.apache.camel.builder.ProcessorBuilder;
050    import org.apache.camel.model.dataformat.DataFormatDefinition;
051    import org.apache.camel.model.language.ConstantExpression;
052    import org.apache.camel.model.language.ExpressionDefinition;
053    import org.apache.camel.model.language.LanguageExpression;
054    import org.apache.camel.processor.DelegateProcessor;
055    import org.apache.camel.processor.Pipeline;
056    import org.apache.camel.processor.aggregate.AggregationCollection;
057    import org.apache.camel.processor.aggregate.AggregationStrategy;
058    import org.apache.camel.spi.DataFormat;
059    import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
060    import org.apache.camel.spi.IdempotentRepository;
061    import org.apache.camel.spi.InterceptStrategy;
062    import org.apache.camel.spi.Policy;
063    import org.apache.camel.spi.RouteContext;
064    import org.apache.camel.util.ObjectHelper;
065    import org.apache.commons.logging.Log;
066    import org.apache.commons.logging.LogFactory;
067    import static org.apache.camel.builder.Builder.body;
068    
069    /**
070     * Base class for processor types that most XML types extend.
071     *
072     * @version $Revision: 751648 $
073     */
074    @XmlAccessorType(XmlAccessType.PROPERTY)
075    public abstract class ProcessorDefinition<Type extends ProcessorDefinition> extends OptionalIdentifiedType<Type> implements Block {
076        private static final transient Log LOG = LogFactory.getLog(ProcessorDefinition.class);
077        private ErrorHandlerBuilder errorHandlerBuilder;
078        private Boolean inheritErrorHandlerFlag;
079        private NodeFactory nodeFactory;
080        private LinkedList<Block> blocks = new LinkedList<Block>();
081        private ProcessorDefinition parent;
082        private List<AbstractInterceptorDefinition> interceptors = new ArrayList<AbstractInterceptorDefinition>();
083        private String errorHandlerRef;
084    
085        // else to use an optional attribute in JAXB2
086        public abstract List<ProcessorDefinition> getOutputs();
087    
088    
089        public Processor createProcessor(RouteContext routeContext) throws Exception {
090            throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
091        }
092    
093        public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
094            Collection<ProcessorDefinition> outputs = getOutputs();
095            return createOutputsProcessor(routeContext, outputs);
096        }
097    
098        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
099            Processor processor = makeProcessor(routeContext);
100            if (!routeContext.isRouteAdded()) {
101                routeContext.addEventDrivenProcessor(processor);
102            }
103        }
104    
105        /**
106         * Wraps the child processor in whatever necessary interceptors and error
107         * handlers
108         */
109        public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
110            processor = wrapProcessorInInterceptors(routeContext, processor);
111            return wrapInErrorHandler(routeContext, processor);
112        }
113    
114        // Fluent API
115        // -------------------------------------------------------------------------
116    
117        /**
118         * Sends the exchange to the given endpoint
119         *
120         * @param uri  the endpoint to send to
121         * @return the builder
122         */
123        @SuppressWarnings("unchecked")
124        public Type to(String uri) {
125            addOutput(new ToDefinition(uri));
126            return (Type) this;
127        }   
128        
129    
130        /**
131         * Sends the exchange to the given endpoint
132         *
133         * @param endpoint  the endpoint to send to
134         * @return the builder
135         */
136        @SuppressWarnings("unchecked")
137        public Type to(Endpoint endpoint) {
138            addOutput(new ToDefinition(endpoint));
139            return (Type) this;
140        }
141        
142        /**
143         * Sends the exchange with certain exchange pattern to the given endpoint
144         *
145         * @param pattern the pattern to use for the message exchange
146         * @param uri  the endpoint to send to
147         * @return the builder
148         */
149        @SuppressWarnings("unchecked")
150        public Type to(ExchangePattern pattern, String uri) {
151            addOutput(new ToDefinition(uri, pattern));
152            return (Type) this;
153        }   
154        
155    
156        /**
157         * Sends the exchange with certain exchange pattern to the given endpoint
158         *
159         * @param pattern the pattern to use for the message exchange
160         * @param endpoint  the endpoint to send to
161         * @return the builder
162         */
163        @SuppressWarnings("unchecked")
164        public Type to(ExchangePattern pattern, Endpoint endpoint) {
165            addOutput(new ToDefinition(endpoint, pattern));
166            return (Type) this;
167        }
168    
169        /**
170         * Sends the exchange to a list of endpoints
171         *
172         * @param uris  list of endpoints to send to
173         * @return the builder
174         */
175        @SuppressWarnings("unchecked")
176        public Type to(String... uris) {
177            for (String uri : uris) {
178                addOutput(new ToDefinition(uri));
179            }
180            return (Type) this;
181        }
182    
183    
184        /**
185         * Sends the exchange to a list of endpoints
186         *
187         * @param endpoints  list of endpoints to send to
188         * @return the builder
189         */
190        @SuppressWarnings("unchecked")
191        public Type to(Endpoint... endpoints) {
192            for (Endpoint endpoint : endpoints) {
193                addOutput(new ToDefinition(endpoint));
194            }
195            return (Type) this;
196        }
197    
198        /**
199         * Sends the exchange to a list of endpoints
200         *
201         * @param endpoints  list of endpoints to send to
202         * @return the builder
203         */
204        @SuppressWarnings("unchecked")
205        public Type to(Iterable<Endpoint> endpoints) {
206            for (Endpoint endpoint : endpoints) {
207                addOutput(new ToDefinition(endpoint));
208            }
209            return (Type) this;
210        }
211        
212        
213        /**
214         * Sends the exchange to a list of endpoints
215         *
216         * @param pattern the pattern to use for the message exchanges
217         * @param uris  list of endpoints to send to
218         * @return the builder
219         */
220        @SuppressWarnings("unchecked")
221        public Type to(ExchangePattern pattern, String... uris) {
222            for (String uri : uris) {
223                addOutput(new ToDefinition(uri, pattern));
224            }
225            return (Type) this;
226        }
227    
228        /**
229         * Sends the exchange to a list of endpoints
230         *
231         * @param pattern the pattern to use for the message exchanges
232         * @param endpoints  list of endpoints to send to
233         * @return the builder
234         */
235        @SuppressWarnings("unchecked")
236        public Type to(ExchangePattern pattern, Endpoint... endpoints) {
237            for (Endpoint endpoint : endpoints) {
238                addOutput(new ToDefinition(endpoint, pattern));
239            }
240            return (Type) this;
241        }
242    
243        /**
244         * Sends the exchange to a list of endpoints
245         *
246         * @param pattern the pattern to use for the message exchanges
247         * @param endpoints  list of endpoints to send to
248         * @return the builder
249         */
250        @SuppressWarnings("unchecked")
251        public Type to(ExchangePattern pattern, Iterable<Endpoint> endpoints) {
252            for (Endpoint endpoint : endpoints) {
253                addOutput(new ToDefinition(endpoint, pattern));
254            }
255            return (Type) this;
256        }
257    
258    
259        /**
260         * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
261         * set the ExchangePattern {@link ExchangePattern} into the exchange
262         *
263         * @param exchangePattern  instance of {@link ExchangePattern}
264         * @return the builder
265         */
266        @SuppressWarnings("unchecked")
267        public Type setExchangePattern(ExchangePattern exchangePattern) {
268            addOutput(new SetExchangePatternDefinition(exchangePattern));
269            return (Type) this;
270        }
271    
272        /**
273         * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
274         * set the exchange's ExchangePattern {@link ExchangePattern} to be InOnly
275         *
276         *
277         * @return the builder
278         */
279        public Type inOnly() {
280            return setExchangePattern(ExchangePattern.InOnly);
281        }
282    
283        /**
284         * Sends the message to the given endpoint using an
285         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
286         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
287         *
288         * @param uri The endpoint uri which is used for sending the exchange
289         * @return the builder
290         */
291        public Type inOnly(String uri) {
292            return to(ExchangePattern.InOnly, uri);
293        }
294    
295        /**
296         * Sends the message to the given endpoint using an
297         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 
298         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
299         *
300         * @param endpoint The endpoint which is used for sending the exchange
301         * @return the builder
302         */
303        public Type inOnly(Endpoint endpoint) {
304            return to(ExchangePattern.InOnly, endpoint);
305        }
306    
307    
308        /**
309         * Sends the message to the given endpoints using an
310         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
311         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
312         *
313         * @param uris  list of endpoints to send to
314         * @return the builder
315         */
316        public Type inOnly(String... uris) {
317            return to(ExchangePattern.InOnly, uris);
318        }
319    
320    
321        /**
322         * Sends the message to the given endpoints using an
323         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
324         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
325         *
326         * @param endpoints  list of endpoints to send to
327         * @return the builder
328         */
329        public Type inOnly(Endpoint... endpoints) {
330            return to(ExchangePattern.InOnly, endpoints);
331        }
332    
333        /**
334         * Sends the message to the given endpoints using an
335         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
336         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
337         *
338         * @param endpoints  list of endpoints to send to
339         * @return the builder
340         */
341        public Type inOnly(Iterable<Endpoint> endpoints) {
342            return to(ExchangePattern.InOnly, endpoints);
343        }
344    
345    
346        /**
347         * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
348         * set the exchange's ExchangePattern {@link ExchangePattern} to be InOut
349         *
350         *
351         * @return the builder
352         */
353        public Type inOut() {
354            return setExchangePattern(ExchangePattern.InOut);
355        }
356    
357        /**
358         * Sends the message to the given endpoint using an
359         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
360         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
361         *
362         * @param uri The endpoint uri which is used for sending the exchange
363         * @return the builder
364         */
365        public Type inOut(String uri) {
366            return to(ExchangePattern.InOut, uri);
367        }
368    
369    
370        /**
371         * Sends the message to the given endpoint using an
372         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
373         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
374         *
375         * @param endpoint The endpoint which is used for sending the exchange
376         * @return the builder
377         */
378        public Type inOut(Endpoint endpoint) {
379            return to(ExchangePattern.InOut, endpoint);
380        }
381    
382        /**
383         * Sends the message to the given endpoints using an
384         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
385         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
386         *
387         * @param uris  list of endpoints to send to
388         * @return the builder
389         */
390        public Type inOut(String... uris) {
391            return to(ExchangePattern.InOut, uris);
392        }
393    
394    
395        /**
396         * Sends the message to the given endpoints using an
397         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
398         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
399         *
400         * @param endpoints  list of endpoints to send to
401         * @return the builder
402         */
403        public Type inOut(Endpoint... endpoints) {
404            return to(ExchangePattern.InOut, endpoints);
405        }
406    
407        /**
408         * Sends the message to the given endpoints using an
409         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
410         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
411         *
412         * @param endpoints  list of endpoints to send to
413         * @return the builder
414         */
415        public Type inOut(Iterable<Endpoint> endpoints) {
416            return to(ExchangePattern.InOut, endpoints);
417        }
418    
419    
420        /**
421         * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
422         * Multicasts messages to all its child outputs; so that each processor and
423         * destination gets a copy of the original message to avoid the processors
424         * interfering with each other.
425         *
426         * @return the builder
427         */
428        public MulticastDefinition multicast() {
429            MulticastDefinition answer = new MulticastDefinition();
430            addOutput(answer);
431            return answer;
432        }
433    
434        /**
435         * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
436         * Multicasts messages to all its child outputs; so that each processor and
437         * destination gets a copy of the original message to avoid the processors
438         * interfering with each other.
439         *
440         * @param aggregationStrategy the strategy used to aggregate responses for
441         *          every part
442         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
443         * @return the builder
444         */
445        public MulticastDefinition multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
446            MulticastDefinition answer = new MulticastDefinition();
447            addOutput(answer);
448            answer.setAggregationStrategy(aggregationStrategy);
449            answer.setParallelProcessing(parallelProcessing);
450            return answer;
451        }
452    
453        /**
454         * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
455         * Multicasts messages to all its child outputs; so that each processor and
456         * destination gets a copy of the original message to avoid the processors
457         * interfering with each other.
458         *
459         * @param aggregationStrategy the strategy used to aggregate responses for
460         *          every part
461         * @return the builder
462         */
463        public MulticastDefinition multicast(AggregationStrategy aggregationStrategy) {
464            MulticastDefinition answer = new MulticastDefinition();
465            addOutput(answer);
466            answer.setAggregationStrategy(aggregationStrategy);
467            return answer;
468        }
469    
470        /**
471         * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
472         * Creates a {@link Pipeline} of the list of endpoints so that the message
473         * will get processed by each endpoint in turn and for request/response the
474         * output of one endpoint will be the input of the next endpoint
475         *
476         * @param uris  list of endpoints
477         * @return the builder
478         */
479        public Type pipeline(String... uris) {
480            return to(uris);
481        }
482    
483        /**
484         * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
485         * Creates a {@link Pipeline} of the list of endpoints so that the message
486         * will get processed by each endpoint in turn and for request/response the
487         * output of one endpoint will be the input of the next endpoint
488         *
489         * @param endpoints  list of endpoints
490         * @return the builder
491         */
492        public Type pipeline(Endpoint... endpoints) {
493            return to(endpoints);
494        }
495    
496        /**
497         * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
498         * Creates a {@link Pipeline} of the list of endpoints so that the message
499         * will get processed by each endpoint in turn and for request/response the
500         * output of one endpoint will be the input of the next endpoint
501         *
502         * @param endpoints  list of endpoints
503         * @return the builder
504         */
505        public Type pipeline(Collection<Endpoint> endpoints) {
506            return to(endpoints);
507        }
508    
509        /**
510         * Ends the current block
511         *
512         * @return the builder
513         */
514        @SuppressWarnings("unchecked")
515        public ProcessorDefinition<? extends ProcessorDefinition> end() {
516            if (blocks.isEmpty()) {
517                if (parent == null) {
518                    throw new IllegalArgumentException("Root node with no active block");
519                }
520                return parent;
521            }
522            popBlock();
523            return this;
524        }
525    
526        /**
527         * Causes subsequent processors to be called asynchronously
528         *
529         * @param coreSize the number of threads that will be used to process
530         *                 messages in subsequent processors.
531         * @return a ThreadType builder that can be used to further configure the
532         *         the thread pool.
533         */
534        public ThreadDefinition thread(int coreSize) {
535            ThreadDefinition answer = new ThreadDefinition(coreSize);
536            addOutput(answer);
537            return answer;
538        }
539    
540        /**
541         * Causes subsequent processors to be called asynchronously
542         *
543         * @param executor the executor that will be used to process
544         *                 messages in subsequent processors.
545         * @return a ThreadType builder that can be used to further configure the
546         *         the thread pool.
547         */
548        public ProcessorDefinition<Type> thread(Executor executor) {
549            ThreadDefinition answer = new ThreadDefinition(executor);
550            addOutput(answer);
551            return this;
552        }
553        
554        /**
555         * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
556         * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
557         * to avoid duplicate messages
558         *      
559         * @return the builder
560         */
561        public IdempotentConsumerDefinition idempotentConsumer() {
562            IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
563            addOutput(answer);
564            return answer;
565        }
566    
567        /**
568         * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
569         * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
570         * to avoid duplicate messages
571         *
572         * @param messageIdExpression  expression to test of duplicate messages
573         * @param idempotentRepository  the repository to use for duplicate chedck
574         * @return the builder
575         */
576        public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression,
577                IdempotentRepository idempotentRepository) {
578            IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(messageIdExpression, idempotentRepository);
579            addOutput(answer);
580            return answer;
581        }
582    
583        /**
584         * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
585         * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
586         * to avoid duplicate messages
587         *
588         * @param idempotentRepository the repository to use for duplicate chedck
589         * @return the builder used to create the expression
590         */
591        public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer(IdempotentRepository idempotentRepository) {
592            IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
593            answer.setMessageIdRepository(idempotentRepository);
594            addOutput(answer);
595            return ExpressionClause.createAndSetExpression(answer);
596        }
597    
598        /**
599         * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
600         * Creates a predicate expression which only if it is <tt>true</tt> then the
601         * exchange is forwarded to the destination
602         *
603         * @return the clause used to create the filter expression
604         */
605        public ExpressionClause<FilterDefinition> filter() {
606            FilterDefinition filter = new FilterDefinition();
607            addOutput(filter);
608            return ExpressionClause.createAndSetExpression(filter);
609        }
610    
611        /**
612         * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
613         * Creates a predicate which is applied and only if it is <tt>true</tt> then the
614         * exchange is forwarded to the destination
615         *
616         * @param predicate  predicate to use
617         * @return the builder 
618         */
619        public FilterDefinition filter(Predicate predicate) {
620            FilterDefinition filter = new FilterDefinition(predicate);
621            addOutput(filter);
622            return filter;
623        }
624    
625        /**
626         * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
627         * Creates a predicate expression which only if it is <tt>true</tt> then the
628         * exchange is forwarded to the destination
629         *
630         * @param expression  the predicate expression to use
631         * @return the builder
632         */
633        public FilterDefinition filter(ExpressionDefinition expression) {
634            FilterDefinition filter = getNodeFactory().createFilter();
635            filter.setExpression(expression);
636            addOutput(filter);
637            return filter;
638        }
639    
640        /**
641         * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
642         * Creates a predicate language expression which only if it is <tt>true</tt> then the
643         * exchange is forwarded to the destination
644         *
645         * @param language     language for expression
646         * @param expression   the expression
647         * @return the builder
648         */
649        public FilterDefinition filter(String language, String expression) {
650            return filter(new LanguageExpression(language, expression));
651        }
652    
653        /**
654         * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a>
655         * Creates a loadbalance
656         *
657         * @return  the builder
658         */
659        public LoadBalanceDefinition loadBalance() {
660            LoadBalanceDefinition answer = new LoadBalanceDefinition();
661            addOutput(answer);
662            return answer;
663        }
664    
665    
666        /**
667         * <a href="http://camel.apache.org/content-based-router.html">Content Based Router EIP:</a>
668         * Creates a choice of one or more predicates with an otherwise clause
669         *
670         * @return the builder for a choice expression
671         */
672        public ChoiceDefinition choice() {
673            ChoiceDefinition answer = new ChoiceDefinition();
674            addOutput(answer);
675            return answer;
676        }
677    
678        /**
679         * Creates a try/catch block
680         *
681         * @return the builder for a tryBlock expression
682         */
683        public TryDefinition tryBlock() {
684            TryDefinition answer = new TryDefinition();
685            addOutput(answer);
686            return answer;
687        }
688    
689        /**
690         * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a>
691         * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients
692         *
693         * @param recipients expression to decide the destinations
694         * @return the builder
695         */
696        @SuppressWarnings("unchecked")
697        public Type recipientList(Expression recipients) {
698            RecipientListDefinition answer = new RecipientListDefinition(recipients);
699            addOutput(answer);
700            return (Type) this;
701        }
702    
703        /**
704         * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a>
705         * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients
706         *
707         * @return the expression clause to configure the expression to decide the destinations
708         */
709        public ExpressionClause<ProcessorDefinition<Type>> recipientList() {
710            RecipientListDefinition answer = new RecipientListDefinition();
711            addOutput(answer);
712            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
713            answer.setExpression(clause);
714            return clause;
715        }
716    
717        /**
718         * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a>
719         * Creates a routing slip allowing you to route a message consecutively through a series of processing
720         * steps where the sequence of steps is not known at design time and can vary for each message.
721         *
722         * @param header  is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
723         *                class will look in for the list of URIs to route the message to.
724         * @param uriDelimiter  is the delimiter that will be used to split up
725         *                      the list of URIs in the routing slip.
726         * @return the buiider
727         */
728        @SuppressWarnings("unchecked")
729        public Type routingSlip(String header, String uriDelimiter) {
730            RoutingSlipDefinition answer = new RoutingSlipDefinition(header, uriDelimiter);
731            addOutput(answer);
732            return (Type) this;
733        }
734    
735        /**
736         * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a>
737         * Creates a routing slip allowing you to route a message consecutively through a series of processing
738         * steps where the sequence of steps is not known at design time and can vary for each message.
739         * <p>
740         * The list of URIs will be split based on the default delimiter {@link RoutingSlipDefinition#DEFAULT_DELIMITER}
741         *
742         * @param header  is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
743         *                class will look in for the list of URIs to route the message to.
744         * @return the builder
745         */
746        @SuppressWarnings("unchecked")
747        public Type routingSlip(String header) {
748            RoutingSlipDefinition answer = new RoutingSlipDefinition(header);
749            addOutput(answer);
750            return (Type) this;
751        }
752    
753        /**
754         * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
755         * Creates a splitter allowing you split a message into a number of pieces and process them individually.
756         * <p>
757         * This splitter responds with the latest message returned from destination
758         * endpoint.
759         *
760         * @return the expression clause builder for the expression on which to split
761         */
762        public ExpressionClause<SplitDefinition> split() {
763            SplitDefinition answer = new SplitDefinition();
764            addOutput(answer);
765            return ExpressionClause.createAndSetExpression(answer);
766        }
767    
768        /**
769         * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
770         * Creates a splitter allowing you split a message into a number of pieces and process them individually.
771         * <p>
772         * This splitter responds with the latest message returned from destination
773         * endpoint.
774         *
775         * @param expression  the expression on which to split the message
776         * @return the builder
777         */
778        public SplitDefinition split(Expression expression) {
779            SplitDefinition answer = new SplitDefinition(expression);
780            addOutput(answer);
781            return answer;
782        }
783    
784        /**
785         * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
786         * Creates a splitter allowing you split a message into a number of pieces and process them individually.
787         * <p>
788         * The splitter responds with the answer produced by the given {@link AggregationStrategy}.
789         *
790         * @param expression  the expression on which to split
791         * @param aggregationStrategy  the strategy used to aggregate responses for every part
792         * @return the builder
793         */
794        public SplitDefinition split(Expression expression, AggregationStrategy aggregationStrategy) {
795            SplitDefinition answer = new SplitDefinition(expression);
796            addOutput(answer);
797            answer.setAggregationStrategy(aggregationStrategy);
798            return answer;
799        }
800    
801        /**
802         * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
803         * Creates a resequencer allowing you to reorganize messages based on some comparator.
804         *
805         * @return the expression clause for the expressions on which to compare messages in order
806         */
807        public ExpressionClause<ResequenceDefinition> resequence() {
808            ResequenceDefinition answer = new ResequenceDefinition();
809            addOutput(answer);
810            ExpressionClause<ResequenceDefinition> clause = new ExpressionClause<ResequenceDefinition>(answer);
811            answer.expression(clause);
812            return clause;
813        }
814    
815        /**
816         * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
817         * Creates a resequencer allowing you to reorganize messages based on some comparator.
818         *
819         * @param expression the expression on which to compare messages in order
820         * @return the builder
821         */
822        public ResequenceDefinition resequence(Expression expression) {
823            return resequence(Collections.<Expression>singletonList(expression));
824        }
825    
826        /**
827         * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
828         * Creates a resequencer allowing you to reorganize messages based on some comparator.
829         *
830         * @param expressions the list of expressions on which to compare messages in order
831         * @return the builder
832         */
833        public ResequenceDefinition resequence(List<Expression> expressions) {
834            ResequenceDefinition answer = new ResequenceDefinition(expressions);
835            addOutput(answer);
836            return answer;
837        }
838    
839        /**
840         * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
841         * Creates a splitter allowing you to reorganise messages based on some comparator.
842         *
843         * @param expressions the list of expressions on which to compare messages in order
844         * @return the builder
845         */
846        public ResequenceDefinition resequencer(Expression... expressions) {
847            List<Expression> list = new ArrayList<Expression>();
848            list.addAll(Arrays.asList(expressions));
849            return resequence(list);
850        }
851    
852        /**
853         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
854         * Creates an aggregator allowing you to combine a number of messages together into a single message.
855         *
856         * @return the expression clause to be used as builder to configure the correlation expression
857         */
858        public ExpressionClause<AggregateDefinition> aggregate() {
859            AggregateDefinition answer = new AggregateDefinition();
860            addOutput(answer);
861            return answer.createAndSetExpression();
862        }
863    
864        /**
865         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
866         * Creates an aggregator allowing you to combine a number of messages together into a single message.
867         *
868         * @param aggregationStrategy the strategy used for the aggregation
869         * @return the expression clause to be used as builder to configure the correlation expression
870         */
871        public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
872            AggregateDefinition answer = new AggregateDefinition();
873            answer.setAggregationStrategy(aggregationStrategy);
874            addOutput(answer);
875            return answer.createAndSetExpression();
876        }
877    
878        /**
879         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
880         * Creates an aggregator allowing you to combine a number of messages together into a single message.
881         *
882         * @param aggregationCollection the collection used to perform the aggregation
883         * @return the builder
884         */
885        public AggregateDefinition aggregate(AggregationCollection aggregationCollection) {
886            AggregateDefinition answer = new AggregateDefinition();
887            answer.setAggregationCollection(aggregationCollection);
888            addOutput(answer);
889            return answer;
890        }
891    
892        /**
893         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
894         * Creates an aggregator allowing you to combine a number of messages together into a single message.
895         *
896         * @param correlationExpression the expression used to calculate the
897         *                              correlation key. For a JMS message this could be the
898         *                              expression <code>header("JMSDestination")</code> or
899         *                              <code>header("JMSCorrelationID")</code>
900         * @return the builder
901         */
902        public AggregateDefinition aggregate(Expression correlationExpression) {
903            AggregateDefinition answer = new AggregateDefinition(correlationExpression);
904            addOutput(answer);
905            return answer;
906        }
907    
908        /**
909         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
910         * Creates an aggregator allowing you to combine a number of messages together into a single message.
911         *
912         * @param correlationExpression the expression used to calculate the
913         *                              correlation key. For a JMS message this could be the
914         *                              expression <code>header("JMSDestination")</code> or
915         *                              <code>header("JMSCorrelationID")</code>
916         * @param aggregationStrategy the strategy used for the aggregation
917         * @return the builder
918         */
919        public AggregateDefinition aggregate(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
920            AggregateDefinition answer = new AggregateDefinition(correlationExpression, aggregationStrategy);
921            addOutput(answer);
922            return answer;
923        }
924    
925        /**
926         * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
927         * Creates a delayer allowing you to delay the delivery of messages to some destination.
928         *
929         * @param processAtExpression  an expression to calculate the time at which the messages should be processed,
930         *                             should be convertable to long as time in millis
931         * @return the builder
932         */
933        public DelayDefinition delay(Expression processAtExpression) {
934            return delay(processAtExpression, 0L);
935        }
936    
937        /**
938         * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
939         * Creates a delayer allowing you to delay the delivery of messages to some destination.
940         *
941         * @param processAtExpression  an expression to calculate the time at which the messages should be processed,
942         *                             should be convertable to long as time in millis
943         * @param delay                the delay in milliseconds which is added to the processAtExpression
944         * @return the builder
945         */
946        public DelayDefinition delay(Expression processAtExpression, long delay) {
947            DelayDefinition answer = new DelayDefinition(processAtExpression, delay);
948            addOutput(answer);
949            return answer;
950        }
951    
952        /**
953         * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
954         * Creates a delayer allowing you to delay the delivery of messages to some destination.
955         *
956         * @return the expression clause to create the expression
957         */
958        public ExpressionClause<DelayDefinition> delay() {
959            DelayDefinition answer = new DelayDefinition();
960            addOutput(answer);
961            return ExpressionClause.createAndSetExpression(answer);
962        }
963    
964        /**
965         * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
966         * Creates a delayer allowing you to delay the delivery of messages to some destination.
967         *
968         * @param delay  the default delay in millis
969         * @return the builder
970         */
971        public DelayDefinition delay(long delay) {
972            return delay(null, delay);
973        }
974    
975        /**
976         * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
977         * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
978         * or that we don't exceed an agreed SLA with some external service.
979         * <p/>
980         * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
981         * will default ensure at most 10 messages per second. 
982         *
983         * @param maximumRequestCount  the maximum messages 
984         * @return the builder
985         */
986        public ThrottleDefinition throttle(long maximumRequestCount) {
987            ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount);
988            addOutput(answer);
989            return answer;
990        }
991    
992        /**
993         * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
994         * Creates a loop allowing to process the a message a number of times and possibly process them
995         * in a different way. Useful mostly for testing.
996         *
997         * @return the clause used to create the loop expression
998         */
999        public ExpressionClause<LoopDefinition> loop() {
1000            LoopDefinition loop = new LoopDefinition();
1001            addOutput(loop);
1002            return ExpressionClause.createAndSetExpression(loop);
1003        }
1004    
1005        /**
1006         * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
1007         * Creates a loop allowing to process the a message a number of times and possibly process them
1008         * in a different way. Useful mostly for testing.
1009         *
1010         * @param expression the loop expression
1011         * @return the builder
1012         */
1013        public LoopDefinition loop(Expression expression) {
1014            LoopDefinition loop = getNodeFactory().createLoop();
1015            loop.setExpression(expression);
1016            addOutput(loop);
1017            return loop;
1018        }
1019    
1020        /**
1021         * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
1022         * Creates a loop allowing to process the a message a number of times and possibly process them
1023         * in a different way. Useful mostly for testing.
1024         *
1025         * @param count  the number of times
1026         * @return the builder
1027         */
1028        public LoopDefinition loop(int count) {
1029            LoopDefinition loop = getNodeFactory().createLoop();
1030            loop.setExpression(new ConstantExpression(Integer.toString(count)));
1031            addOutput(loop);
1032            return loop;
1033        }
1034    
1035        /**
1036         * Creates a fault message based on the given throwable.
1037         *
1038         * @param fault   the fault
1039         * @return the builder
1040         */
1041        @SuppressWarnings("unchecked")
1042        public Type throwFault(Throwable fault) {
1043            ThrowFaultDefinition answer = new ThrowFaultDefinition();
1044            answer.setFault(fault);
1045            addOutput(answer);
1046            return (Type) this;
1047        }
1048    
1049        /**
1050         * Creates a fault message based on the given message.
1051         *
1052         * @param message  the fault message
1053         * @return the builder
1054         */
1055        public Type throwFault(String message) {
1056            return throwFault(new CamelException(message));
1057        }
1058    
1059        /**
1060         * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
1061         * Sends messages to all its child outputs; so that each processor and
1062         * destination gets a copy of the original message to avoid the processors
1063         * interfering with each other using {@link ExchangePattern#InOnly}.
1064         *
1065         * @return the builder
1066         */
1067        @SuppressWarnings("unchecked")
1068        public Type wireTap(String uri) {
1069            WireTapDefinition answer = new WireTapDefinition();
1070            answer.setUri(uri);
1071            addOutput(answer);
1072            return (Type) this;
1073        }
1074    
1075        /**
1076         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1077         *
1078         * @param ref  a reference in the registry to lookup the interceptor that must be of type {@link DelegateProcessor}
1079         * @return the builder
1080         */
1081        @SuppressWarnings("unchecked")
1082        public Type interceptor(String ref) {
1083            InterceptorDefinition interceptor = new InterceptorDefinition(ref);
1084            intercept(interceptor);
1085            return (Type) this;
1086        }
1087    
1088        /**
1089         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1090         *
1091         * @param refs  a list of reference in the registry to lookup the interceptor that must
1092         *              be of type {@link DelegateProcessor}
1093         * @return the builder
1094         */
1095        @SuppressWarnings("unchecked")
1096        public Type interceptors(String... refs) {
1097            for (String ref : refs) {
1098                interceptor(ref);
1099            }
1100            return (Type) this;
1101        }
1102    
1103        /**
1104         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1105         *
1106         * @param interceptor  the interceptor
1107         * @return the builder
1108         */
1109        @SuppressWarnings("unchecked")
1110        public Type intercept(DelegateProcessor interceptor) {
1111            intercept(new InterceptorDefinition(interceptor));
1112            return (Type) this;
1113        }
1114    
1115        /**
1116         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1117         *
1118         * @return the intercept builder to configure
1119         */
1120        public InterceptDefinition intercept() {
1121            InterceptDefinition answer = new InterceptDefinition();
1122            addOutput(answer);
1123            return answer;
1124        }
1125    
1126        /**
1127         * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1128         *
1129         * @param  interceptor  the interceptor
1130         */
1131        public void intercept(AbstractInterceptorDefinition interceptor) {
1132            addOutput(interceptor);
1133            pushBlock(interceptor);
1134        }
1135    
1136        /**
1137         * Adds an interceptor around the whole of this nodes processing
1138         *
1139         * @param interceptor  the interceptor
1140         */
1141        public void addInterceptor(AbstractInterceptorDefinition interceptor) {
1142            interceptors.add(interceptor);
1143        }
1144    
1145        /**
1146         * Adds an interceptor around the whole of this nodes processing
1147         *
1148         * @param interceptor  the interceptor
1149         */
1150        public void addInterceptor(DelegateProcessor interceptor) {
1151            addInterceptor(new InterceptorDefinition(interceptor));
1152        }
1153    
1154        /**
1155         * Pushes the given block on the stack as current block
1156         * @param block  the block
1157         */
1158        public void pushBlock(Block block) {
1159            blocks.add(block);
1160        }
1161    
1162        /**
1163         * Pops the block off the stack as current block
1164         * @return the block
1165         */
1166        public Block popBlock() {
1167            return blocks.isEmpty() ? null : blocks.removeLast();
1168        }
1169    
1170        /**
1171         * Procceeds the given intercepted route.
1172         * <p/>
1173         * Proceed is used in conjunction with intercept where calling proceed will route the message through the
1174         * original route path from the point of interception. This can be used to implement the
1175         * <a href="http://www.enterpriseintegrationpatterns.com/Detour.html">detour</a> pattern.
1176         *
1177         * @return the builder
1178         * @see ProcessorDefinition#proceed()
1179         */
1180        @SuppressWarnings("unchecked")
1181        public Type proceed() {
1182            ProceedDefinition proceed = null;
1183            ProcessorDefinition currentProcessor = this;
1184    
1185            if (currentProcessor instanceof InterceptDefinition) {
1186                proceed = ((InterceptDefinition) currentProcessor).getProceed();
1187                LOG.info("proceed() is the implied and hence not needed for an intercept()");
1188            }
1189            if (proceed == null) {
1190                for (ProcessorDefinition node = parent; node != null; node = node.getParent()) {
1191                    if (node instanceof InterceptDefinition) {
1192                        InterceptDefinition intercept = (InterceptDefinition)node;
1193                        proceed = intercept.getProceed();
1194                        break;
1195                    }
1196                }
1197    
1198                if (proceed == null) {
1199                    throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block");
1200                }
1201    
1202            }
1203    
1204            addOutput(proceed);
1205            return (Type) this;
1206        }
1207    
1208        /**
1209         * Stops the given intercepted route.
1210         * <p/>
1211         * As opposed to {@link #proceed()} calling stop will stop the message route and <b>not</b> continue
1212         * from the interepted origin.
1213         *
1214         * @return the builder
1215         * @see #proceed()
1216         */
1217        @SuppressWarnings("unchecked")
1218        public Type stop() {
1219            ProcessorDefinition currentProcessor = this;
1220    
1221            if (currentProcessor instanceof InterceptDefinition) {
1222                ((InterceptDefinition) currentProcessor).stopIntercept();
1223            } else {
1224                ProcessorDefinition node;
1225                for (node = parent; node != null; node = node.getParent()) {
1226                    if (node instanceof InterceptDefinition) {
1227                        ((InterceptDefinition) node).stopIntercept();
1228                        break;
1229                    }
1230                }
1231                if (node == null) {
1232                    throw new IllegalArgumentException("Cannot use stop() without being within an intercept() block");
1233                }
1234            }
1235    
1236            return (Type) this;
1237        }
1238    
1239        /**
1240         * <a href="http://camel.apache.org/exception-clause.html">Exception clause</a>
1241         * for cathing certain exceptions and handling them.
1242         *
1243         * @param exceptionType  the exception to catch
1244         * @return the exception builder to configure
1245         */
1246        public OnExceptionDefinition onException(Class exceptionType) {
1247            OnExceptionDefinition answer = new OnExceptionDefinition(exceptionType);
1248            addOutput(answer);
1249            return answer;
1250        }
1251    
1252        /**
1253         * Apply an interceptor route if the predicate is true.
1254         *
1255         * @param predicate the predicate to test
1256         * @return  the choice builder to configure
1257         */
1258        public ChoiceDefinition intercept(Predicate predicate) {
1259            InterceptDefinition answer = new InterceptDefinition();
1260            addOutput(answer);
1261            return answer.when(predicate);
1262        }
1263    
1264        /**
1265         * Creates a policy.
1266         * <p/>
1267         * Policy can be used for transactional policies.
1268         *
1269         * @return the policy builder to configure
1270         */
1271        public PolicyDefinition policies() {
1272            PolicyDefinition answer = new PolicyDefinition();
1273            addOutput(answer);
1274            return answer;
1275        }
1276    
1277        /**
1278         * Apply a {@link Policy}.
1279         * <p/>
1280         * Policy can be used for transactional policies.
1281         *
1282         * @param policy  the policy to apply
1283         * @return the policy builder to configure
1284         */
1285        public PolicyDefinition policy(Policy policy) {
1286            PolicyDefinition answer = new PolicyDefinition(policy);
1287            addOutput(answer);
1288            return answer;
1289        }
1290    
1291        /**
1292         * Forces handling of faults as exceptions
1293         *
1294         * @return the current builder with the fault handler configured
1295         */
1296        @SuppressWarnings("unchecked")
1297        public Type handleFault() {
1298            intercept(new HandleFaultDefinition());
1299            return (Type) this;
1300        }
1301    
1302        /**
1303         * Installs the given <a href="http://camel.apache.org/error-handler.html">error handler</a> builder.
1304         *
1305         * @param errorHandlerBuilder the error handler to be used by default for all child routes
1306         * @return the current builder with the error handler configured
1307         */
1308        @SuppressWarnings("unchecked")
1309        public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
1310            setErrorHandlerBuilder(errorHandlerBuilder);
1311            return (Type) this;
1312        }
1313    
1314        /**
1315         * Configures whether or not the <a href="http://camel.apache.org/error-handler.html">error handler</a>
1316         * is inherited by every processing node (or just the top most one)
1317         *
1318         * @param condition the flag as to whether error handlers should be inherited or not
1319         * @return the current builder
1320         */
1321        @SuppressWarnings("unchecked")
1322        public Type inheritErrorHandler(boolean condition) {
1323            setInheritErrorHandlerFlag(condition);
1324            return (Type) this;
1325        }
1326    
1327        // Transformers
1328        // -------------------------------------------------------------------------
1329    
1330        /**
1331         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1332         * Adds the custom processor to this destination which could be a final
1333         * destination, or could be a transformation in a pipeline
1334         *
1335         * @param processor  the custom {@link Processor}
1336         * @return the builder
1337         */
1338        @SuppressWarnings("unchecked")
1339        public Type process(Processor processor) {
1340            ProcessDefinition answer = new ProcessDefinition(processor);
1341            addOutput(answer);
1342            return (Type) this;
1343        }
1344    
1345        /**
1346         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1347         * Adds the custom processor reference to this destination which could be a final
1348         * destination, or could be a transformation in a pipeline
1349         *
1350         * @param ref   reference to a {@link Processor} to lookup in the registry
1351         * @return the builder
1352         */
1353        @SuppressWarnings("unchecked")
1354        public Type processRef(String ref) {
1355            ProcessDefinition answer = new ProcessDefinition();
1356            answer.setRef(ref);
1357            addOutput(answer);
1358            return (Type) this;
1359        }
1360    
1361        /**
1362         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1363         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1364         *
1365         * @param bean  the bean to invoke
1366         * @return the builder
1367         */
1368        @SuppressWarnings("unchecked")
1369        public Type bean(Object bean) {
1370            BeanDefinition answer = new BeanDefinition();
1371            answer.setBean(bean);
1372            addOutput(answer);
1373            return (Type) this;
1374        }
1375    
1376        /**
1377         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1378         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1379         *
1380         * @param bean  the bean to invoke
1381         * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
1382         * @return the builder
1383         */
1384        @SuppressWarnings("unchecked")
1385        public Type bean(Object bean, String method) {
1386            BeanDefinition answer = new BeanDefinition();
1387            answer.setBean(bean);
1388            answer.setMethod(method);
1389            addOutput(answer);
1390            return (Type) this;
1391        }
1392    
1393        /**
1394         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1395         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1396         *
1397         * @param  beanType  the bean class, Camel will instantiate an object at runtime
1398         * @return the builder
1399         */
1400        @SuppressWarnings("unchecked")
1401        public Type bean(Class beanType) {
1402            BeanDefinition answer = new BeanDefinition();
1403            answer.setBeanType(beanType);
1404            addOutput(answer);
1405            return (Type) this;
1406        }
1407    
1408        /**
1409         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1410         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1411         *
1412         * @param  beanType  the bean class, Camel will instantiate an object at runtime
1413         * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
1414         * @return the builder
1415         */
1416        @SuppressWarnings("unchecked")
1417        public Type bean(Class beanType, String method) {
1418            BeanDefinition answer = new BeanDefinition();
1419            answer.setBeanType(beanType);
1420            answer.setMethod(method);
1421            addOutput(answer);
1422            return (Type) this;
1423        }
1424    
1425        /**
1426         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1427         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1428         *
1429         * @param ref  reference to a bean to lookup in the registry
1430         * @return the builder
1431         */
1432        @SuppressWarnings("unchecked")
1433        public Type beanRef(String ref) {
1434            BeanDefinition answer = new BeanDefinition(ref);
1435            addOutput(answer);
1436            return (Type) this;
1437        }
1438    
1439        /**
1440         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1441         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1442         *
1443         * @param ref  reference to a bean to lookup in the registry
1444         * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
1445         * @return the builder
1446         */
1447        @SuppressWarnings("unchecked")
1448        public Type beanRef(String ref, String method) {
1449            BeanDefinition answer = new BeanDefinition(ref, method);
1450            addOutput(answer);
1451            return (Type) this;
1452        }
1453    
1454        /**
1455         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1456         * Adds a processor which sets the body on the IN message
1457         *
1458         * @return a expression builder clause to set the body
1459         */
1460        public ExpressionClause<ProcessorDefinition<Type>> setBody() {
1461            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
1462            SetBodyDefinition answer = new SetBodyDefinition(clause);
1463            addOutput(answer);
1464            return clause;
1465        }
1466    
1467        /**
1468         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1469         * Adds a processor which sets the body on the IN message
1470         *
1471         * @param expression   the expression used to set the body
1472         * @return the builder
1473         */
1474        @SuppressWarnings("unchecked")
1475        public Type setBody(Expression expression) {
1476            SetBodyDefinition answer = new SetBodyDefinition(expression);
1477            addOutput(answer);
1478            return (Type) this;
1479        }
1480    
1481        /**
1482         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1483         * Adds a processor which sets the body on the OUT message
1484         *
1485         * @param expression   the expression used to set the body
1486         * @return the builder
1487         */
1488        @SuppressWarnings("unchecked")
1489        public Type transform(Expression expression) {
1490            TransformDefinition answer = new TransformDefinition(expression);
1491            addOutput(answer);
1492            return (Type) this;
1493        }
1494    
1495        /**
1496         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1497         * Adds a processor which sets the body on the OUT message
1498         *
1499         * @return a expression builder clause to set the body
1500         */
1501        public ExpressionClause<ProcessorDefinition<Type>> transform() {
1502            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>((Type) this);
1503            TransformDefinition answer = new TransformDefinition(clause);
1504            addOutput(answer);
1505            return clause;
1506        }
1507    
1508        /**
1509         * Adds a processor which sets the body on the FAULT message
1510         *
1511         * @param expression   the expression used to set the body
1512         * @return the builder
1513         */
1514        public Type setFaultBody(Expression expression) {
1515            return process(ProcessorBuilder.setFaultBody(expression));
1516        }
1517    
1518        /**
1519         * Adds a processor which sets the header on the IN message
1520         *
1521         * @param name  the header name
1522         * @return a expression builder clause to set the header
1523         */
1524        public ExpressionClause<ProcessorDefinition<Type>> setHeader(String name) {
1525            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
1526            SetHeaderDefinition answer = new SetHeaderDefinition(name, clause);
1527            addOutput(answer);
1528            return clause;
1529        }
1530    
1531        /**
1532         * Adds a processor which sets the header on the IN message
1533         *
1534         * @param name  the header name
1535         * @param expression  the expression used to set the header
1536         * @return the builder
1537         */
1538        @SuppressWarnings("unchecked")
1539        public Type setHeader(String name, Expression expression) {
1540            SetHeaderDefinition answer = new SetHeaderDefinition(name, expression);
1541            addOutput(answer);
1542            return (Type) this;
1543        }
1544    
1545        /**
1546         * Adds a processor which sets the header on the OUT message
1547         *
1548         * @param name  the header name
1549         * @return a expression builder clause to set the header
1550         */
1551        public ExpressionClause<ProcessorDefinition<Type>> setOutHeader(String name) {
1552            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
1553            SetOutHeaderDefinition answer = new SetOutHeaderDefinition(name, clause);
1554            addOutput(answer);
1555            return clause;
1556        }
1557    
1558        /**
1559         * Adds a processor which sets the header on the OUT message
1560         *
1561         * @param name  the header name
1562         * @param expression  the expression used to set the header
1563         * @return the builder
1564         */
1565        @SuppressWarnings("unchecked")
1566        public Type setOutHeader(String name, Expression expression) {
1567            SetOutHeaderDefinition answer = new SetOutHeaderDefinition(name, expression);
1568            addOutput(answer);
1569            return (Type) this;
1570        }
1571    
1572        /**
1573         * Adds a processor which sets the header on the FAULT message
1574         *
1575         * @param name  the header name
1576         * @param expression  the expression used to set the header
1577         * @return the builder
1578         */
1579        public Type setFaultHeader(String name, Expression expression) {
1580            return process(ProcessorBuilder.setFaultHeader(name, expression));
1581        }
1582    
1583        /**
1584         * Adds a processor which sets the exchange property
1585         *
1586         * @param name  the property name
1587         * @param expression  the expression used to set the property
1588         * @return the builder
1589         */
1590        @SuppressWarnings("unchecked")
1591        public Type setProperty(String name, Expression expression) {
1592            SetPropertyDefinition answer = new SetPropertyDefinition(name, expression);
1593            addOutput(answer);
1594            return (Type) this;
1595        }
1596    
1597    
1598        /**
1599         * Adds a processor which sets the exchange property
1600         *
1601         * @param name  the property name
1602         * @return a expression builder clause to set the property
1603         */
1604        public ExpressionClause<ProcessorDefinition<Type>> setProperty(String name) {
1605            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
1606            SetPropertyDefinition answer = new SetPropertyDefinition(name, clause);
1607            addOutput(answer);
1608            return clause;
1609        }
1610    
1611        /**
1612         * Adds a processor which removes the header on the IN message
1613         *
1614         * @param name  the header name
1615         * @return the builder
1616         */
1617        @SuppressWarnings("unchecked")
1618        public Type removeHeader(String name) {
1619            RemoveHeaderDefinition answer = new RemoveHeaderDefinition(name);
1620            addOutput(answer);
1621            return (Type) this;
1622        }
1623    
1624        /**
1625         * Adds a processor which removes the header on the FAULT message
1626         *
1627         * @param name  the header name
1628         * @return the builder
1629         */
1630        public Type removeFaultHeader(String name) {
1631            return process(ProcessorBuilder.removeFaultHeader(name));
1632        }
1633    
1634        /**
1635         * Adds a processor which removes the exchange property
1636         *
1637         * @param name  the property name
1638         * @return the builder
1639         */
1640        @SuppressWarnings("unchecked")
1641        public Type removeProperty(String name) {
1642            RemovePropertyDefinition answer = new RemovePropertyDefinition(name);
1643            addOutput(answer);
1644            return (Type) this;
1645        }
1646    
1647        /**
1648         * Converts the IN message body to the specified type
1649         *
1650         * @param type the type to convert to
1651         * @return the builder
1652         */
1653        @SuppressWarnings("unchecked")
1654        public Type convertBodyTo(Class type) {
1655            addOutput(new ConvertBodyDefinition(type));
1656            return (Type) this;
1657        }
1658        
1659        /**
1660         * Converts the IN message body to the specified class type
1661         *
1662         * @param typeString the type to convert to as a fully qualified classname
1663         * @return the builder
1664         */
1665        @SuppressWarnings("unchecked")
1666        public Type convertBodyTo(String typeString) {
1667            addOutput(new ConvertBodyDefinition(typeString));
1668            return (Type) this;
1669        }
1670    
1671        /**
1672         * Sorts the IN message body using the given comparator.
1673         * The IN body mut be convertable to {@link List}.
1674         *
1675         * @param comparator  the comparator to use for sorting
1676         * @return the builder
1677         */
1678        @SuppressWarnings("unchecked")
1679        public Type sortBody(Comparator comparator) {
1680            addOutput(new SortDefinition(body(), comparator));
1681            return (Type) this;
1682        }
1683    
1684        /**
1685         * Sorts the IN message body using a default sorting based on toString representation.
1686         * The IN body mut be convertable to {@link List}.
1687         *
1688         * @return the builder
1689         */
1690        public Type sortBody() {
1691            return sortBody(null);
1692        }
1693    
1694        /**
1695         * Sorts the expression using the given comparator
1696         *
1697         * @param expression  the expression, must be convertable to {@link List}
1698         * @param comparator  the comparator to use for sorting
1699         * @return the builder
1700         */
1701        @SuppressWarnings("unchecked")
1702        public Type sort(Expression expression, Comparator comparator) {
1703            addOutput(new SortDefinition(expression, comparator));
1704            return (Type) this;
1705        }
1706    
1707        /**
1708         * Sorts the expression using a default sorting based on toString representation. 
1709         *
1710         * @param expression  the expression, must be convertable to {@link List}
1711         * @return the builder
1712         */
1713        public Type sort(Expression expression) {
1714            return sort(expression, null);
1715        }
1716    
1717        /**
1718         * Enriches an exchange with additional data obtained from a
1719         * <code>resourceUri</code>.
1720         * 
1721         * @param resourceUri
1722         *            URI of resource endpoint for obtaining additional data.
1723         * @param aggregationStrategy
1724         *            aggregation strategy to aggregate input data and additional
1725         *            data.
1726         * @return this processor type
1727         * @see org.apache.camel.processor.Enricher
1728         */
1729        @SuppressWarnings("unchecked")
1730        public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy) {
1731            addOutput(new EnrichDefinition(aggregationStrategy, resourceUri));
1732            return (Type)this;
1733        }
1734        
1735        // DataFormat support
1736        // -------------------------------------------------------------------------
1737    
1738        /**
1739         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1740         * Unmarshals the in body using a {@link DataFormat} expression to define
1741         * the format of the input message and the output will be set on the out message body.
1742         *
1743         * @return the expression to create the {@link DataFormat}
1744         */
1745        public DataFormatClause<ProcessorDefinition<Type>> unmarshal() {
1746            return new DataFormatClause<ProcessorDefinition<Type>>(this, DataFormatClause.Operation.Unmarshal);
1747        }
1748    
1749        /**
1750         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1751         * Unmarshals the in body using the specified {@link DataFormat}
1752         * and sets the output on the out message body.
1753         *
1754         * @param dataFormatType  the dataformat
1755         * @return the builder
1756         */
1757        @SuppressWarnings("unchecked")
1758        public Type unmarshal(DataFormatDefinition dataFormatType) {
1759            addOutput(new UnmarshalDefinition(dataFormatType));
1760            return (Type) this;
1761        }
1762    
1763        /**
1764         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1765         * Unmarshals the in body using the specified {@link DataFormat}
1766         * and sets the output on the out message body.
1767         *
1768         * @param dataFormat  the dataformat
1769         * @return the builder
1770         */
1771        public Type unmarshal(DataFormat dataFormat) {
1772            return unmarshal(new DataFormatDefinition(dataFormat));
1773        }
1774    
1775        /**
1776         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1777         * Unmarshals the in body using the specified {@link DataFormat}
1778         * reference in the {@link org.apache.camel.spi.Registry} and sets
1779         * the output on the out message body.
1780         *
1781         * @param dataTypeRef  reference to a {@link DataFormat} to lookup in the registry
1782         * @return the builder
1783         */
1784        @SuppressWarnings("unchecked")
1785        public Type unmarshal(String dataTypeRef) {
1786            addOutput(new UnmarshalDefinition(dataTypeRef));
1787            return (Type) this;
1788        }
1789    
1790        /**
1791         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1792         * Marshals the in body using a {@link DataFormat} expression to define
1793         * the format of the output which will be added to the out body.
1794         *
1795         * @return the expression to create the {@link DataFormat}
1796         */
1797        public DataFormatClause<ProcessorDefinition<Type>> marshal() {
1798            return new DataFormatClause<ProcessorDefinition<Type>>(this, DataFormatClause.Operation.Marshal);
1799        }
1800    
1801        /**
1802         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1803         * Marshals the in body using the specified {@link DataFormat}
1804         * and sets the output on the out message body.
1805         *
1806         * @param dataFormatType  the dataformat
1807         * @return the builder
1808         */
1809        @SuppressWarnings("unchecked")
1810        public Type marshal(DataFormatDefinition dataFormatType) {
1811            addOutput(new MarshalDefinition(dataFormatType));
1812            return (Type) this;
1813        }
1814    
1815        /**
1816         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1817         * Marshals the in body using the specified {@link DataFormat}
1818         * and sets the output on the out message body.
1819         *
1820         * @param dataFormat  the dataformat
1821         * @return the builder
1822         */
1823        public Type marshal(DataFormat dataFormat) {
1824            return marshal(new DataFormatDefinition(dataFormat));
1825        }
1826    
1827        /**
1828         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1829         * Marshals the in body the specified {@link DataFormat}
1830         * reference in the {@link org.apache.camel.spi.Registry} and sets
1831         * the output on the out message body.
1832         *
1833         * @param dataTypeRef  reference to a {@link DataFormat} to lookup in the registry
1834         * @return the builder
1835         */
1836        @SuppressWarnings("unchecked")
1837        public Type marshal(String dataTypeRef) {
1838            addOutput(new MarshalDefinition(dataTypeRef));
1839            return (Type) this;
1840        }
1841    
1842        // Properties
1843        // -------------------------------------------------------------------------
1844        @XmlTransient
1845        @SuppressWarnings("unchecked")
1846        public ProcessorDefinition<? extends ProcessorDefinition> getParent() {
1847            return parent;
1848        }
1849    
1850        public void setParent(ProcessorDefinition<? extends ProcessorDefinition> parent) {
1851            this.parent = parent;
1852        }
1853    
1854        @XmlTransient
1855        public ErrorHandlerBuilder getErrorHandlerBuilder() {
1856            if (errorHandlerBuilder == null) {
1857                errorHandlerBuilder = createErrorHandlerBuilder();
1858            }
1859            return errorHandlerBuilder;
1860        }
1861    
1862        /**
1863         * Sets the error handler to use with processors created by this builder
1864         */
1865        public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
1866            this.errorHandlerBuilder = errorHandlerBuilder;
1867        }
1868    
1869        /**
1870         * Sets the error handler if one is not already set
1871         */
1872        protected void setErrorHandlerBuilderIfNull(ErrorHandlerBuilder errorHandlerBuilder) {
1873            if (this.errorHandlerBuilder == null) {
1874                setErrorHandlerBuilder(errorHandlerBuilder);
1875            }
1876        }
1877    
1878        public String getErrorHandlerRef() {
1879            return errorHandlerRef;
1880        }
1881    
1882        /**
1883         * Sets the bean ref name of the error handler builder to use on this route
1884         */
1885        @XmlAttribute(required = false)
1886        public void setErrorHandlerRef(String errorHandlerRef) {
1887            this.errorHandlerRef = errorHandlerRef;
1888            setErrorHandlerBuilder(new ErrorHandlerBuilderRef(errorHandlerRef));
1889        }
1890    
1891        @XmlTransient
1892        public boolean isInheritErrorHandler() {
1893            return isInheritErrorHandler(getInheritErrorHandlerFlag());
1894        }
1895    
1896        /**
1897         * Lets default the inherit value to be true if there is none specified
1898         */
1899        public static boolean isInheritErrorHandler(Boolean value) {
1900            return value == null || value;
1901        }
1902    
1903        @XmlAttribute(name = "inheritErrorHandler", required = false)
1904        public Boolean getInheritErrorHandlerFlag() {
1905            return inheritErrorHandlerFlag;
1906        }
1907    
1908        public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) {
1909            this.inheritErrorHandlerFlag = inheritErrorHandlerFlag;
1910        }
1911    
1912        @XmlTransient
1913        public NodeFactory getNodeFactory() {
1914            if (nodeFactory == null) {
1915                nodeFactory = new NodeFactory();
1916            }
1917            return nodeFactory;
1918        }
1919    
1920        public void setNodeFactory(NodeFactory nodeFactory) {
1921            this.nodeFactory = nodeFactory;
1922        }
1923    
1924        /**
1925         * Returns a label to describe this node such as the expression if some kind of expression node
1926         */
1927        public String getLabel() {
1928            return "";
1929        }
1930    
1931        // Implementation methods
1932        // -------------------------------------------------------------------------
1933    
1934        /**
1935         * Creates the processor and wraps it in any necessary interceptors and
1936         * error handlers
1937         */
1938        protected Processor makeProcessor(RouteContext routeContext) throws Exception {
1939            Processor processor = createProcessor(routeContext);
1940            return wrapProcessor(routeContext, processor);
1941        }
1942    
1943        /**
1944         * A strategy method which allows derived classes to wrap the child
1945         * processor in some kind of interceptor
1946         *
1947         * @param routeContext the route context
1948         * @param target       the processor which can be wrapped
1949         * @return the original processor or a new wrapped interceptor
1950         * @throws Exception can be thrown in case of error
1951         */
1952        protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
1953            ObjectHelper.notNull(target, "target", this);
1954    
1955            List<InterceptStrategy> strategies = new ArrayList<InterceptStrategy>();
1956            CamelContext camelContext = routeContext.getCamelContext();
1957            strategies.addAll(camelContext.getInterceptStrategies());
1958            strategies.addAll(routeContext.getInterceptStrategies());
1959            for (InterceptStrategy strategy : strategies) {
1960                if (strategy != null) {
1961                    target = strategy.wrapProcessorInInterceptors(this, target);
1962                }
1963            }
1964    
1965            List<AbstractInterceptorDefinition> list = routeContext.getRoute().getInterceptors();
1966            if (interceptors != null) {
1967                list.addAll(interceptors);
1968            }
1969            // lets reverse the list so we apply the inner interceptors first
1970            Collections.reverse(list);
1971            Set<Processor> interceptors = new HashSet<Processor>();
1972            interceptors.add(target);
1973            for (AbstractInterceptorDefinition interceptorType : list) {
1974                DelegateProcessor interceptor = interceptorType.createInterceptor(routeContext);
1975                if (!interceptors.contains(interceptor)) {
1976                    interceptors.add(interceptor);
1977                    if (interceptor.getProcessor() != null) {
1978                        LOG.warn("Interceptor " + interceptor + " currently wraps target "
1979                                + interceptor.getProcessor()
1980                                + " is attempting to change target " + target
1981                                + " new wrapping has been denied.");
1982                    } else {
1983                        interceptor.setProcessor(target);
1984                        target = interceptor;
1985                    }
1986                }
1987            }
1988            return target;
1989        }
1990    
1991        /**
1992         * A strategy method to allow newly created processors to be wrapped in an
1993         * error handler.
1994         */
1995        protected Processor wrapInErrorHandler(RouteContext routeContext, Processor target) throws Exception {
1996            ObjectHelper.notNull(target, "target", this);
1997            ErrorHandlerWrappingStrategy strategy = routeContext.getErrorHandlerWrappingStrategy();
1998            if (strategy != null) {
1999                return strategy.wrapProcessorInErrorHandler(this, target);
2000            }
2001            return getErrorHandlerBuilder().createErrorHandler(routeContext, target);
2002        }
2003    
2004        protected ErrorHandlerBuilder createErrorHandlerBuilder() {
2005            if (errorHandlerRef != null) {
2006                return new ErrorHandlerBuilderRef(errorHandlerRef);
2007            }
2008            if (isInheritErrorHandler()) {
2009                return new DeadLetterChannelBuilder();
2010            } else {
2011                return new NoErrorHandlerBuilder();
2012            }
2013        }
2014    
2015        protected void configureChild(ProcessorDefinition output) {
2016            output.setNodeFactory(getNodeFactory());
2017        }
2018    
2019        @SuppressWarnings("unchecked")
2020        public void addOutput(ProcessorDefinition processorType) {
2021            processorType.setParent(this);
2022            configureChild(processorType);
2023            if (blocks.isEmpty()) {
2024                getOutputs().add(processorType);
2025            } else {
2026                Block block = blocks.getLast();
2027                block.addOutput(processorType);
2028            }
2029        }
2030    
2031        /**
2032         * Creates a new instance of some kind of composite processor which defaults
2033         * to using a {@link Pipeline} but derived classes could change the
2034         * behaviour
2035         */
2036        protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) {
2037            return new Pipeline(list);
2038        }
2039    
2040        protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorDefinition> outputs)
2041            throws Exception {
2042            List<Processor> list = new ArrayList<Processor>();
2043            for (ProcessorDefinition output : outputs) {
2044                Processor processor = output.createProcessor(routeContext);
2045                // if the ProceedType/StopType create processor is null we keep on going
2046                if ((output instanceof ProceedDefinition || output instanceof StopDefinition) && processor == null) {
2047                    continue;
2048                }
2049                processor = output.wrapProcessorInInterceptors(routeContext, processor);
2050    
2051                ProcessorDefinition currentProcessor = this;
2052                if (!(currentProcessor instanceof OnExceptionDefinition || currentProcessor instanceof TryDefinition)) {
2053                    processor = output.wrapInErrorHandler(routeContext, processor);
2054                }
2055    
2056                list.add(processor);
2057            }
2058            Processor processor = null;
2059            if (!list.isEmpty()) {
2060                if (list.size() == 1) {
2061                    processor = list.get(0);
2062                } else {
2063                    processor = createCompositeProcessor(routeContext, list);
2064                }
2065            }
2066            return processor;
2067        }
2068    
2069        public void clearOutput() {
2070            getOutputs().clear();
2071            blocks.clear();
2072        }
2073    }