001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.builder;
018    
019    import org.apache.camel.Endpoint;
020    import org.apache.camel.Exchange;
021    import org.apache.camel.Expression;
022    import org.apache.camel.Predicate;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Route;
025    import org.apache.camel.impl.EventDrivenConsumerRoute;
026    import org.apache.camel.processor.CompositeProcessor;
027    import org.apache.camel.processor.DelegateProcessor;
028    import org.apache.camel.processor.MulticastProcessor;
029    import org.apache.camel.processor.Pipeline;
030    import org.apache.camel.processor.RecipientList;
031    import org.apache.camel.processor.aggregate.AggregationStrategy;
032    import org.apache.camel.processor.idempotent.IdempotentConsumer;
033    import org.apache.camel.processor.idempotent.MessageIdRepository;
034    import org.apache.camel.spi.Policy;
035    import org.apache.commons.logging.Log;
036    import org.apache.commons.logging.LogFactory;
037    
038    import java.util.ArrayList;
039    import java.util.Collection;
040    import java.util.Collections;
041    import java.util.List;
042    
043    import sun.net.smtp.SmtpClient;
044    
045    /**
046     * @version $Revision: 559613 $
047     */
048    public class FromBuilder extends BuilderSupport implements ProcessorFactory {
049        public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
050        private RouteBuilder builder;
051        private Endpoint from;
052        private List<Processor> processors = new ArrayList<Processor>();
053        private List<ProcessorFactory> processFactories = new ArrayList<ProcessorFactory>();
054        private FromBuilder routeBuilder;
055    
056        public FromBuilder(RouteBuilder builder, Endpoint from) {
057            super(builder);
058            this.builder = builder;
059            this.from = from;
060        }
061    
062        public FromBuilder(FromBuilder parent) {
063            super(parent);
064            this.builder = parent.getBuilder();
065            this.from = parent.getFrom();
066        }
067    
068        /**
069         * Sends the exchange to the given endpoint URI
070         */
071        @Fluent
072        public ProcessorFactory to(@FluentArg("uri")String uri) {
073            return to(endpoint(uri));
074        }
075    
076        /**
077         * Sends the exchange to the given endpoint
078         */
079        @Fluent
080        public ProcessorFactory to(@FluentArg("ref")Endpoint endpoint) {
081            ToBuilder answer = new ToBuilder(this, endpoint);
082            addProcessBuilder(answer);
083            return answer;
084        }
085    
086        /**
087         * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern
088         */
089        @Fluent
090        public ProcessorFactory to(String... uris) {
091            return to(endpoints(uris));
092        }
093    
094        /**
095         * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern
096         */
097        @Fluent
098        public ProcessorFactory to(
099                @FluentArg(value = "endpoint", attribute = false, element = true)
100                Endpoint... endpoints) {
101            return to(endpoints(endpoints));
102        }
103    
104        /**
105         * Sends the exchange to a list of endpoint using the {@link MulticastProcessor} pattern
106         */
107        @Fluent
108        public ProcessorFactory to(@FluentArg(value = "endpoint", attribute = false, element = true)
109        Collection<Endpoint> endpoints) {
110            return addProcessBuilder(new MulticastBuilder(this, endpoints));
111        }
112    
113        /**
114         * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
115         * and for request/response the output of one endpoint will be the input of the next endpoint
116         */
117        @Fluent
118        public ProcessorFactory pipeline(@FluentArg("uris")String... uris) {
119            return pipeline(endpoints(uris));
120        }
121    
122        /**
123         * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
124         * and for request/response the output of one endpoint will be the input of the next endpoint
125         */
126        @Fluent
127        public ProcessorFactory pipeline(@FluentArg("endpoints")Endpoint... endpoints) {
128            return pipeline(endpoints(endpoints));
129        }
130    
131        /**
132         * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
133         * and for request/response the output of one endpoint will be the input of the next endpoint
134         */
135        @Fluent
136        public ProcessorFactory pipeline(@FluentArg("endpoints")Collection<Endpoint> endpoints) {
137            return addProcessBuilder(new PipelineBuilder(this, endpoints));
138        }
139    
140        /**
141         * Creates an {@link IdempotentConsumer} to avoid duplicate messages
142         */
143        @Fluent
144        public IdempotentConsumerBuilder idempotentConsumer(
145                @FluentArg("messageIdExpression")Expression messageIdExpression,
146                @FluentArg("MessageIdRepository")MessageIdRepository messageIdRepository) {
147            return (IdempotentConsumerBuilder) addProcessBuilder(new IdempotentConsumerBuilder(this, messageIdExpression, messageIdRepository));
148        }
149    
150        /**
151         * Creates a predicate which is applied and only if it is true then
152         * the exchange is forwarded to the destination
153         *
154         * @return the builder for a predicate
155         */
156        @Fluent
157        public FilterBuilder filter(
158                @FluentArg(value = "predicate", element = true)
159                Predicate predicate) {
160            FilterBuilder answer = new FilterBuilder(this, predicate);
161            addProcessBuilder(answer);
162            return answer;
163        }
164    
165        /**
166         * Creates a choice of one or more predicates with an otherwise clause
167         *
168         * @return the builder for a choice expression
169         */
170        @Fluent(nestedActions = true)
171        public ChoiceBuilder choice() {
172            ChoiceBuilder answer = new ChoiceBuilder(this);
173            addProcessBuilder(answer);
174            return answer;
175        }
176    
177        /**
178         * Creates a dynamic <a href="http://activemq.apache.org/camel/recipient-list.html">Recipient List</a> pattern.
179         *
180         * @param receipients is the builder of the expression used in the {@link RecipientList} to decide the destinations
181         */
182        @Fluent
183        public RecipientListBuilder recipientList(
184                @FluentArg(value = "recipients", element = true)
185                Expression receipients) {
186            RecipientListBuilder answer = new RecipientListBuilder(this, receipients);
187            addProcessBuilder(answer);
188            return answer;
189        }
190    
191        /**
192         * A builder for the <a href="http://activemq.apache.org/camel/splitter.html">Splitter</a> pattern
193         * where an expression is evaluated to iterate through each of the parts of a message and then each part is then send to some endpoint.
194         *
195         * @param receipients the expression on which to split
196         * @return the builder
197         */
198        @Fluent
199        public SplitterBuilder splitter(@FluentArg(value = "recipients", element = true)Expression receipients) {
200            SplitterBuilder answer = new SplitterBuilder(this, receipients);
201            addProcessBuilder(answer);
202            return answer;
203        }
204    
205        /**
206         * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern
207         * where an expression is evaluated to be able to compare the message exchanges to reorder them. e.g. you
208         * may wish to sort by some header
209         *
210         * @param expression the expression on which to compare messages in order
211         * @return the builder
212         */
213        public ResequencerBuilder resequencer(Expression<Exchange> expression) {
214            return resequencer(Collections.<Expression<Exchange>>singletonList(expression));
215        }
216    
217        /**
218         * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern
219         * where a list of expressions are evaluated to be able to compare the message exchanges to reorder them. e.g. you
220         * may wish to sort by some headers
221         *
222         * @param expressions the expressions on which to compare messages in order
223         * @return the builder
224         */
225        @Fluent
226        public ResequencerBuilder resequencer(@FluentArg(value = "expressions")List<Expression<Exchange>> expressions) {
227            ResequencerBuilder answer = new ResequencerBuilder(this, expressions);
228            setRouteBuilder(answer);
229            return answer;
230        }
231    
232        /**
233         * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern
234         * where a list of expressions are evaluated to be able to compare the message exchanges to reorder them. e.g. you
235         * may wish to sort by some headers
236         *
237         * @param expressions the expressions on which to compare messages in order
238         * @return the builder
239         */
240        @Fluent
241        public ResequencerBuilder resequencer(Expression<Exchange>... expressions) {
242            List<Expression<Exchange>> list = new ArrayList<Expression<Exchange>>();
243            for (Expression<Exchange> expression : expressions) {
244                list.add(expression);
245            }
246            return resequencer(list);
247        }
248    
249        /**
250         * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
251         * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
252         * and messages for the same correlation key are combined together using some kind of
253         * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
254         * into a smaller number of exchanges.
255         * <p/>
256         * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
257         * throttle it right down so that multiple messages for the same stock are combined (or just the latest
258         * message is used and older prices are discarded). Another idea is to combine line item messages together
259         * into a single invoice message.
260         *
261         * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could
262         * be the expression <code>header("JMSDestination")</code> or  <code>header("JMSCorrelationID")</code>
263         */
264        @Fluent
265        public AggregatorBuilder aggregator(Expression correlationExpression) {
266            AggregatorBuilder answer = new AggregatorBuilder(this, correlationExpression);
267            setRouteBuilder(answer);
268            return answer;
269        }
270    
271        /**
272         * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
273         * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
274         * and messages for the same correlation key are combined together using some kind of
275         * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
276         * into a smaller number of exchanges.
277         * <p/>
278         * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
279         * throttle it right down so that multiple messages for the same stock are combined (or just the latest
280         * message is used and older prices are discarded). Another idea is to combine line item messages together
281         * into a single invoice message.
282         *
283         * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could
284         * be the expression <code>header("JMSDestination")</code> or  <code>header("JMSCorrelationID")</code>
285         */
286        @Fluent
287        public AggregatorBuilder aggregator(Expression correlationExpression, AggregationStrategy strategy) {
288            AggregatorBuilder answer = new AggregatorBuilder(this, correlationExpression);
289            answer.aggregationStrategy(strategy);
290            setRouteBuilder(answer);
291            return answer;
292        }
293    
294        /**
295         * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
296         * where an expression is used to calculate the time which the message will be dispatched on
297         *
298         * @param processAtExpression an expression to calculate the time at which the messages should be processed
299         * @return the builder
300         */
301        @Fluent
302        public DelayerBuilder delayer(Expression<Exchange> processAtExpression) {
303            return delayer(processAtExpression, 0L);
304        }
305    
306        /**
307         * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
308         * where an expression is used to calculate the time which the message will be dispatched on
309         *
310         * @param processAtExpression an expression to calculate the time at which the messages should be processed
311         * @param delay the delay in milliseconds which is added to the processAtExpression to determine the time the
312         * message should be processed
313         * @return the builder
314         */
315        @Fluent
316        public DelayerBuilder delayer(Expression<Exchange> processAtExpression, long delay) {
317            DelayerBuilder answer = new DelayerBuilder(this, processAtExpression, delay);
318            setRouteBuilder(answer);
319            return answer;
320        }
321    
322        /**
323         * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
324         * where a fixed amount of milliseconds are used to delay processing of a message exchange
325         *
326         * @param delay the default delay in milliseconds
327         * @return the builder
328         */
329        @Fluent
330        public DelayerBuilder delayer(long delay) {
331            return delayer(null, delay);
332        }
333    
334    
335        /**
336         * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
337         * where an expression is used to calculate the time which the message will be dispatched on
338         *
339         * @param processAtExpression an expression to calculate the time at which the messages should be processed
340         * @param delay the delay in milliseconds which is added to the processAtExpression to determine the time the
341         * message should be processed
342         * @return the builder
343         */
344        @Fluent
345        public ThrottlerBuilder throttler(long maximumRequestCount) {
346            ThrottlerBuilder answer = new ThrottlerBuilder(this, maximumRequestCount);
347            setRouteBuilder(answer);
348            return answer;
349        }
350    
351    
352        /**
353         * Installs the given error handler builder
354         *
355         * @param errorHandlerBuilder the error handler to be used by default for all child routes
356         * @return the current builder with the error handler configured
357         */
358        @Fluent
359        public FromBuilder errorHandler(@FluentArg("handler")ErrorHandlerBuilder errorHandlerBuilder) {
360            setErrorHandlerBuilder(errorHandlerBuilder);
361            return this;
362        }
363    
364        /**
365         * Configures whether or not the error handler is inherited by every processing node (or just the top most one)
366         *
367         * @param condition the falg as to whether error handlers should be inherited or not
368         * @return the current builder
369         */
370        @Fluent
371        public FromBuilder inheritErrorHandler(@FluentArg("condition")boolean condition) {
372            setInheritErrorHandler(condition);
373            return this;
374        }
375    
376        @Fluent(nestedActions = true)
377        public InterceptorBuilder intercept() {
378            InterceptorBuilder answer = new InterceptorBuilder(this);
379            addProcessBuilder(answer);
380            return answer;
381        }
382    
383        @Fluent
384        public FromBuilder intercept(@FluentArg("interceptor")DelegateProcessor interceptor) {
385            InterceptorBuilder answer = new InterceptorBuilder(this);
386            answer.add(interceptor);
387            addProcessBuilder(answer);
388            return answer.target();
389        }
390    
391        /**
392         * Trace logs the exchange before it goes to the next processing step using the {@link #DEFAULT_TRACE_CATEGORY} logging
393         * category.
394         *
395         * @return
396         */
397        @Fluent
398        public FromBuilder trace() {
399            return trace(DEFAULT_TRACE_CATEGORY);
400        }
401    
402        /**
403         * Trace logs the exchange before it goes to the next processing step using the specified logging
404         * category.
405         *
406         * @param category the logging category trace messages will sent to.
407         * @return
408         */
409        @Fluent
410        public FromBuilder trace(@FluentArg("category")String category) {
411            final Log log = LogFactory.getLog(category);
412            return intercept(new DelegateProcessor() {
413                @Override
414                public void process(Exchange exchange) throws Exception {
415                    log.trace(exchange);
416                    processNext(exchange);
417                }
418            });
419        }
420    
421        @Fluent(nestedActions = true)
422        public PolicyBuilder policies() {
423            PolicyBuilder answer = new PolicyBuilder(this);
424            addProcessBuilder(answer);
425            return answer;
426        }
427    
428        @Fluent
429        public FromBuilder policy(@FluentArg("policy")Policy policy) {
430            PolicyBuilder answer = new PolicyBuilder(this);
431            answer.add(policy);
432            addProcessBuilder(answer);
433            return answer.target();
434        }
435    
436        // Transformers
437        //-------------------------------------------------------------------------
438    
439        /**
440         * Adds the custom processor to this destination which could be a final destination, or could be a transformation in a pipeline
441         */
442        @Fluent
443        public FromBuilder process(@FluentArg("ref")Processor processor) {
444            addProcessorBuilder(processor);
445            return this;
446        }
447    
448        /**
449         * Adds a processor which sets the body on the IN message
450         */
451        @Fluent
452        public FromBuilder setBody(Expression expression) {
453            addProcessorBuilder(ProcessorBuilder.setBody(expression));
454            return this;
455        }
456    
457        /**
458         * Adds a processor which sets the body on the OUT message
459         */
460        @Fluent
461        public FromBuilder setOutBody(Expression expression) {
462            addProcessorBuilder(ProcessorBuilder.setOutBody(expression));
463            return this;
464        }
465    
466        /**
467         * Adds a processor which sets the header on the IN message
468         */
469        @Fluent
470        public FromBuilder setHeader(String name, Expression expression) {
471            addProcessorBuilder(ProcessorBuilder.setHeader(name, expression));
472            return this;
473        }
474    
475        /**
476         * Adds a processor which sets the header on the OUT message
477         */
478        @Fluent
479        public FromBuilder setOutHeader(String name, Expression expression) {
480            addProcessorBuilder(ProcessorBuilder.setOutHeader(name, expression));
481            return this;
482        }
483    
484        /**
485         * Adds a processor which sets the exchange property
486         */
487        @Fluent
488        public FromBuilder setProperty(String name, Expression expression) {
489            addProcessorBuilder(ProcessorBuilder.setProperty(name, expression));
490            return this;
491        }
492    
493        /**
494         * Converts the IN message body to the specified type
495         */
496        @Fluent
497        public FromBuilder convertBodyTo(Class type) {
498            addProcessorBuilder(ProcessorBuilder.setBody(Builder.body().convertTo(type)));
499            return this;
500        }
501    
502        /**
503         * Converts the OUT message body to the specified type
504         */
505        @Fluent
506        public FromBuilder convertOutBodyTo(Class type) {
507            addProcessorBuilder(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type)));
508            return this;
509        }
510    
511        // Properties
512        //-------------------------------------------------------------------------
513        public RouteBuilder getBuilder() {
514            return builder;
515        }
516    
517        public Endpoint getFrom() {
518            return from;
519        }
520    
521        public List<Processor> getProcessors() {
522            return processors;
523        }
524    
525        public ProcessorFactory addProcessBuilder(ProcessorFactory processFactory) {
526            processFactories.add(processFactory);
527            return processFactory;
528        }
529    
530        protected void addProcessorBuilder(Processor processor) {
531            addProcessBuilder(new ConstantProcessorBuilder(processor));
532        }
533    
534        public void addProcessor(Processor processor) {
535            processors.add(processor);
536        }
537    
538        public Route createRoute() throws Exception {
539            if (routeBuilder != null) {
540                return routeBuilder.createRoute();
541            }
542            Processor processor = createProcessor();
543            if (processor == null) {
544                throw new IllegalArgumentException("No processor created for: " + this);
545            }
546            return new EventDrivenConsumerRoute(getFrom(), processor);
547        }
548    
549        public Processor createProcessor() throws Exception {
550            List<Processor> answer = new ArrayList<Processor>();
551    
552            for (ProcessorFactory processFactory : processFactories) {
553                Processor processor = makeProcessor(processFactory);
554                if (processor == null) {
555                    throw new IllegalArgumentException("No processor created for processBuilder: " + processFactory);
556                }
557                answer.add(processor);
558            }
559            if (answer.size() == 0) {
560                return null;
561            }
562            Processor processor = null;
563            if (answer.size() == 1) {
564                processor = answer.get(0);
565            }
566            else {
567                processor = new CompositeProcessor(answer);
568            }
569            return processor;
570        }
571    
572        /**
573         * Creates the processor and wraps it in any necessary interceptors and error handlers
574         */
575        protected Processor makeProcessor(ProcessorFactory processFactory) throws Exception {
576            Processor processor = processFactory.createProcessor();
577            processor = wrapProcessor(processor);
578            return wrapInErrorHandler(processor);
579        }
580    
581        /**
582         * A strategy method to allow newly created processors to be wrapped in an error handler. This feature
583         * could be disabled for child builders such as {@link IdempotentConsumerBuilder} which will rely on the
584         * {@link FromBuilder} to perform the error handling to avoid doubly-wrapped processors with 2 nested error handlers
585         */
586        protected Processor wrapInErrorHandler(Processor processor) throws Exception {
587            return getErrorHandlerBuilder().createErrorHandler(processor);
588        }
589    
590        /**
591         * A strategy method which allows derived classes to wrap the child processor in some kind of interceptor such as
592         * a filter for the {@link IdempotentConsumerBuilder}.
593         *
594         * @param processor the processor which can be wrapped
595         * @return the original processor or a new wrapped interceptor
596         */
597        protected Processor wrapProcessor(Processor processor) {
598            return processor;
599        }
600    
601        protected FromBuilder getRouteBuilder() {
602            return routeBuilder;
603        }
604    
605        protected void setRouteBuilder(FromBuilder routeBuilder) {
606            this.routeBuilder = routeBuilder;
607        }
608    
609    }