001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import org.apache.camel.Endpoint;
020    import org.apache.camel.Exchange;
021    import org.apache.camel.Expression;
022    import org.apache.camel.Predicate;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Route;
025    import org.apache.camel.RuntimeCamelException;
026    import org.apache.camel.builder.Builder;
027    import org.apache.camel.builder.DeadLetterChannelBuilder;
028    import org.apache.camel.builder.ErrorHandlerBuilder;
029    import org.apache.camel.builder.NoErrorHandlerBuilder;
030    import org.apache.camel.builder.ProcessorBuilder;
031    import org.apache.camel.converter.ObjectConverter;
032    import org.apache.camel.impl.RouteContext;
033    import org.apache.camel.model.language.ExpressionType;
034    import org.apache.camel.model.language.LanguageExpression;
035    import org.apache.camel.processor.DelegateProcessor;
036    import org.apache.camel.processor.MulticastProcessor;
037    import org.apache.camel.processor.Pipeline;
038    import org.apache.camel.processor.RecipientList;
039    import org.apache.camel.processor.aggregate.AggregationStrategy;
040    import org.apache.camel.processor.idempotent.IdempotentConsumer;
041    import org.apache.camel.processor.idempotent.MessageIdRepository;
042    import org.apache.camel.spi.Policy;
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    
046    import javax.xml.bind.annotation.XmlAttribute;
047    import javax.xml.bind.annotation.XmlTransient;
048    import java.util.ArrayList;
049    import java.util.Collection;
050    import java.util.Collections;
051    import java.util.List;
052    
053    /**
054     * @version $Revision: 1.1 $
055     */
056    public abstract class ProcessorType {
057        public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
058        private ErrorHandlerBuilder errorHandlerBuilder;
059        private Boolean inheritErrorHandlerFlag = Boolean.TRUE; // TODO not sure how
060        private DelegateProcessor lastInterceptor;
061        // else to use an
062                                                                // optional
063                                                                // attribute in
064                                                                // JAXB2
065    
066        public abstract List<ProcessorType> getOutputs();
067    
068        public abstract List<InterceptorType> getInterceptors();
069    
070        public Processor createProcessor(RouteContext routeContext) throws Exception {
071            throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
072        }
073    
074        public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
075            Collection<ProcessorType> outputs = getOutputs();
076            return createOutputsProcessor(routeContext, outputs);
077        }
078    
079        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
080            Processor processor = makeProcessor(routeContext);
081            routeContext.addEventDrivenProcessor(processor);
082        }
083    
084        /**
085         * Wraps the child processor in whatever necessary interceptors and error
086         * handlers
087         */
088        public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
089            processor = wrapProcessorInInterceptors(routeContext, processor);
090            return wrapInErrorHandler(processor);
091        }
092    
093        // Fluent API
094        // -------------------------------------------------------------------------
095    
096        /**
097         * Sends the exchange to the given endpoint URI
098         */
099        public ProcessorType to(String uri) {
100            addOutput(new ToType(uri));
101            return this;
102        }
103    
104        /**
105         * Sends the exchange to the given endpoint
106         */
107        public ProcessorType to(Endpoint endpoint) {
108            addOutput(new ToType(endpoint));
109            return this;
110        }
111    
112        /**
113         * Sends the exchange to a list of endpoints using the
114         * {@link MulticastProcessor} pattern
115         */
116        public ProcessorType to(String... uris) {
117            for (String uri : uris) {
118                addOutput(new ToType(uri));
119            }
120            return this;
121        }
122    
123        /**
124         * Sends the exchange to a list of endpoints using the
125         * {@link MulticastProcessor} pattern
126         */
127        public ProcessorType to(Endpoint... endpoints) {
128            for (Endpoint endpoint : endpoints) {
129                addOutput(new ToType(endpoint));
130            }
131            return this;
132        }
133    
134        /**
135         * Sends the exchange to a list of endpoint using the
136         * {@link MulticastProcessor} pattern
137         */
138        public ProcessorType to(Collection<Endpoint> endpoints) {
139            for (Endpoint endpoint : endpoints) {
140                addOutput(new ToType(endpoint));
141            }
142            return this;
143        }
144    
145        /**
146         * Multicasts messages to all its child outputs; so that each processor and
147         * destination gets a copy of the original message to avoid the processors
148         * interfering with each other.
149         */
150        public MulticastType multicast() {
151            MulticastType answer = new MulticastType();
152            addOutput(answer);
153            return answer;
154        }
155    
156        /**
157         * Creates a {@link Pipeline} of the list of endpoints so that the message
158         * will get processed by each endpoint in turn and for request/response the
159         * output of one endpoint will be the input of the next endpoint
160         */
161        public ProcessorType pipeline(String... uris) {
162            // TODO pipeline v mulicast
163            return to(uris);
164        }
165    
166        /**
167         * Creates a {@link Pipeline} of the list of endpoints so that the message
168         * will get processed by each endpoint in turn and for request/response the
169         * output of one endpoint will be the input of the next endpoint
170         */
171        public ProcessorType pipeline(Endpoint... endpoints) {
172            // TODO pipeline v mulicast
173            return to(endpoints);
174        }
175    
176        /**
177         * Creates a {@link Pipeline} of the list of endpoints so that the message
178         * will get processed by each endpoint in turn and for request/response the
179         * output of one endpoint will be the input of the next endpoint
180         */
181        public ProcessorType pipeline(Collection<Endpoint> endpoints) {
182            // TODO pipeline v mulicast
183            return to(endpoints);
184        }
185    
186        /**
187         * Creates an {@link IdempotentConsumer} to avoid duplicate messages
188         */
189        public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression,
190                                                         MessageIdRepository messageIdRepository) {
191            IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository);
192            addOutput(answer);
193            return answer;
194        }
195    
196        /**
197         * Creates a predicate which is applied and only if it is true then the
198         * exchange is forwarded to the destination
199         * 
200         * @return the builder for a predicate
201         */
202        public FilterType filter(Predicate predicate) {
203            FilterType filter = new FilterType(predicate);
204            addOutput(filter);
205            return filter;
206        }
207    
208        /**
209         * Creates a choice of one or more predicates with an otherwise clause
210         * 
211         * @return the builder for a choice expression
212         */
213        public ChoiceType choice() {
214            ChoiceType answer = new ChoiceType();
215            addOutput(answer);
216            return answer;
217        }
218    
219        /**
220         * Creates a try/catch block
221         * 
222         * @return the builder for a tryBlock expression
223         */
224        public TryType tryBlock() {
225            TryType answer = new TryType();
226            addOutput(answer);
227            return answer;
228        }
229    
230        /**
231         * Creates a dynamic <a
232         * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
233         * List</a> pattern.
234         * 
235         * @param receipients is the builder of the expression used in the
236         *                {@link RecipientList} to decide the destinations
237         */
238        public ProcessorType recipientList(Expression receipients) {
239            RecipientListType answer = new RecipientListType(receipients);
240            addOutput(answer);
241            return this;
242        }
243    
244        /**
245         * A builder for the <a
246         * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
247         * pattern where an expression is evaluated to iterate through each of the
248         * parts of a message and then each part is then send to some endpoint.
249         * 
250         * @param receipients the expression on which to split
251         * @return the builder
252         */
253        public SplitterType splitter(Expression receipients) {
254            SplitterType answer = new SplitterType(receipients);
255            addOutput(answer);
256            return answer;
257        }
258    
259        /**
260         * A builder for the <a
261         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
262         * pattern where an expression is evaluated to be able to compare the
263         * message exchanges to reorder them. e.g. you may wish to sort by some
264         * header
265         * 
266         * @param expression the expression on which to compare messages in order
267         * @return the builder
268         */
269        public ResequencerType resequencer(Expression<Exchange> expression) {
270            return resequencer(Collections.<Expression> singletonList(expression));
271        }
272    
273        /**
274         * A builder for the <a
275         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
276         * pattern where a list of expressions are evaluated to be able to compare
277         * the message exchanges to reorder them. e.g. you may wish to sort by some
278         * headers
279         * 
280         * @param expressions the expressions on which to compare messages in order
281         * @return the builder
282         */
283        public ResequencerType resequencer(List<Expression> expressions) {
284            ResequencerType answer = new ResequencerType(expressions);
285            addOutput(answer);
286            return answer;
287        }
288    
289        /**
290         * A builder for the <a
291         * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
292         * pattern where a list of expressions are evaluated to be able to compare
293         * the message exchanges to reorder them. e.g. you may wish to sort by some
294         * headers
295         * 
296         * @param expressions the expressions on which to compare messages in order
297         * @return the builder
298         */
299        public ResequencerType resequencer(Expression... expressions) {
300            List<Expression> list = new ArrayList<Expression>();
301            for (Expression expression : expressions) {
302                list.add(expression);
303            }
304            return resequencer(list);
305        }
306    
307        /**
308         * A builder for the <a
309         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
310         * pattern where a batch of messages are processed (up to a maximum amount
311         * or until some timeout is reached) and messages for the same correlation
312         * key are combined together using some kind of
313         * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
314         * into a smaller number of exchanges. <p/> A good example of this is stock
315         * market data; you may be receiving 30,000 messages/second and you may want
316         * to throttle it right down so that multiple messages for the same stock
317         * are combined (or just the latest message is used and older prices are
318         * discarded). Another idea is to combine line item messages together into a
319         * single invoice message.
320         * 
321         * @param correlationExpression the expression used to calculate the
322         *                correlation key. For a JMS message this could be the
323         *                expression <code>header("JMSDestination")</code> or
324         *                <code>header("JMSCorrelationID")</code>
325         */
326        public AggregatorType aggregator(Expression correlationExpression) {
327            AggregatorType answer = new AggregatorType(correlationExpression);
328            addOutput(answer);
329            return answer;
330        }
331    
332        /**
333         * A builder for the <a
334         * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
335         * pattern where a batch of messages are processed (up to a maximum amount
336         * or until some timeout is reached) and messages for the same correlation
337         * key are combined together using some kind of
338         * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
339         * into a smaller number of exchanges. <p/> A good example of this is stock
340         * market data; you may be receiving 30,000 messages/second and you may want
341         * to throttle it right down so that multiple messages for the same stock
342         * are combined (or just the latest message is used and older prices are
343         * discarded). Another idea is to combine line item messages together into a
344         * single invoice message.
345         * 
346         * @param correlationExpression the expression used to calculate the
347         *                correlation key. For a JMS message this could be the
348         *                expression <code>header("JMSDestination")</code> or
349         *                <code>header("JMSCorrelationID")</code>
350         */
351        public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
352            AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
353            addOutput(answer);
354            return answer;
355        }
356    
357        /**
358         * A builder for the <a
359         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
360         * where an expression is used to calculate the time which the message will
361         * be dispatched on
362         * 
363         * @param processAtExpression an expression to calculate the time at which
364         *                the messages should be processed
365         * @return the builder
366         */
367        public DelayerType delayer(Expression<Exchange> processAtExpression) {
368            return delayer(processAtExpression, 0L);
369        }
370    
371        /**
372         * A builder for the <a
373         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
374         * where an expression is used to calculate the time which the message will
375         * be dispatched on
376         * 
377         * @param processAtExpression an expression to calculate the time at which
378         *                the messages should be processed
379         * @param delay the delay in milliseconds which is added to the
380         *                processAtExpression to determine the time the message
381         *                should be processed
382         * @return the builder
383         */
384        public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) {
385            DelayerType answer = new DelayerType(processAtExpression, delay);
386            addOutput(answer);
387            return answer;
388        }
389    
390        /**
391         * A builder for the <a
392         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
393         * where a fixed amount of milliseconds are used to delay processing of a
394         * message exchange
395         * 
396         * @param delay the default delay in milliseconds
397         * @return the builder
398         */
399        public DelayerType delayer(long delay) {
400            return delayer(null, delay);
401        }
402    
403        /**
404         * A builder for the <a
405         * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
406         * where an expression is used to calculate the time which the message will
407         * be dispatched on
408         * 
409         * @return the builder
410         */
411        public ThrottlerType throttler(long maximumRequestCount) {
412            ThrottlerType answer = new ThrottlerType(maximumRequestCount);
413            addOutput(answer);
414            return answer;
415        }
416    
417        public ProcessorType interceptor(String ref) {
418            getInterceptors().add(new InterceptorRef(ref));
419            return this;
420        }
421    
422        public InterceptType intercept() {
423            InterceptType answer = new InterceptType();
424            addOutput(answer);
425            return answer;
426        }
427    
428        public ProcessorType proceed() {
429            addOutput(new ProceedType());
430            return this;
431        }
432    
433        public ExceptionType exception(Class exceptionType) {
434            ExceptionType answer = new ExceptionType(exceptionType);
435            addOutput(answer);
436            return answer;
437        }
438    
439        /**
440         * Apply an interceptor route if the predicate is true
441         */
442        public OtherwiseType intercept(Predicate predicate) {
443            InterceptType answer = new InterceptType();
444            addOutput(answer);
445            return answer.when(predicate);
446        }
447    
448        public ProcessorType interceptors(String... refs) {
449            for (String ref : refs) {
450                interceptor(ref);
451            }
452            return this;
453        }
454    
455        public FilterType filter(ExpressionType expression) {
456            FilterType filter = new FilterType();
457            filter.setExpression(expression);
458            addOutput(filter);
459            return filter;
460        }
461    
462        public FilterType filter(String language, String expression) {
463            return filter(new LanguageExpression(language, expression));
464        }
465    
466        /**
467         * Trace logs the exchange before it goes to the next processing step using
468         * the {@link #DEFAULT_TRACE_CATEGORY} logging category.
469         * 
470         * @return
471         */
472        public ProcessorType trace() {
473            return trace(DEFAULT_TRACE_CATEGORY);
474        }
475    
476        /**
477         * Trace logs the exchange before it goes to the next processing step using
478         * the specified logging category.
479         * 
480         * @param category the logging category trace messages will sent to.
481         * @return
482         */
483        public ProcessorType trace(String category) {
484            final Log log = LogFactory.getLog(category);
485            return intercept(new DelegateProcessor() {
486                @Override
487                public void process(Exchange exchange) throws Exception {
488                    log.trace(exchange);
489                    processNext(exchange);
490                }
491            });
492        }
493    
494        public PolicyRef policies() {
495            PolicyRef answer = new PolicyRef();
496            addOutput(answer);
497            return answer;
498        }
499    
500        public PolicyRef policy(Policy policy) {
501            PolicyRef answer = new PolicyRef(policy);
502            addOutput(answer);
503            return answer;
504        }
505    
506        public ProcessorType intercept(DelegateProcessor interceptor) {
507            getInterceptors().add(new InterceptorRef(interceptor));
508            lastInterceptor = interceptor;
509            return this;
510        }
511    
512        /**
513         * Installs the given error handler builder
514         * 
515         * @param errorHandlerBuilder the error handler to be used by default for
516         *                all child routes
517         * @return the current builder with the error handler configured
518         */
519        public ProcessorType errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
520            setErrorHandlerBuilder(errorHandlerBuilder);
521            return this;
522        }
523    
524        /**
525         * Configures whether or not the error handler is inherited by every
526         * processing node (or just the top most one)
527         * 
528         * @param condition the falg as to whether error handlers should be
529         *                inherited or not
530         * @return the current builder
531         */
532        public ProcessorType inheritErrorHandler(boolean condition) {
533            setInheritErrorHandlerFlag(condition);
534            return this;
535        }
536    
537        // Transformers
538        // -------------------------------------------------------------------------
539    
540        /**
541         * Adds the custom processor to this destination which could be a final
542         * destination, or could be a transformation in a pipeline
543         */
544        public ProcessorType process(Processor processor) {
545            ProcessorRef answer = new ProcessorRef(processor);
546            addOutput(answer);
547            return this;
548        }
549    
550        /**
551         * Adds a bean which is invoked which could be a final destination, or could
552         * be a transformation in a pipeline
553         */
554        public ProcessorType beanRef(String ref) {
555            BeanRef answer = new BeanRef(ref);
556            addOutput(answer);
557            return this;
558        }
559    
560        /**
561         * Adds a bean and method which is invoked which could be a final
562         * destination, or could be a transformation in a pipeline
563         */
564        public ProcessorType beanRef(String ref, String method) {
565            BeanRef answer = new BeanRef(ref, method);
566            addOutput(answer);
567            return this;
568        }
569    
570        /**
571         * Adds a processor which sets the body on the IN message
572         */
573        public ProcessorType setBody(Expression expression) {
574            return process(ProcessorBuilder.setBody(expression));
575        }
576    
577        /**
578         * Adds a processor which sets the body on the OUT message
579         */
580        public ProcessorType setOutBody(Expression expression) {
581            return process(ProcessorBuilder.setOutBody(expression));
582        }
583    
584        /**
585         * Adds a processor which sets the header on the IN message
586         */
587        public ProcessorType setHeader(String name, Expression expression) {
588            return process(ProcessorBuilder.setHeader(name, expression));
589        }
590    
591        /**
592         * Adds a processor which sets the header on the OUT message
593         */
594        public ProcessorType setOutHeader(String name, Expression expression) {
595            return process(ProcessorBuilder.setOutHeader(name, expression));
596        }
597    
598        /**
599         * Adds a processor which sets the exchange property
600         */
601        public ProcessorType setProperty(String name, Expression expression) {
602            return process(ProcessorBuilder.setProperty(name, expression));
603        }
604    
605        /**
606         * Converts the IN message body to the specified type
607         */
608        public ProcessorType convertBodyTo(Class type) {
609            return process(ProcessorBuilder.setBody(Builder.body().convertTo(type)));
610        }
611    
612        /**
613         * Converts the OUT message body to the specified type
614         */
615        public ProcessorType convertOutBodyTo(Class type) {
616            return process(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type)));
617        }
618    
619        // Properties
620        // -------------------------------------------------------------------------
621    
622        @XmlTransient
623        public ErrorHandlerBuilder getErrorHandlerBuilder() {
624            if (errorHandlerBuilder == null) {
625                errorHandlerBuilder = createErrorHandlerBuilder();
626            }
627            return errorHandlerBuilder;
628        }
629    
630        /**
631         * Sets the error handler to use with processors created by this builder
632         */
633        public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
634            this.errorHandlerBuilder = errorHandlerBuilder;
635        }
636    
637        @XmlTransient
638        public boolean isInheritErrorHandler() {
639            return ObjectConverter.toBoolean(getInheritErrorHandlerFlag());
640        }
641    
642        @XmlAttribute(name = "inheritErrorHandler", required = false)
643        public Boolean getInheritErrorHandlerFlag() {
644            return inheritErrorHandlerFlag;
645        }
646    
647        public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) {
648            this.inheritErrorHandlerFlag = inheritErrorHandlerFlag;
649        }
650    
651        // Implementation methods
652        // -------------------------------------------------------------------------
653    
654        /**
655         * Creates the processor and wraps it in any necessary interceptors and
656         * error handlers
657         */
658        protected Processor makeProcessor(RouteContext routeContext) throws Exception {
659            Processor processor = createProcessor(routeContext);
660            return wrapProcessor(routeContext, processor);
661        }
662    
663        /**
664         * A strategy method which allows derived classes to wrap the child
665         * processor in some kind of interceptor
666         * 
667         * @param routeContext
668         * @param target the processor which can be wrapped
669         * @return the original processor or a new wrapped interceptor
670         */
671        protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
672            // The target is required.
673            if (target == null) {
674                throw new RuntimeCamelException("target provided.");
675            }
676    
677            // Interceptors are optional
678            DelegateProcessor first = null;
679            DelegateProcessor last = null;
680            List<InterceptorType> interceptors = new ArrayList<InterceptorType>(routeContext.getRoute()
681                .getInterceptors());
682            List<InterceptorType> list = getInterceptors();
683            for (InterceptorType interceptorType : list) {
684                if (!interceptors.contains(interceptorType)) {
685                    interceptors.add(interceptorType);
686                }
687            }
688            for (InterceptorType interceptorRef : interceptors) {
689                DelegateProcessor p = interceptorRef.createInterceptor(routeContext);
690                if (first == null) {
691                    first = p;
692                }
693                if (last != null) {
694                    last.setProcessor(p);
695                }
696                last = p;
697            }
698    
699            if (last != null) {
700                last.setProcessor(target);
701            }
702            return first == null ? target : first;
703        }
704    
705        /**
706         * A strategy method to allow newly created processors to be wrapped in an
707         * error handler.
708         */
709        protected Processor wrapInErrorHandler(Processor processor) throws Exception {
710            return getErrorHandlerBuilder().createErrorHandler(processor);
711        }
712    
713        protected ErrorHandlerBuilder createErrorHandlerBuilder() {
714            if (isInheritErrorHandler()) {
715                return new DeadLetterChannelBuilder();
716            } else {
717                return new NoErrorHandlerBuilder();
718            }
719        }
720    
721        protected void configureChild(ProcessorType output) {
722        }
723    
724        protected void addOutput(ProcessorType processorType) {
725            configureChild(processorType);
726            getOutputs().add(processorType);
727        }
728    
729        /**
730         * Creates a new instance of some kind of composite processor which defaults
731         * to using a {@link Pipeline} but derived classes could change the
732         * behaviour
733         */
734        protected Processor createCompositeProcessor(List<Processor> list) {
735            // return new MulticastProcessor(list);
736            return new Pipeline(list);
737        }
738    
739        protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType> outputs)
740            throws Exception {
741            List<Processor> list = new ArrayList<Processor>();
742            for (ProcessorType output : outputs) {
743                Processor processor = output.createProcessor(routeContext);
744                list.add(processor);
745            }
746            Processor processor = null;
747            if (!list.isEmpty()) {
748                if (list.size() == 1) {
749                    processor = list.get(0);
750                } else {
751                    processor = createCompositeProcessor(list);
752                }
753            }
754            return processor;
755        }
756    }