001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Arrays;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.Comparator;
024    import java.util.LinkedList;
025    import java.util.List;
026    
027    import javax.xml.bind.annotation.XmlAccessType;
028    import javax.xml.bind.annotation.XmlAccessorType;
029    import javax.xml.bind.annotation.XmlAttribute;
030    import javax.xml.bind.annotation.XmlTransient;
031    
032    import org.apache.camel.Channel;
033    import org.apache.camel.Endpoint;
034    import org.apache.camel.ExchangePattern;
035    import org.apache.camel.Expression;
036    import org.apache.camel.Predicate;
037    import org.apache.camel.Processor;
038    import org.apache.camel.Route;
039    import org.apache.camel.builder.DataFormatClause;
040    import org.apache.camel.builder.ErrorHandlerBuilder;
041    import org.apache.camel.builder.ErrorHandlerBuilderRef;
042    import org.apache.camel.builder.ExpressionBuilder;
043    import org.apache.camel.builder.ExpressionClause;
044    import org.apache.camel.builder.ProcessorBuilder;
045    import org.apache.camel.model.language.ConstantExpression;
046    import org.apache.camel.model.language.ExpressionDefinition;
047    import org.apache.camel.model.language.LanguageExpression;
048    import org.apache.camel.processor.DefaultChannel;
049    import org.apache.camel.processor.InterceptEndpointProcessor;
050    import org.apache.camel.processor.Pipeline;
051    import org.apache.camel.processor.aggregate.AggregationCollection;
052    import org.apache.camel.processor.aggregate.AggregationStrategy;
053    import org.apache.camel.processor.loadbalancer.LoadBalancer;
054    import org.apache.camel.spi.DataFormat;
055    import org.apache.camel.spi.IdempotentRepository;
056    import org.apache.camel.spi.InterceptStrategy;
057    import org.apache.camel.spi.Policy;
058    import org.apache.camel.spi.RouteContext;
059    import org.apache.camel.spi.TransactedPolicy;
060    import org.apache.commons.logging.Log;
061    import org.apache.commons.logging.LogFactory;
062    
063    import static org.apache.camel.builder.Builder.body;
064    
065    /**
066     * Base class for processor types that most XML types extend.
067     *
068     * @version $Revision: 782911 $
069     */
070    @XmlAccessorType(XmlAccessType.PROPERTY)
071    public abstract class ProcessorDefinition<Type extends ProcessorDefinition> extends OptionalIdentifiedType<Type> implements Block {
    // Logger used by addRoutes() for its debug/trace output.
    private static final transient Log LOG = LogFactory.getLog(ProcessorDefinition.class);
    // NOTE(review): presumably backs getErrorHandlerBuilder()/setErrorHandlerBuilder(); the accessors are not visible in this part of the file.
    private ErrorHandlerBuilder errorHandlerBuilder;
    // NOTE(review): presumably backs getNodeFactory()/setNodeFactory(), which configureChild() uses to pass the factory on to children.
    private NodeFactory nodeFactory;
    // Currently open blocks: addOutput() routes new outputs to the last entry, and end() calls popBlock() while this is non-empty.
    private final LinkedList<Block> blocks = new LinkedList<Block>();
    // Node returned by end() when no block is open on this node.
    private ProcessorDefinition parent;
    // Optional error handler reference; when non-null createErrorHandlerBuilder() builds an ErrorHandlerBuilderRef from it.
    private String errorHandlerRef;
    // NOTE(review): presumably exposed through getInterceptStrategies(), which wrapChannel() adds to every channel - accessor not visible here.
    private final List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
079    
    // else to use an optional attribute in JAXB2
    // NOTE(review): the remark above is kept from the original; its intent is not clear from this part of the file.
    /**
     * Returns the child outputs of this node.
     * <p/>
     * {@link #addOutput(ProcessorDefinition)} adds to the returned list and
     * {@link #clearOutput()} clears it, so implementations must return a
     * mutable list whose changes are retained by the node.
     *
     * @return the outputs of this node
     */
    public abstract List<ProcessorDefinition> getOutputs();
082    
083    
084        public Processor createProcessor(RouteContext routeContext) throws Exception {
085            throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
086        }
087    
088        public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
089            Collection<ProcessorDefinition> outputs = getOutputs();
090            return createOutputsProcessor(routeContext, outputs);
091        }
092    
093        @SuppressWarnings("unchecked")
094        public void addOutput(ProcessorDefinition processorType) {
095            processorType.setParent(this);
096            configureChild(processorType);
097            if (blocks.isEmpty()) {
098                getOutputs().add(processorType);
099            } else {
100                Block block = blocks.getLast();
101                block.addOutput(processorType);
102            }
103        }
104    
105        public void clearOutput() {
106            getOutputs().clear();
107            blocks.clear();
108        }
109    
110        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
111            Processor processor = makeProcessor(routeContext);
112            if (processor == null) {
113                // no processor to add
114                return;
115            }
116    
117            if (!routeContext.isRouteAdded()) {
118                boolean endpointInterceptor = false;
119    
120                // are we routing to an endpoint interceptor, if so we should not add it as an event driven
121                // processor as we use the producer to trigger the interceptor
122                if (processor instanceof Channel) {
123                    Channel channel = (Channel) processor;
124                    Processor next = channel.getNextProcessor();
125                    if (next instanceof InterceptEndpointProcessor) {
126                        endpointInterceptor = true;
127                    }
128                }
129    
130                // only add regular processors as event driven
131                if (endpointInterceptor) {
132                    if (LOG.isDebugEnabled()) {
133                        LOG.debug("Endpoint interceptor should not be added as an event driven consumer route: " + processor);
134                    }
135                } else {
136                    if (LOG.isTraceEnabled()) {
137                        LOG.trace("Adding event driven processor: " + processor);
138                    }
139                    routeContext.addEventDrivenProcessor(processor);
140                }
141    
142            }
143        }
144    
145        /**
146         * Wraps the child processor in whatever necessary interceptors and error handlers
147         */
148        public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
149            // dont double wrap
150            if (processor instanceof Channel) {
151                return processor;
152            }
153            return wrapChannel(routeContext, processor);
154        }
155    
156        protected Processor wrapChannel(RouteContext routeContext, Processor processor) throws Exception {
157            // put a channel inbetween this and each output to control the route flow logic
158            Channel channel = createChannel(routeContext);
159            channel.setNextProcessor(processor);
160    
161            // add interceptor strategies to the channel
162            channel.addInterceptStrategies(routeContext.getCamelContext().getInterceptStrategies());
163            channel.addInterceptStrategies(routeContext.getInterceptStrategies());
164            channel.addInterceptStrategies(this.getInterceptStrategies());
165    
166            // init the channel
167            channel.initChannel(this, routeContext);
168    
169            // set the error handler, must be done after init as we can set the error handler as first in the chain
170            if (this instanceof TryDefinition || this instanceof CatchDefinition || this instanceof FinallyDefinition) {
171                // do not use error handler for try .. catch .. finally blocks as it will handle errors itself
172                return channel;
173            } else {
174                // regular definition so add the error handler
175                Processor errorHandler = getErrorHandlerBuilder().createErrorHandler(routeContext, channel.getOutput());
176                channel.setErrorHandler(errorHandler);
177                return channel;
178            }
179        }
180    
181        /**
182         * Creates a new instance of some kind of composite processor which defaults
183         * to using a {@link Pipeline} but derived classes could change the behaviour
184         */
185        protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) {
186            return new Pipeline(list);
187        }
188    
189        /**
190         * Creates a new instance of the {@link Channel}.
191         */
192        protected Channel createChannel(RouteContext routeContext) {
193            return new DefaultChannel();
194        }
195    
196        protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorDefinition> outputs) throws Exception {
197            List<Processor> list = new ArrayList<Processor>();
198            for (ProcessorDefinition output : outputs) {
199                Processor processor = output.createProcessor(routeContext);
200                if (output instanceof Channel && processor == null) {
201                    continue;
202                }
203    
204                Processor channel = wrapChannel(routeContext, processor);
205                list.add(channel);
206            }
207    
208            // if more than one output wrap than in a composite processor else just keep it as is
209            Processor processor = null;
210            if (!list.isEmpty()) {
211                if (list.size() == 1) {
212                    processor = list.get(0);
213                } else {
214                    processor = createCompositeProcessor(routeContext, list);
215                }
216            }
217    
218            return processor;
219        }
220    
221        /**
222         * Creates the processor and wraps it in any necessary interceptors and error handlers
223         */
224        protected Processor makeProcessor(RouteContext routeContext) throws Exception {
225            Processor processor = createProcessor(routeContext);
226            if (processor == null) {
227                // no processor to make
228                return null;
229            }
230            return wrapProcessor(routeContext, processor);
231        }
232    
233        protected ErrorHandlerBuilder createErrorHandlerBuilder() {
234            if (errorHandlerRef != null) {
235                return new ErrorHandlerBuilderRef(errorHandlerRef);
236            }
237    
238            // return a reference to the default error handler
239            return new ErrorHandlerBuilderRef(ErrorHandlerBuilderRef.DEFAULT_ERROR_HANDLER_BUILDER);
240        }
241    
242        protected void configureChild(ProcessorDefinition output) {
243            output.setNodeFactory(getNodeFactory());
244            output.setErrorHandlerBuilder(getErrorHandlerBuilder());
245        }
246    
247        // Fluent API
248        // -------------------------------------------------------------------------
249    
250        /**
251         * Sends the exchange to the given endpoint
252         *
253         * @param uri  the endpoint to send to
254         * @return the builder
255         */
256        @SuppressWarnings("unchecked")
257        public Type to(String uri) {
258            addOutput(new ToDefinition(uri));
259            return (Type) this;
260        }   
261        
262        /**
263         * Sends the exchange to the given endpoint
264         *
265         * @param uri  the String formatted endpoint uri to send to
266         * @param args arguments for the string formatting of the uri
267         * @return the builder
268         */
269        @SuppressWarnings("unchecked")
270        public Type toF(String uri, Object... args) {
271            addOutput(new ToDefinition(String.format(uri, args)));
272            return (Type) this;
273        }
274    
275    
276        /**
277         * Sends the exchange to the given endpoint
278         *
279         * @param endpoint  the endpoint to send to
280         * @return the builder
281         */
282        @SuppressWarnings("unchecked")
283        public Type to(Endpoint endpoint) {
284            addOutput(new ToDefinition(endpoint));
285            return (Type) this;
286        }
287        
288        /**
289         * Sends the exchange with certain exchange pattern to the given endpoint
290         *
291         * @param pattern the pattern to use for the message exchange
292         * @param uri  the endpoint to send to
293         * @return the builder
294         */
295        @SuppressWarnings("unchecked")
296        public Type to(ExchangePattern pattern, String uri) {
297            addOutput(new ToDefinition(uri, pattern));
298            return (Type) this;
299        }   
300        
301    
302        /**
303         * Sends the exchange with certain exchange pattern to the given endpoint
304         *
305         * @param pattern the pattern to use for the message exchange
306         * @param endpoint  the endpoint to send to
307         * @return the builder
308         */
309        @SuppressWarnings("unchecked")
310        public Type to(ExchangePattern pattern, Endpoint endpoint) {
311            addOutput(new ToDefinition(endpoint, pattern));
312            return (Type) this;
313        }
314    
315        /**
316         * Sends the exchange to a list of endpoints
317         *
318         * @param uris  list of endpoints to send to
319         * @return the builder
320         */
321        @SuppressWarnings("unchecked")
322        public Type to(String... uris) {
323            for (String uri : uris) {
324                addOutput(new ToDefinition(uri));
325            }
326            return (Type) this;
327        }
328    
329    
330        /**
331         * Sends the exchange to a list of endpoints
332         *
333         * @param endpoints  list of endpoints to send to
334         * @return the builder
335         */
336        @SuppressWarnings("unchecked")
337        public Type to(Endpoint... endpoints) {
338            for (Endpoint endpoint : endpoints) {
339                addOutput(new ToDefinition(endpoint));
340            }
341            return (Type) this;
342        }
343    
344        /**
345         * Sends the exchange to a list of endpoints
346         *
347         * @param endpoints  list of endpoints to send to
348         * @return the builder
349         */
350        @SuppressWarnings("unchecked")
351        public Type to(Iterable<Endpoint> endpoints) {
352            for (Endpoint endpoint : endpoints) {
353                addOutput(new ToDefinition(endpoint));
354            }
355            return (Type) this;
356        }
357        
358        
359        /**
360         * Sends the exchange to a list of endpoints
361         *
362         * @param pattern the pattern to use for the message exchanges
363         * @param uris  list of endpoints to send to
364         * @return the builder
365         */
366        @SuppressWarnings("unchecked")
367        public Type to(ExchangePattern pattern, String... uris) {
368            for (String uri : uris) {
369                addOutput(new ToDefinition(uri, pattern));
370            }
371            return (Type) this;
372        }
373    
374        /**
375         * Sends the exchange to a list of endpoints
376         *
377         * @param pattern the pattern to use for the message exchanges
378         * @param endpoints  list of endpoints to send to
379         * @return the builder
380         */
381        @SuppressWarnings("unchecked")
382        public Type to(ExchangePattern pattern, Endpoint... endpoints) {
383            for (Endpoint endpoint : endpoints) {
384                addOutput(new ToDefinition(endpoint, pattern));
385            }
386            return (Type) this;
387        }
388    
389        /**
390         * Sends the exchange to a list of endpoints
391         *
392         * @param pattern the pattern to use for the message exchanges
393         * @param endpoints  list of endpoints to send to
394         * @return the builder
395         */
396        @SuppressWarnings("unchecked")
397        public Type to(ExchangePattern pattern, Iterable<Endpoint> endpoints) {
398            for (Endpoint endpoint : endpoints) {
399                addOutput(new ToDefinition(endpoint, pattern));
400            }
401            return (Type) this;
402        }
403    
404    
405        /**
406         * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
407         * set the ExchangePattern {@link ExchangePattern} into the exchange
408         *
409         * @param exchangePattern  instance of {@link ExchangePattern}
410         * @return the builder
411         */
412        @SuppressWarnings("unchecked")
413        public Type setExchangePattern(ExchangePattern exchangePattern) {
414            addOutput(new SetExchangePatternDefinition(exchangePattern));
415            return (Type) this;
416        }
417    
418        /**
419         * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
420         * set the exchange's ExchangePattern {@link ExchangePattern} to be InOnly
421         *
422         *
423         * @return the builder
424         */
425        public Type inOnly() {
426            return setExchangePattern(ExchangePattern.InOnly);
427        }
428    
429        /**
430         * Sends the message to the given endpoint using an
431         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
432         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
433         *
434         * @param uri The endpoint uri which is used for sending the exchange
435         * @return the builder
436         */
437        public Type inOnly(String uri) {
438            return to(ExchangePattern.InOnly, uri);
439        }
440    
441        /**
442         * Sends the message to the given endpoint using an
443         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 
444         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
445         *
446         * @param endpoint The endpoint which is used for sending the exchange
447         * @return the builder
448         */
449        public Type inOnly(Endpoint endpoint) {
450            return to(ExchangePattern.InOnly, endpoint);
451        }
452    
453    
454        /**
455         * Sends the message to the given endpoints using an
456         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
457         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
458         *
459         * @param uris  list of endpoints to send to
460         * @return the builder
461         */
462        public Type inOnly(String... uris) {
463            return to(ExchangePattern.InOnly, uris);
464        }
465    
466    
467        /**
468         * Sends the message to the given endpoints using an
469         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
470         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
471         *
472         * @param endpoints  list of endpoints to send to
473         * @return the builder
474         */
475        public Type inOnly(Endpoint... endpoints) {
476            return to(ExchangePattern.InOnly, endpoints);
477        }
478    
479        /**
480         * Sends the message to the given endpoints using an
481         * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
482         * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
483         *
484         * @param endpoints  list of endpoints to send to
485         * @return the builder
486         */
487        public Type inOnly(Iterable<Endpoint> endpoints) {
488            return to(ExchangePattern.InOnly, endpoints);
489        }
490    
491    
492        /**
493         * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
494         * set the exchange's ExchangePattern {@link ExchangePattern} to be InOut
495         *
496         *
497         * @return the builder
498         */
499        public Type inOut() {
500            return setExchangePattern(ExchangePattern.InOut);
501        }
502    
503        /**
504         * Sends the message to the given endpoint using an
505         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
506         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
507         *
508         * @param uri The endpoint uri which is used for sending the exchange
509         * @return the builder
510         */
511        public Type inOut(String uri) {
512            return to(ExchangePattern.InOut, uri);
513        }
514    
515    
516        /**
517         * Sends the message to the given endpoint using an
518         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
519         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
520         *
521         * @param endpoint The endpoint which is used for sending the exchange
522         * @return the builder
523         */
524        public Type inOut(Endpoint endpoint) {
525            return to(ExchangePattern.InOut, endpoint);
526        }
527    
528        /**
529         * Sends the message to the given endpoints using an
530         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
531         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
532         *
533         * @param uris  list of endpoints to send to
534         * @return the builder
535         */
536        public Type inOut(String... uris) {
537            return to(ExchangePattern.InOut, uris);
538        }
539    
540    
541        /**
542         * Sends the message to the given endpoints using an
543         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
544         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
545         *
546         * @param endpoints  list of endpoints to send to
547         * @return the builder
548         */
549        public Type inOut(Endpoint... endpoints) {
550            return to(ExchangePattern.InOut, endpoints);
551        }
552    
553        /**
554         * Sends the message to the given endpoints using an
555         * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
556         * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
557         *
558         * @param endpoints  list of endpoints to send to
559         * @return the builder
560         */
561        public Type inOut(Iterable<Endpoint> endpoints) {
562            return to(ExchangePattern.InOut, endpoints);
563        }
564    
565        /**
566         * Sets the id of this node
567         *
568         * @param id  the id
569         * @return the builder
570         */
571        @SuppressWarnings("unchecked")
572        public Type id(String id) {
573            if (getOutputs().isEmpty()) {
574                // set id on this
575                setId(id);
576            } else {
577                // set it on last output as this is what the user means to do
578                getOutputs().get(getOutputs().size() - 1).setId(id);
579            }
580    
581            return (Type) this;
582        }
583    
584        /**
585         * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
586         * Multicasts messages to all its child outputs; so that each processor and
587         * destination gets a copy of the original message to avoid the processors
588         * interfering with each other.
589         *
590         * @return the builder
591         */
592        public MulticastDefinition multicast() {
593            MulticastDefinition answer = new MulticastDefinition();
594            addOutput(answer);
595            return answer;
596        }
597    
598        /**
599         * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
600         * Multicasts messages to all its child outputs; so that each processor and
601         * destination gets a copy of the original message to avoid the processors
602         * interfering with each other.
603         *
604         * @param aggregationStrategy the strategy used to aggregate responses for
605         *          every part
606         * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
607         * @return the builder
608         */
609        public MulticastDefinition multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
610            MulticastDefinition answer = new MulticastDefinition();
611            addOutput(answer);
612            answer.setAggregationStrategy(aggregationStrategy);
613            answer.setParallelProcessing(parallelProcessing);
614            return answer;
615        }
616    
617        /**
618         * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
619         * Multicasts messages to all its child outputs; so that each processor and
620         * destination gets a copy of the original message to avoid the processors
621         * interfering with each other.
622         *
623         * @param aggregationStrategy the strategy used to aggregate responses for
624         *          every part
625         * @return the builder
626         */
627        public MulticastDefinition multicast(AggregationStrategy aggregationStrategy) {
628            MulticastDefinition answer = new MulticastDefinition();
629            addOutput(answer);
630            answer.setAggregationStrategy(aggregationStrategy);
631            return answer;
632        }
633    
634        /**
635         * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
636         * Creates a {@link Pipeline} of the list of endpoints so that the message
637         * will get processed by each endpoint in turn and for request/response the
638         * output of one endpoint will be the input of the next endpoint
639         *
640         * @param uris  list of endpoints
641         * @return the builder
642         */
643        public Type pipeline(String... uris) {
644            return to(uris);
645        }
646    
647        /**
648         * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
649         * Creates a {@link Pipeline} of the list of endpoints so that the message
650         * will get processed by each endpoint in turn and for request/response the
651         * output of one endpoint will be the input of the next endpoint
652         *
653         * @param endpoints  list of endpoints
654         * @return the builder
655         */
656        public Type pipeline(Endpoint... endpoints) {
657            return to(endpoints);
658        }
659    
660        /**
661         * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
662         * Creates a {@link Pipeline} of the list of endpoints so that the message
663         * will get processed by each endpoint in turn and for request/response the
664         * output of one endpoint will be the input of the next endpoint
665         *
666         * @param endpoints  list of endpoints
667         * @return the builder
668         */
669        public Type pipeline(Collection<Endpoint> endpoints) {
670            return to(endpoints);
671        }
672    
673        /**
674         * Leverages a thread pool for multi threading processing exchanges.
675         * <p/>
676         * The caller thread will either wait for the async route
677         * to complete or imeddiately continue. If continue the OUT message will
678         * contain a {@link java.util.concurrent.Future} handle so you can get the real response
679         * later using this handle.
680         * <p/>
681         * Will default <tt>Always</tt> wait for the async route to complete, but this behavior can be overriden by:
682         * <ul>
683         *   <li>Configuring the <tt>waitForTaskToComplete</tt> option</li>
684         *   <li>Provide an IN header with the key {@link org.apache.camel.Exchange#ASYNC_WAIT} with the
685         * value containing a type {@link org.apache.camel.WaitForTaskToComplete}. The header will take precedence, if provided.</li>
686         * </ul>
687         *
688         * @return the builder
689         */
690        public ThreadsDefinition threads() {
691            ThreadsDefinition answer = new ThreadsDefinition();
692            addOutput(answer);
693            return answer;
694        }
695    
696        /**
697         * Leverages a thread pool for multi threading processing exchanges.
698         * <p/>
699         * See {@link #threads()} for more details.
700         *
701         * @param poolSize the core pool size
702         * @return the builder
703         */
704        public ThreadsDefinition threads(int poolSize) {
705            ThreadsDefinition answer = threads();
706            answer.setPoolSize(poolSize);
707            return answer;
708        }
709    
710        /**
711         * Ends the current block
712         *
713         * @return the builder
714         */
715        @SuppressWarnings("unchecked")
716        public ProcessorDefinition<? extends ProcessorDefinition> end() {
717            if (blocks.isEmpty()) {
718                if (parent == null) {
719                    throw new IllegalArgumentException("Root node with no active block");
720                }
721                return parent;
722            }
723            popBlock();
724            return this;
725        }
726    
727        /**
728         * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
729         * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
730         * to avoid duplicate messages
731         *      
732         * @return the builder
733         */
734        public IdempotentConsumerDefinition idempotentConsumer() {
735            IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
736            addOutput(answer);
737            return answer;
738        }
739    
740        /**
741         * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
742         * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
743         * to avoid duplicate messages
744         *
745         * @param messageIdExpression  expression to test of duplicate messages
746         * @param idempotentRepository  the repository to use for duplicate chedck
747         * @return the builder
748         */
749        public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression,
750                IdempotentRepository idempotentRepository) {
751            IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(messageIdExpression, idempotentRepository);
752            addOutput(answer);
753            return answer;
754        }
755    
756        /**
757         * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
758         * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
759         * to avoid duplicate messages
760         *
761         * @param idempotentRepository the repository to use for duplicate chedck
762         * @return the builder used to create the expression
763         */
764        public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer(IdempotentRepository idempotentRepository) {
765            IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
766            answer.setMessageIdRepository(idempotentRepository);
767            addOutput(answer);
768            return ExpressionClause.createAndSetExpression(answer);
769        }
770    
771        /**
772         * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
773         * Creates a predicate expression which only if it is <tt>true</tt> then the
774         * exchange is forwarded to the destination
775         *
776         * @return the clause used to create the filter expression
777         */
778        public ExpressionClause<FilterDefinition> filter() {
779            FilterDefinition filter = new FilterDefinition();
780            addOutput(filter);
781            return ExpressionClause.createAndSetExpression(filter);
782        }
783    
784        /**
785         * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
786         * Creates a predicate which is applied and only if it is <tt>true</tt> then the
787         * exchange is forwarded to the destination
788         *
789         * @param predicate  predicate to use
790         * @return the builder 
791         */
792        public FilterDefinition filter(Predicate predicate) {
793            FilterDefinition filter = new FilterDefinition(predicate);
794            addOutput(filter);
795            return filter;
796        }
797    
798        /**
799         * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
800         * Creates a predicate expression which only if it is <tt>true</tt> then the
801         * exchange is forwarded to the destination
802         *
803         * @param expression  the predicate expression to use
804         * @return the builder
805         */
806        public FilterDefinition filter(ExpressionDefinition expression) {
807            FilterDefinition filter = getNodeFactory().createFilter();
808            filter.setExpression(expression);
809            addOutput(filter);
810            return filter;
811        }
812    
813        /**
814         * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
815         * Creates a predicate language expression which only if it is <tt>true</tt> then the
816         * exchange is forwarded to the destination
817         *
818         * @param language     language for expression
819         * @param expression   the expression
820         * @return the builder
821         */
822        public FilterDefinition filter(String language, String expression) {
823            return filter(new LanguageExpression(language, expression));
824        }
825    
826        /**
827         * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a>
828         * Creates a loadbalance
829         *
830         * @return  the builder
831         */
832        public LoadBalanceDefinition loadBalance() {
833            LoadBalanceDefinition answer = new LoadBalanceDefinition();
834            addOutput(answer);
835            return answer;
836        }
837    
838        /**
839         * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a>
840         * Creates a loadbalance
841         *
842         * @param loadBalancer a custom load balancer to use
843         * @return  the builder
844         */
845        public LoadBalanceDefinition loadBalance(LoadBalancer loadBalancer) {
846            LoadBalanceDefinition answer = new LoadBalanceDefinition();
847            addOutput(answer);
848            return answer.loadBalance(loadBalancer);
849        }
850    
851        /**
852         * <a href="http://camel.apache.org/content-based-router.html">Content Based Router EIP:</a>
853         * Creates a choice of one or more predicates with an otherwise clause
854         *
855         * @return the builder for a choice expression
856         */
857        public ChoiceDefinition choice() {
858            ChoiceDefinition answer = new ChoiceDefinition();
859            addOutput(answer);
860            return answer;
861        }
862    
863        /**
864         * Creates a try/catch block
865         *
866         * @return the builder for a tryBlock expression
867         */
868        public TryDefinition doTry() {
869            TryDefinition answer = new TryDefinition();
870            addOutput(answer);
871            return answer;
872        }
873    
874        /**
875         * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a>
876         * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients
877         *
878         * @param recipients expression to decide the destinations
879         * @return the builder
880         */
881        @SuppressWarnings("unchecked")
882        public Type recipientList(Expression recipients) {
883            RecipientListDefinition answer = new RecipientListDefinition(recipients);
884            addOutput(answer);
885            return (Type) this;
886        }
887    
888        /**
889         * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a>
890         * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients
891         *
892         * @return the expression clause to configure the expression to decide the destinations
893         */
894        public ExpressionClause<ProcessorDefinition<Type>> recipientList() {
895            RecipientListDefinition answer = new RecipientListDefinition();
896            addOutput(answer);
897            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
898            answer.setExpression(clause);
899            return clause;
900        }
901    
902        /**
903         * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a>
904         * Creates a routing slip allowing you to route a message consecutively through a series of processing
905         * steps where the sequence of steps is not known at design time and can vary for each message.
906         *
907         * @param header  is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
908         *                class will look in for the list of URIs to route the message to.
909         * @param uriDelimiter  is the delimiter that will be used to split up
910         *                      the list of URIs in the routing slip.
911         * @return the buiider
912         */
913        @SuppressWarnings("unchecked")
914        public Type routingSlip(String header, String uriDelimiter) {
915            RoutingSlipDefinition answer = new RoutingSlipDefinition(header, uriDelimiter);
916            addOutput(answer);
917            return (Type) this;
918        }
919    
920        /**
921         * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a>
922         * Creates a routing slip allowing you to route a message consecutively through a series of processing
923         * steps where the sequence of steps is not known at design time and can vary for each message.
924         * <p>
925         * The list of URIs will be split based on the default delimiter {@link RoutingSlipDefinition#DEFAULT_DELIMITER}
926         *
927         * @param header  is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
928         *                class will look in for the list of URIs to route the message to.
929         * @return the builder
930         */
931        @SuppressWarnings("unchecked")
932        public Type routingSlip(String header) {
933            RoutingSlipDefinition answer = new RoutingSlipDefinition(header);
934            addOutput(answer);
935            return (Type) this;
936        }
937    
938        /**
939         * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
940         * Creates a splitter allowing you split a message into a number of pieces and process them individually.
941         * <p>
942         * This splitter responds with the latest message returned from destination
943         * endpoint.
944         *
945         * @return the expression clause builder for the expression on which to split
946         */
947        public ExpressionClause<SplitDefinition> split() {
948            SplitDefinition answer = new SplitDefinition();
949            addOutput(answer);
950            return ExpressionClause.createAndSetExpression(answer);
951        }
952    
953        /**
954         * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
955         * Creates a splitter allowing you split a message into a number of pieces and process them individually.
956         * <p>
957         * This splitter responds with the latest message returned from destination
958         * endpoint.
959         *
960         * @param expression  the expression on which to split the message
961         * @return the builder
962         */
963        public SplitDefinition split(Expression expression) {
964            SplitDefinition answer = new SplitDefinition(expression);
965            addOutput(answer);
966            return answer;
967        }
968    
969        /**
970         * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
971         * Creates a splitter allowing you split a message into a number of pieces and process them individually.
972         * <p>
973         * The splitter responds with the answer produced by the given {@link AggregationStrategy}.
974         *
975         * @param expression  the expression on which to split
976         * @param aggregationStrategy  the strategy used to aggregate responses for every part
977         * @return the builder
978         */
979        public SplitDefinition split(Expression expression, AggregationStrategy aggregationStrategy) {
980            SplitDefinition answer = new SplitDefinition(expression);
981            addOutput(answer);
982            answer.setAggregationStrategy(aggregationStrategy);
983            return answer;
984        }
985    
986        /**
987         * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
988         * Creates a resequencer allowing you to reorganize messages based on some comparator.
989         *
990         * @return the expression clause for the expressions on which to compare messages in order
991         */
992        public ExpressionClause<ResequenceDefinition> resequence() {
993            ResequenceDefinition answer = new ResequenceDefinition();
994            addOutput(answer);
995            ExpressionClause<ResequenceDefinition> clause = new ExpressionClause<ResequenceDefinition>(answer);
996            answer.expression(clause);
997            return clause;
998        }
999    
1000        /**
1001         * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
1002         * Creates a resequencer allowing you to reorganize messages based on some comparator.
1003         *
1004         * @param expression the expression on which to compare messages in order
1005         * @return the builder
1006         */
1007        public ResequenceDefinition resequence(Expression expression) {
1008            return resequence(Collections.<Expression>singletonList(expression));
1009        }
1010    
1011        /**
1012         * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
1013         * Creates a resequencer allowing you to reorganize messages based on some comparator.
1014         *
1015         * @param expressions the list of expressions on which to compare messages in order
1016         * @return the builder
1017         */
1018        public ResequenceDefinition resequence(List<Expression> expressions) {
1019            ResequenceDefinition answer = new ResequenceDefinition(expressions);
1020            addOutput(answer);
1021            return answer;
1022        }
1023    
1024        /**
1025         * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
1026         * Creates a splitter allowing you to reorganise messages based on some comparator.
1027         *
1028         * @param expressions the list of expressions on which to compare messages in order
1029         * @return the builder
1030         */
1031        public ResequenceDefinition resequencer(Expression... expressions) {
1032            List<Expression> list = new ArrayList<Expression>();
1033            list.addAll(Arrays.asList(expressions));
1034            return resequence(list);
1035        }
1036    
1037        /**
1038         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
1039         * Creates an aggregator allowing you to combine a number of messages together into a single message.
1040         *
1041         * @return the expression clause to be used as builder to configure the correlation expression
1042         */
1043        public ExpressionClause<AggregateDefinition> aggregate() {
1044            AggregateDefinition answer = new AggregateDefinition();
1045            addOutput(answer);
1046            return answer.createAndSetExpression();
1047        }
1048    
1049        /**
1050         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
1051         * Creates an aggregator allowing you to combine a number of messages together into a single message.
1052         *
1053         * @param aggregationStrategy the strategy used for the aggregation
1054         * @return the expression clause to be used as builder to configure the correlation expression
1055         */
1056        public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
1057            AggregateDefinition answer = new AggregateDefinition();
1058            answer.setAggregationStrategy(aggregationStrategy);
1059            addOutput(answer);
1060            return answer.createAndSetExpression();
1061        }
1062    
1063        /**
1064         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
1065         * Creates an aggregator allowing you to combine a number of messages together into a single message.
1066         *
1067         * @param aggregationCollection the collection used to perform the aggregation
1068         * @return the builder
1069         */
1070        public AggregateDefinition aggregate(AggregationCollection aggregationCollection) {
1071            AggregateDefinition answer = new AggregateDefinition();
1072            answer.setAggregationCollection(aggregationCollection);
1073            addOutput(answer);
1074            return answer;
1075        }
1076    
1077        /**
1078         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
1079         * Creates an aggregator allowing you to combine a number of messages together into a single message.
1080         *
1081         * @param correlationExpression the expression used to calculate the
1082         *                              correlation key. For a JMS message this could be the
1083         *                              expression <code>header("JMSDestination")</code> or
1084         *                              <code>header("JMSCorrelationID")</code>
1085         * @return the builder
1086         */
1087        public AggregateDefinition aggregate(Expression correlationExpression) {
1088            AggregateDefinition answer = new AggregateDefinition(correlationExpression);
1089            addOutput(answer);
1090            return answer;
1091        }
1092    
1093        /**
1094         * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
1095         * Creates an aggregator allowing you to combine a number of messages together into a single message.
1096         *
1097         * @param correlationExpression the expression used to calculate the
1098         *                              correlation key. For a JMS message this could be the
1099         *                              expression <code>header("JMSDestination")</code> or
1100         *                              <code>header("JMSCorrelationID")</code>
1101         * @param aggregationStrategy the strategy used for the aggregation
1102         * @return the builder
1103         */
1104        public AggregateDefinition aggregate(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
1105            AggregateDefinition answer = new AggregateDefinition(correlationExpression, aggregationStrategy);
1106            addOutput(answer);
1107            return answer;
1108        }
1109    
1110        /**
1111         * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
1112         * Creates a delayer allowing you to delay the delivery of messages to some destination.
1113         *
1114         * @param delay  an expression to calculate the delay time in millis
1115         * @return the builder
1116         */
1117        public DelayDefinition delay(Expression delay) {
1118            DelayDefinition answer = new DelayDefinition(delay);
1119            addOutput(answer);
1120            return answer;
1121        }
1122    
1123        /**
1124         * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
1125         * Creates a delayer allowing you to delay the delivery of messages to some destination.
1126         *
1127         * @return the expression clause to create the expression
1128         */
1129        public ExpressionClause<DelayDefinition> delay() {
1130            DelayDefinition answer = new DelayDefinition();
1131            addOutput(answer);
1132            return ExpressionClause.createAndSetExpression(answer);
1133        }
1134    
1135        /**
1136         * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
1137         * Creates a delayer allowing you to delay the delivery of messages to some destination.
1138         *
1139         * @param delay  the delay in millis
1140         * @return the builder
1141         */
1142        public DelayDefinition delay(long delay) {
1143            return delay(ExpressionBuilder.constantExpression(delay));
1144        }
1145    
1146        /**
1147         * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
1148         * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
1149         * or that we don't exceed an agreed SLA with some external service.
1150         * <p/>
1151         * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
1152         * will default ensure at most 10 messages per second. 
1153         *
1154         * @param maximumRequestCount  the maximum messages 
1155         * @return the builder
1156         */
1157        public ThrottleDefinition throttle(long maximumRequestCount) {
1158            ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount);
1159            addOutput(answer);
1160            return answer;
1161        }
1162    
1163        /**
1164         * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
1165         * Creates a loop allowing to process the a message a number of times and possibly process them
1166         * in a different way. Useful mostly for testing.
1167         *
1168         * @return the clause used to create the loop expression
1169         */
1170        public ExpressionClause<LoopDefinition> loop() {
1171            LoopDefinition loop = new LoopDefinition();
1172            addOutput(loop);
1173            return ExpressionClause.createAndSetExpression(loop);
1174        }
1175    
1176        /**
1177         * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
1178         * Creates a loop allowing to process the a message a number of times and possibly process them
1179         * in a different way. Useful mostly for testing.
1180         *
1181         * @param expression the loop expression
1182         * @return the builder
1183         */
1184        public LoopDefinition loop(Expression expression) {
1185            LoopDefinition loop = getNodeFactory().createLoop();
1186            loop.setExpression(expression);
1187            addOutput(loop);
1188            return loop;
1189        }
1190    
1191        /**
1192         * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
1193         * Creates a loop allowing to process the a message a number of times and possibly process them
1194         * in a different way. Useful mostly for testing.
1195         *
1196         * @param count  the number of times
1197         * @return the builder
1198         */
1199        public LoopDefinition loop(int count) {
1200            LoopDefinition loop = getNodeFactory().createLoop();
1201            loop.setExpression(new ConstantExpression(Integer.toString(count)));
1202            addOutput(loop);
1203            return loop;
1204        }
1205    
1206        /**
1207         * Sets the exception on the {@link org.apache.camel.Exchange}
1208         *
1209         * @param exception the exception to throw
1210         * @return the builder
1211         */
1212        @SuppressWarnings("unchecked")
1213        public Type throwException(Exception exception) {
1214            ThrowExceptionDefinition answer = new ThrowExceptionDefinition();
1215            answer.setException(exception);
1216            addOutput(answer);
1217            return (Type) this;
1218        }
1219    
1220        /**
1221         * Marks the exchange for rollback only.
1222         * <p/>
1223         * This is done by setting a {@link org.apache.camel.RollbackExchangeException} on the Exchange
1224         * and mark it for rollback.
1225         *
1226         * @return the builder
1227         */
1228        @SuppressWarnings("unchecked")
1229        public Type rollback() {
1230            return rollback(null);
1231        }
1232    
1233        /**
1234         * Marks the exchange for rollback only.
1235         * <p/>
1236         * This is done by setting a {@link org.apache.camel.RollbackExchangeException} on the Exchange
1237         * and mark it for rollback.
1238         *
1239         * @param message an optional message used for logging purpose why the rollback was triggered
1240         * @return the builder
1241         */
1242        @SuppressWarnings("unchecked")
1243        public Type rollback(String message) {
1244            RollbackDefinition answer = new RollbackDefinition(message);
1245            addOutput(answer);
1246            return (Type) this;
1247        }
1248    
1249        /**
1250         * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
1251         * Sends messages to all its child outputs; so that each processor and
1252         * destination gets a copy of the original message to avoid the processors
1253         * interfering with each other using {@link ExchangePattern#InOnly}.
1254         *
1255         * @return the builder
1256         */
1257        @SuppressWarnings("unchecked")
1258        public Type wireTap(String uri) {
1259            WireTapDefinition answer = new WireTapDefinition();
1260            answer.setUri(uri);
1261            addOutput(answer);
1262            return (Type) this;
1263        }
1264    
1265        /**
1266         * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
1267         * Sends a new {@link org.apache.camel.Exchange} to the destination
1268         * using {@link ExchangePattern#InOnly}.
1269         *
1270         * @param uri  the destination
1271         * @param body expression that creates the body to send
1272         * @return the builder
1273         */
1274        @SuppressWarnings("unchecked")
1275        public Type wireTap(String uri, Expression body) {
1276            WireTapDefinition answer = new WireTapDefinition();
1277            answer.setUri(uri);
1278            answer.setNewExchangeExpression(body);
1279            addOutput(answer);
1280            return (Type) this;
1281        }
1282    
1283        /**
1284         * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
1285         * Sends a new {@link org.apache.camel.Exchange} to the destination
1286         * using {@link ExchangePattern#InOnly}.
1287         *
1288         * @param uri  the destination
1289         * @param processor  processor preparing the new exchange to send
1290         * @return the builder
1291         */
1292        @SuppressWarnings("unchecked")
1293        public Type wireTap(String uri, Processor processor) {
1294            WireTapDefinition answer = new WireTapDefinition();
1295            answer.setUri(uri);
1296            answer.setNewExchangeProcessor(processor);
1297            addOutput(answer);
1298            return (Type) this;
1299        }
1300    
1301        /**
1302         * Pushes the given block on the stack as current block
1303         * @param block  the block
1304         */
1305        void pushBlock(Block block) {
1306            blocks.add(block);
1307        }
1308    
1309        /**
1310         * Pops the block off the stack as current block
1311         * @return the block
1312         */
1313        Block popBlock() {
1314            return blocks.isEmpty() ? null : blocks.removeLast();
1315        }
1316    
1317        /**
1318         * Stops continue routing the current {@link org.apache.camel.Exchange} and marks it as completed.
1319         *
1320         * @return the builder
1321         */
1322        @SuppressWarnings("unchecked")
1323        public Type stop() {
1324            StopDefinition stop = new StopDefinition();
1325            addOutput(stop);
1326            return (Type) this;
1327        }
1328    
1329        /**
1330         * <a href="http://camel.apache.org/exception-clause.html">Exception clause</a>
1331         * for cathing certain exceptions and handling them.
1332         *
1333         * @param exceptionType  the exception to catch
1334         * @return the exception builder to configure
1335         */
1336        public OnExceptionDefinition onException(Class exceptionType) {
1337            OnExceptionDefinition answer = new OnExceptionDefinition(exceptionType);
1338            addOutput(answer);
1339            return answer;
1340        }
1341    
1342        /**
1343         * Apply a {@link Policy}.
1344         * <p/>
1345         * Policy can be used for transactional policies.
1346         *
1347         * @param policy  the policy to apply
1348         * @return the policy builder to configure
1349         */
1350        public PolicyDefinition policy(Policy policy) {
1351            PolicyDefinition answer = new PolicyDefinition(policy);
1352            addOutput(answer);
1353            return answer;
1354        }
1355    
1356        /**
1357         * Apply a {@link Policy}.
1358         * <p/>
1359         * Policy can be used for transactional policies.
1360         *
1361         * @param ref  reference to lookup a policy in the registry
1362         * @return the policy builder to configure
1363         */
1364        public PolicyDefinition policy(String ref) {
1365            PolicyDefinition answer = new PolicyDefinition();
1366            answer.setRef(ref);
1367            addOutput(answer);
1368            return answer;
1369        }
1370    
1371        /**
1372         * Marks this route as transacted and uses the default transacted policy found in the registry.
1373         *
1374         * @return the policy builder to configure
1375         */
1376        public PolicyDefinition transacted() {
1377            PolicyDefinition answer = new PolicyDefinition();
1378            answer.setType(TransactedPolicy.class);
1379            addOutput(answer);
1380            return answer;
1381        }
1382    
1383        /**
1384         * Marks this route as transacted.
1385         *
1386         * @param ref  reference to lookup a transacted policy in the registry
1387         * @return the policy builder to configure
1388         */
1389        public PolicyDefinition transacted(String ref) {
1390            PolicyDefinition answer = new PolicyDefinition();
1391            answer.setType(TransactedPolicy.class);
1392            answer.setRef(ref);
1393            addOutput(answer);
1394            return answer;
1395        }
1396    
1397        /**
1398         * Installs the given <a href="http://camel.apache.org/error-handler.html">error handler</a> builder.
1399         *
1400         * @param errorHandlerBuilder the error handler to be used by default for all child routes
1401         * @return the current builder with the error handler configured
1402         */
1403        @SuppressWarnings("unchecked")
1404        public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
1405            setErrorHandlerBuilder(errorHandlerBuilder);
1406            return (Type) this;
1407        }
1408    
1409        // Transformers
1410        // -------------------------------------------------------------------------
1411    
1412        /**
1413         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1414         * Adds the custom processor to this destination which could be a final
1415         * destination, or could be a transformation in a pipeline
1416         *
1417         * @param processor  the custom {@link Processor}
1418         * @return the builder
1419         */
1420        @SuppressWarnings("unchecked")
1421        public Type process(Processor processor) {
1422            ProcessDefinition answer = new ProcessDefinition(processor);
1423            addOutput(answer);
1424            return (Type) this;
1425        }
1426    
1427        /**
1428         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1429         * Adds the custom processor reference to this destination which could be a final
1430         * destination, or could be a transformation in a pipeline
1431         *
1432         * @param ref   reference to a {@link Processor} to lookup in the registry
1433         * @return the builder
1434         */
1435        @SuppressWarnings("unchecked")
1436        public Type processRef(String ref) {
1437            ProcessDefinition answer = new ProcessDefinition();
1438            answer.setRef(ref);
1439            addOutput(answer);
1440            return (Type) this;
1441        }
1442    
1443        /**
1444         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1445         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1446         *
1447         * @param bean  the bean to invoke
1448         * @return the builder
1449         */
1450        @SuppressWarnings("unchecked")
1451        public Type bean(Object bean) {
1452            BeanDefinition answer = new BeanDefinition();
1453            answer.setBean(bean);
1454            addOutput(answer);
1455            return (Type) this;
1456        }
1457    
1458        /**
1459         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1460         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1461         *
1462         * @param bean  the bean to invoke
1463         * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
1464         * @return the builder
1465         */
1466        @SuppressWarnings("unchecked")
1467        public Type bean(Object bean, String method) {
1468            BeanDefinition answer = new BeanDefinition();
1469            answer.setBean(bean);
1470            answer.setMethod(method);
1471            addOutput(answer);
1472            return (Type) this;
1473        }
1474    
1475        /**
1476         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1477         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1478         *
1479         * @param  beanType  the bean class, Camel will instantiate an object at runtime
1480         * @return the builder
1481         */
1482        @SuppressWarnings("unchecked")
1483        public Type bean(Class beanType) {
1484            BeanDefinition answer = new BeanDefinition();
1485            answer.setBeanType(beanType);
1486            addOutput(answer);
1487            return (Type) this;
1488        }
1489    
1490        /**
1491         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1492         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1493         *
1494         * @param  beanType  the bean class, Camel will instantiate an object at runtime
1495         * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
1496         * @return the builder
1497         */
1498        @SuppressWarnings("unchecked")
1499        public Type bean(Class beanType, String method) {
1500            BeanDefinition answer = new BeanDefinition();
1501            answer.setBeanType(beanType);
1502            answer.setMethod(method);
1503            addOutput(answer);
1504            return (Type) this;
1505        }
1506    
1507        /**
1508         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1509         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1510         *
1511         * @param ref  reference to a bean to lookup in the registry
1512         * @return the builder
1513         */
1514        @SuppressWarnings("unchecked")
1515        public Type beanRef(String ref) {
1516            BeanDefinition answer = new BeanDefinition(ref);
1517            addOutput(answer);
1518            return (Type) this;
1519        }
1520    
1521        /**
1522         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1523         * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
1524         *
1525         * @param ref  reference to a bean to lookup in the registry
1526         * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
1527         * @return the builder
1528         */
1529        @SuppressWarnings("unchecked")
1530        public Type beanRef(String ref, String method) {
1531            BeanDefinition answer = new BeanDefinition(ref, method);
1532            addOutput(answer);
1533            return (Type) this;
1534        }
1535    
1536        /**
1537         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1538         * Adds a processor which sets the body on the IN message
1539         *
1540         * @return a expression builder clause to set the body
1541         */
1542        public ExpressionClause<ProcessorDefinition<Type>> setBody() {
1543            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
1544            SetBodyDefinition answer = new SetBodyDefinition(clause);
1545            addOutput(answer);
1546            return clause;
1547        }
1548    
1549        /**
1550         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1551         * Adds a processor which sets the body on the IN message
1552         *
1553         * @param expression   the expression used to set the body
1554         * @return the builder
1555         */
1556        @SuppressWarnings("unchecked")
1557        public Type setBody(Expression expression) {
1558            SetBodyDefinition answer = new SetBodyDefinition(expression);
1559            addOutput(answer);
1560            return (Type) this;
1561        }
1562    
1563        /**
1564         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1565         * Adds a processor which sets the body on the OUT message
1566         *
1567         * @param expression   the expression used to set the body
1568         * @return the builder
1569         */
1570        @SuppressWarnings("unchecked")
1571        public Type transform(Expression expression) {
1572            TransformDefinition answer = new TransformDefinition(expression);
1573            addOutput(answer);
1574            return (Type) this;
1575        }
1576    
1577        /**
1578         * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
1579         * Adds a processor which sets the body on the OUT message
1580         *
1581         * @return a expression builder clause to set the body
1582         */
1583        public ExpressionClause<ProcessorDefinition<Type>> transform() {
1584            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>((Type) this);
1585            TransformDefinition answer = new TransformDefinition(clause);
1586            addOutput(answer);
1587            return clause;
1588        }
1589    
1590        /**
1591         * Adds a processor which sets the body on the FAULT message
1592         *
1593         * @param expression   the expression used to set the body
1594         * @return the builder
1595         */
1596        public Type setFaultBody(Expression expression) {
1597            return process(ProcessorBuilder.setFaultBody(expression));
1598        }
1599    
1600        /**
1601         * Adds a processor which sets the header on the IN message
1602         *
1603         * @param name  the header name
1604         * @return a expression builder clause to set the header
1605         */
1606        public ExpressionClause<ProcessorDefinition<Type>> setHeader(String name) {
1607            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
1608            SetHeaderDefinition answer = new SetHeaderDefinition(name, clause);
1609            addOutput(answer);
1610            return clause;
1611        }
1612    
1613        /**
1614         * Adds a processor which sets the header on the IN message
1615         *
1616         * @param name  the header name
1617         * @param expression  the expression used to set the header
1618         * @return the builder
1619         */
1620        @SuppressWarnings("unchecked")
1621        public Type setHeader(String name, Expression expression) {
1622            SetHeaderDefinition answer = new SetHeaderDefinition(name, expression);
1623            addOutput(answer);
1624            return (Type) this;
1625        }
1626    
1627        /**
1628         * Adds a processor which sets the header on the OUT message
1629         *
1630         * @param name  the header name
1631         * @return a expression builder clause to set the header
1632         */
1633        public ExpressionClause<ProcessorDefinition<Type>> setOutHeader(String name) {
1634            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
1635            SetOutHeaderDefinition answer = new SetOutHeaderDefinition(name, clause);
1636            addOutput(answer);
1637            return clause;
1638        }
1639    
1640        /**
1641         * Adds a processor which sets the header on the OUT message
1642         *
1643         * @param name  the header name
1644         * @param expression  the expression used to set the header
1645         * @return the builder
1646         */
1647        @SuppressWarnings("unchecked")
1648        public Type setOutHeader(String name, Expression expression) {
1649            SetOutHeaderDefinition answer = new SetOutHeaderDefinition(name, expression);
1650            addOutput(answer);
1651            return (Type) this;
1652        }
1653    
1654        /**
1655         * Adds a processor which sets the header on the FAULT message
1656         *
1657         * @param name  the header name
1658         * @param expression  the expression used to set the header
1659         * @return the builder
1660         */
1661        public Type setFaultHeader(String name, Expression expression) {
1662            return process(ProcessorBuilder.setFaultHeader(name, expression));
1663        }
1664    
1665        /**
1666         * Adds a processor which sets the exchange property
1667         *
1668         * @param name  the property name
1669         * @param expression  the expression used to set the property
1670         * @return the builder
1671         */
1672        @SuppressWarnings("unchecked")
1673        public Type setProperty(String name, Expression expression) {
1674            SetPropertyDefinition answer = new SetPropertyDefinition(name, expression);
1675            addOutput(answer);
1676            return (Type) this;
1677        }
1678    
1679    
1680        /**
1681         * Adds a processor which sets the exchange property
1682         *
1683         * @param name  the property name
1684         * @return a expression builder clause to set the property
1685         */
1686        public ExpressionClause<ProcessorDefinition<Type>> setProperty(String name) {
1687            ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
1688            SetPropertyDefinition answer = new SetPropertyDefinition(name, clause);
1689            addOutput(answer);
1690            return clause;
1691        }
1692    
1693        /**
1694         * Adds a processor which removes the header on the IN message
1695         *
1696         * @param name  the header name
1697         * @return the builder
1698         */
1699        @SuppressWarnings("unchecked")
1700        public Type removeHeader(String name) {
1701            RemoveHeaderDefinition answer = new RemoveHeaderDefinition(name);
1702            addOutput(answer);
1703            return (Type) this;
1704        }
1705    
1706        /**
1707         * Adds a processor which removes the header on the FAULT message
1708         *
1709         * @param name  the header name
1710         * @return the builder
1711         */
1712        public Type removeFaultHeader(String name) {
1713            return process(ProcessorBuilder.removeFaultHeader(name));
1714        }
1715    
1716        /**
1717         * Adds a processor which removes the exchange property
1718         *
1719         * @param name  the property name
1720         * @return the builder
1721         */
1722        @SuppressWarnings("unchecked")
1723        public Type removeProperty(String name) {
1724            RemovePropertyDefinition answer = new RemovePropertyDefinition(name);
1725            addOutput(answer);
1726            return (Type) this;
1727        }
1728    
1729        /**
1730         * Converts the IN message body to the specified type
1731         *
1732         * @param type the type to convert to
1733         * @return the builder
1734         */
1735        @SuppressWarnings("unchecked")
1736        public Type convertBodyTo(Class type) {
1737            addOutput(new ConvertBodyDefinition(type));
1738            return (Type) this;
1739        }
1740        
1741        /**
1742         * Converts the IN message body to the specified type
1743         *
1744         * @param type the type to convert to
1745         * @param charset the charset to use by type converters (not all converters support specifc charset)
1746         * @return the builder
1747         */
1748        @SuppressWarnings("unchecked")
1749        public Type convertBodyTo(Class type, String charset) {
1750            addOutput(new ConvertBodyDefinition(type, charset));
1751            return (Type) this;
1752        }
1753    
1754        /**
1755         * Sorts the IN message body using the given comparator.
1756         * The IN body mut be convertable to {@link List}.
1757         *
1758         * @param comparator  the comparator to use for sorting
1759         * @return the builder
1760         */
1761        @SuppressWarnings("unchecked")
1762        public Type sortBody(Comparator comparator) {
1763            addOutput(new SortDefinition(body(), comparator));
1764            return (Type) this;
1765        }
1766    
1767        /**
1768         * Sorts the IN message body using a default sorting based on toString representation.
1769         * The IN body mut be convertable to {@link List}.
1770         *
1771         * @return the builder
1772         */
1773        public Type sortBody() {
1774            return sortBody(null);
1775        }
1776    
1777        /**
1778         * Sorts the expression using the given comparator
1779         *
1780         * @param expression  the expression, must be convertable to {@link List}
1781         * @param comparator  the comparator to use for sorting
1782         * @return the builder
1783         */
1784        @SuppressWarnings("unchecked")
1785        public Type sort(Expression expression, Comparator comparator) {
1786            addOutput(new SortDefinition(expression, comparator));
1787            return (Type) this;
1788        }
1789    
1790        /**
1791         * Sorts the expression using a default sorting based on toString representation. 
1792         *
1793         * @param expression  the expression, must be convertable to {@link List}
1794         * @return the builder
1795         */
1796        public Type sort(Expression expression) {
1797            return sort(expression, null);
1798        }
1799    
1800        /**
1801         * Enriches an exchange with additional data obtained from a
1802         * <code>resourceUri</code>.
1803         * 
1804         * @param resourceUri           URI of resource endpoint for obtaining additional data.
1805         * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
1806         * @return the builder
1807         * @see org.apache.camel.processor.Enricher
1808         */
1809        @SuppressWarnings("unchecked")
1810        public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy) {
1811            addOutput(new EnrichDefinition(aggregationStrategy, resourceUri));
1812            return (Type) this;
1813        }
1814    
1815        /**
1816         * Enriches an exchange with additional data obtained from a
1817         * <code>resourceUri</code>.
1818         *
1819         * @param resourceUri           URI of resource endpoint for obtaining additional data.
1820         * @return the builder
1821         * @see org.apache.camel.processor.Enricher
1822         */
1823        @SuppressWarnings("unchecked")
1824        public Type enrich(String resourceUri) {
1825            addOutput(new EnrichDefinition(resourceUri));
1826            return (Type) this;
1827        }
1828    
1829        /**
1830         * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
1831         * a callback when the {@link org.apache.camel.Exchange} has finished being processed.
1832         * The hook invoke callbacks for either onComplete or onFailure.
1833         * <p/>
1834         * Will by default always trigger when the {@link org.apache.camel.Exchange} is complete
1835         * (either with success or failed).
1836         * <br/>
1837         * You can limit the callback to either onComplete or onFailure but invoking the nested
1838         * builder method.
1839         * <p/>
1840         * For onFailure the caused exception is stored as a property on the {@link org.apache.camel.Exchange}
1841         * with the key {@link org.apache.camel.Exchange#EXCEPTION_CAUGHT}.
1842         *
1843         * @return the builder
1844         */
1845        @SuppressWarnings("unchecked")
1846        public OnCompletionDefinition onCompletion() {
1847            OnCompletionDefinition answer = new OnCompletionDefinition();
1848            // we must remove all existing on completion definition (as they are global)
1849            // and thus we are the only one as route scoped should override any global scoped
1850            answer.removeAllOnCompletionDefinition(this);
1851            popBlock();
1852            addOutput(answer);
1853            pushBlock(answer);
1854            return answer;
1855        }
1856    
1857        // DataFormat support
1858        // -------------------------------------------------------------------------
1859    
1860        /**
1861         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1862         * Unmarshals the in body using a {@link DataFormat} expression to define
1863         * the format of the input message and the output will be set on the out message body.
1864         *
1865         * @return the expression to create the {@link DataFormat}
1866         */
1867        public DataFormatClause<ProcessorDefinition<Type>> unmarshal() {
1868            return new DataFormatClause<ProcessorDefinition<Type>>(this, DataFormatClause.Operation.Unmarshal);
1869        }
1870    
1871        /**
1872         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1873         * Unmarshals the in body using the specified {@link DataFormat}
1874         * and sets the output on the out message body.
1875         *
1876         * @param dataFormatType  the dataformat
1877         * @return the builder
1878         */
1879        @SuppressWarnings("unchecked")
1880        public Type unmarshal(DataFormatDefinition dataFormatType) {
1881            addOutput(new UnmarshalDefinition(dataFormatType));
1882            return (Type) this;
1883        }
1884    
1885        /**
1886         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1887         * Unmarshals the in body using the specified {@link DataFormat}
1888         * and sets the output on the out message body.
1889         *
1890         * @param dataFormat  the dataformat
1891         * @return the builder
1892         */
1893        public Type unmarshal(DataFormat dataFormat) {
1894            return unmarshal(new DataFormatDefinition(dataFormat));
1895        }
1896    
1897        /**
1898         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1899         * Unmarshals the in body using the specified {@link DataFormat}
1900         * reference in the {@link org.apache.camel.spi.Registry} and sets
1901         * the output on the out message body.
1902         *
1903         * @param dataTypeRef  reference to a {@link DataFormat} to lookup in the registry
1904         * @return the builder
1905         */
1906        @SuppressWarnings("unchecked")
1907        public Type unmarshal(String dataTypeRef) {
1908            addOutput(new UnmarshalDefinition(dataTypeRef));
1909            return (Type) this;
1910        }
1911    
1912        /**
1913         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1914         * Marshals the in body using a {@link DataFormat} expression to define
1915         * the format of the output which will be added to the out body.
1916         *
1917         * @return the expression to create the {@link DataFormat}
1918         */
1919        public DataFormatClause<ProcessorDefinition<Type>> marshal() {
1920            return new DataFormatClause<ProcessorDefinition<Type>>(this, DataFormatClause.Operation.Marshal);
1921        }
1922    
1923        /**
1924         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1925         * Marshals the in body using the specified {@link DataFormat}
1926         * and sets the output on the out message body.
1927         *
1928         * @param dataFormatType  the dataformat
1929         * @return the builder
1930         */
1931        @SuppressWarnings("unchecked")
1932        public Type marshal(DataFormatDefinition dataFormatType) {
1933            addOutput(new MarshalDefinition(dataFormatType));
1934            return (Type) this;
1935        }
1936    
1937        /**
1938         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1939         * Marshals the in body using the specified {@link DataFormat}
1940         * and sets the output on the out message body.
1941         *
1942         * @param dataFormat  the dataformat
1943         * @return the builder
1944         */
1945        public Type marshal(DataFormat dataFormat) {
1946            return marshal(new DataFormatDefinition(dataFormat));
1947        }
1948    
1949        /**
1950         * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
1951         * Marshals the in body the specified {@link DataFormat}
1952         * reference in the {@link org.apache.camel.spi.Registry} and sets
1953         * the output on the out message body.
1954         *
1955         * @param dataTypeRef  reference to a {@link DataFormat} to lookup in the registry
1956         * @return the builder
1957         */
1958        @SuppressWarnings("unchecked")
1959        public Type marshal(String dataTypeRef) {
1960            addOutput(new MarshalDefinition(dataTypeRef));
1961            return (Type) this;
1962        }
1963    
1964        // Properties
1965        // -------------------------------------------------------------------------
1966        @XmlTransient
1967        @SuppressWarnings("unchecked")
1968        public ProcessorDefinition<? extends ProcessorDefinition> getParent() {
1969            return parent;
1970        }
1971    
1972        public void setParent(ProcessorDefinition<? extends ProcessorDefinition> parent) {
1973            this.parent = parent;
1974        }
1975    
1976        @XmlTransient
1977        public ErrorHandlerBuilder getErrorHandlerBuilder() {
1978            if (errorHandlerBuilder == null) {
1979                errorHandlerBuilder = createErrorHandlerBuilder();
1980            }
1981            return errorHandlerBuilder;
1982        }
1983    
1984        /**
1985         * Sets the error handler to use with processors created by this builder
1986         */
1987        public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
1988            this.errorHandlerBuilder = errorHandlerBuilder;
1989        }
1990    
1991        /**
1992         * Sets the error handler if one is not already set
1993         */
1994        protected void setErrorHandlerBuilderIfNull(ErrorHandlerBuilder errorHandlerBuilder) {
1995            if (this.errorHandlerBuilder == null) {
1996                setErrorHandlerBuilder(errorHandlerBuilder);
1997            }
1998        }
1999    
2000        public String getErrorHandlerRef() {
2001            return errorHandlerRef;
2002        }
2003    
2004        /**
2005         * Sets the bean ref name of the error handler builder to use on this route
2006         */
2007        @XmlAttribute(required = false)
2008        public void setErrorHandlerRef(String errorHandlerRef) {
2009            this.errorHandlerRef = errorHandlerRef;
2010            setErrorHandlerBuilder(new ErrorHandlerBuilderRef(errorHandlerRef));
2011        }
2012    
2013        @XmlTransient
2014        public NodeFactory getNodeFactory() {
2015            if (nodeFactory == null) {
2016                nodeFactory = new NodeFactory();
2017            }
2018            return nodeFactory;
2019        }
2020    
2021        public void setNodeFactory(NodeFactory nodeFactory) {
2022            this.nodeFactory = nodeFactory;
2023        }
2024    
2025        @XmlTransient
2026        public List<InterceptStrategy> getInterceptStrategies() {
2027            return interceptStrategies;
2028        }
2029    
2030        public void addInterceptStrategy(InterceptStrategy strategy) {
2031            this.interceptStrategies.add(strategy);
2032        }
2033    
2034        /**
2035         * Returns a label to describe this node such as the expression if some kind of expression node
2036         */
2037        public String getLabel() {
2038            return "";
2039        }
2040    
2041    
2042    }