001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.builder; 018 019 import org.apache.camel.Endpoint; 020 import org.apache.camel.Exchange; 021 import org.apache.camel.Expression; 022 import org.apache.camel.Predicate; 023 import org.apache.camel.Processor; 024 import org.apache.camel.Route; 025 import org.apache.camel.impl.EventDrivenConsumerRoute; 026 import org.apache.camel.processor.CompositeProcessor; 027 import org.apache.camel.processor.DelegateProcessor; 028 import org.apache.camel.processor.MulticastProcessor; 029 import org.apache.camel.processor.Pipeline; 030 import org.apache.camel.processor.RecipientList; 031 import org.apache.camel.processor.aggregate.AggregationStrategy; 032 import org.apache.camel.processor.idempotent.IdempotentConsumer; 033 import org.apache.camel.processor.idempotent.MessageIdRepository; 034 import org.apache.camel.spi.Policy; 035 import org.apache.commons.logging.Log; 036 import org.apache.commons.logging.LogFactory; 037 038 import java.util.ArrayList; 039 import java.util.Collection; 040 import java.util.Collections; 041 import java.util.List; 042 043 import sun.net.smtp.SmtpClient; 044 045 /** 046 * @version $Revision: 559613 $ 047 */ 048 public class FromBuilder extends BuilderSupport implements ProcessorFactory { 049 public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE"; 050 private RouteBuilder builder; 051 private Endpoint from; 052 private List<Processor> processors = new ArrayList<Processor>(); 053 private List<ProcessorFactory> processFactories = new ArrayList<ProcessorFactory>(); 054 private FromBuilder routeBuilder; 055 056 public FromBuilder(RouteBuilder builder, Endpoint from) { 057 super(builder); 058 this.builder = builder; 059 this.from = from; 060 } 061 062 public FromBuilder(FromBuilder parent) { 063 super(parent); 064 this.builder = parent.getBuilder(); 065 this.from = parent.getFrom(); 066 } 067 068 /** 069 * Sends the exchange to the given endpoint URI 070 */ 071 @Fluent 072 public ProcessorFactory to(@FluentArg("uri")String uri) { 073 return to(endpoint(uri)); 074 } 075 076 /** 077 * Sends the exchange to the given endpoint 078 */ 079 @Fluent 080 public ProcessorFactory to(@FluentArg("ref")Endpoint endpoint) { 081 ToBuilder answer = new ToBuilder(this, endpoint); 082 addProcessBuilder(answer); 083 return answer; 084 } 085 086 /** 087 * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern 088 */ 089 @Fluent 090 public ProcessorFactory to(String... uris) { 091 return to(endpoints(uris)); 092 } 093 094 /** 095 * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern 096 */ 097 @Fluent 098 public ProcessorFactory to( 099 @FluentArg(value = "endpoint", attribute = false, element = true) 100 Endpoint... endpoints) { 101 return to(endpoints(endpoints)); 102 } 103 104 /** 105 * Sends the exchange to a list of endpoint using the {@link MulticastProcessor} pattern 106 */ 107 @Fluent 108 public ProcessorFactory to(@FluentArg(value = "endpoint", attribute = false, element = true) 109 Collection<Endpoint> endpoints) { 110 return addProcessBuilder(new MulticastBuilder(this, endpoints)); 111 } 112 113 /** 114 * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn 115 * and for request/response the output of one endpoint will be the input of the next endpoint 116 */ 117 @Fluent 118 public ProcessorFactory pipeline(@FluentArg("uris")String... uris) { 119 return pipeline(endpoints(uris)); 120 } 121 122 /** 123 * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn 124 * and for request/response the output of one endpoint will be the input of the next endpoint 125 */ 126 @Fluent 127 public ProcessorFactory pipeline(@FluentArg("endpoints")Endpoint... endpoints) { 128 return pipeline(endpoints(endpoints)); 129 } 130 131 /** 132 * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn 133 * and for request/response the output of one endpoint will be the input of the next endpoint 134 */ 135 @Fluent 136 public ProcessorFactory pipeline(@FluentArg("endpoints")Collection<Endpoint> endpoints) { 137 return addProcessBuilder(new PipelineBuilder(this, endpoints)); 138 } 139 140 /** 141 * Creates an {@link IdempotentConsumer} to avoid duplicate messages 142 */ 143 @Fluent 144 public IdempotentConsumerBuilder idempotentConsumer( 145 @FluentArg("messageIdExpression")Expression messageIdExpression, 146 @FluentArg("MessageIdRepository")MessageIdRepository messageIdRepository) { 147 return (IdempotentConsumerBuilder) addProcessBuilder(new IdempotentConsumerBuilder(this, messageIdExpression, messageIdRepository)); 148 } 149 150 /** 151 * Creates a predicate which is applied and only if it is true then 152 * the exchange is forwarded to the destination 153 * 154 * @return the builder for a predicate 155 */ 156 @Fluent 157 public FilterBuilder filter( 158 @FluentArg(value = "predicate", element = true) 159 Predicate predicate) { 160 FilterBuilder answer = new FilterBuilder(this, predicate); 161 addProcessBuilder(answer); 162 return answer; 163 } 164 165 /** 166 * Creates a choice of one or more predicates with an otherwise clause 167 * 168 * @return the builder for a choice expression 169 */ 170 @Fluent(nestedActions = true) 171 public ChoiceBuilder choice() { 172 ChoiceBuilder answer = new ChoiceBuilder(this); 173 addProcessBuilder(answer); 174 return answer; 175 } 176 177 /** 178 * Creates a dynamic <a href="http://activemq.apache.org/camel/recipient-list.html">Recipient List</a> pattern. 179 * 180 * @param receipients is the builder of the expression used in the {@link RecipientList} to decide the destinations 181 */ 182 @Fluent 183 public RecipientListBuilder recipientList( 184 @FluentArg(value = "recipients", element = true) 185 Expression receipients) { 186 RecipientListBuilder answer = new RecipientListBuilder(this, receipients); 187 addProcessBuilder(answer); 188 return answer; 189 } 190 191 /** 192 * A builder for the <a href="http://activemq.apache.org/camel/splitter.html">Splitter</a> pattern 193 * where an expression is evaluated to iterate through each of the parts of a message and then each part is then send to some endpoint. 194 * 195 * @param receipients the expression on which to split 196 * @return the builder 197 */ 198 @Fluent 199 public SplitterBuilder splitter(@FluentArg(value = "recipients", element = true)Expression receipients) { 200 SplitterBuilder answer = new SplitterBuilder(this, receipients); 201 addProcessBuilder(answer); 202 return answer; 203 } 204 205 /** 206 * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern 207 * where an expression is evaluated to be able to compare the message exchanges to reorder them. e.g. you 208 * may wish to sort by some header 209 * 210 * @param expression the expression on which to compare messages in order 211 * @return the builder 212 */ 213 public ResequencerBuilder resequencer(Expression<Exchange> expression) { 214 return resequencer(Collections.<Expression<Exchange>>singletonList(expression)); 215 } 216 217 /** 218 * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern 219 * where a list of expressions are evaluated to be able to compare the message exchanges to reorder them. e.g. you 220 * may wish to sort by some headers 221 * 222 * @param expressions the expressions on which to compare messages in order 223 * @return the builder 224 */ 225 @Fluent 226 public ResequencerBuilder resequencer(@FluentArg(value = "expressions")List<Expression<Exchange>> expressions) { 227 ResequencerBuilder answer = new ResequencerBuilder(this, expressions); 228 setRouteBuilder(answer); 229 return answer; 230 } 231 232 /** 233 * A builder for the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> pattern 234 * where a list of expressions are evaluated to be able to compare the message exchanges to reorder them. e.g. you 235 * may wish to sort by some headers 236 * 237 * @param expressions the expressions on which to compare messages in order 238 * @return the builder 239 */ 240 @Fluent 241 public ResequencerBuilder resequencer(Expression<Exchange>... expressions) { 242 List<Expression<Exchange>> list = new ArrayList<Expression<Exchange>>(); 243 for (Expression<Exchange> expression : expressions) { 244 list.add(expression); 245 } 246 return resequencer(list); 247 } 248 249 /** 250 * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern 251 * where a batch of messages are processed (up to a maximum amount or until some timeout is reached) 252 * and messages for the same correlation key are combined together using some kind of 253 * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges 254 * into a smaller number of exchanges. 255 * <p/> 256 * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to 257 * throttle it right down so that multiple messages for the same stock are combined (or just the latest 258 * message is used and older prices are discarded). Another idea is to combine line item messages together 259 * into a single invoice message. 260 * 261 * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could 262 * be the expression <code>header("JMSDestination")</code> or <code>header("JMSCorrelationID")</code> 263 */ 264 @Fluent 265 public AggregatorBuilder aggregator(Expression correlationExpression) { 266 AggregatorBuilder answer = new AggregatorBuilder(this, correlationExpression); 267 setRouteBuilder(answer); 268 return answer; 269 } 270 271 /** 272 * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern 273 * where a batch of messages are processed (up to a maximum amount or until some timeout is reached) 274 * and messages for the same correlation key are combined together using some kind of 275 * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges 276 * into a smaller number of exchanges. 277 * <p/> 278 * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to 279 * throttle it right down so that multiple messages for the same stock are combined (or just the latest 280 * message is used and older prices are discarded). Another idea is to combine line item messages together 281 * into a single invoice message. 282 * 283 * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could 284 * be the expression <code>header("JMSDestination")</code> or <code>header("JMSCorrelationID")</code> 285 */ 286 @Fluent 287 public AggregatorBuilder aggregator(Expression correlationExpression, AggregationStrategy strategy) { 288 AggregatorBuilder answer = new AggregatorBuilder(this, correlationExpression); 289 answer.aggregationStrategy(strategy); 290 setRouteBuilder(answer); 291 return answer; 292 } 293 294 /** 295 * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 296 * where an expression is used to calculate the time which the message will be dispatched on 297 * 298 * @param processAtExpression an expression to calculate the time at which the messages should be processed 299 * @return the builder 300 */ 301 @Fluent 302 public DelayerBuilder delayer(Expression<Exchange> processAtExpression) { 303 return delayer(processAtExpression, 0L); 304 } 305 306 /** 307 * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 308 * where an expression is used to calculate the time which the message will be dispatched on 309 * 310 * @param processAtExpression an expression to calculate the time at which the messages should be processed 311 * @param delay the delay in milliseconds which is added to the processAtExpression to determine the time the 312 * message should be processed 313 * @return the builder 314 */ 315 @Fluent 316 public DelayerBuilder delayer(Expression<Exchange> processAtExpression, long delay) { 317 DelayerBuilder answer = new DelayerBuilder(this, processAtExpression, delay); 318 setRouteBuilder(answer); 319 return answer; 320 } 321 322 /** 323 * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 324 * where a fixed amount of milliseconds are used to delay processing of a message exchange 325 * 326 * @param delay the default delay in milliseconds 327 * @return the builder 328 */ 329 @Fluent 330 public DelayerBuilder delayer(long delay) { 331 return delayer(null, delay); 332 } 333 334 335 /** 336 * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 337 * where an expression is used to calculate the time which the message will be dispatched on 338 * 339 * @param processAtExpression an expression to calculate the time at which the messages should be processed 340 * @param delay the delay in milliseconds which is added to the processAtExpression to determine the time the 341 * message should be processed 342 * @return the builder 343 */ 344 @Fluent 345 public ThrottlerBuilder throttler(long maximumRequestCount) { 346 ThrottlerBuilder answer = new ThrottlerBuilder(this, maximumRequestCount); 347 setRouteBuilder(answer); 348 return answer; 349 } 350 351 352 /** 353 * Installs the given error handler builder 354 * 355 * @param errorHandlerBuilder the error handler to be used by default for all child routes 356 * @return the current builder with the error handler configured 357 */ 358 @Fluent 359 public FromBuilder errorHandler(@FluentArg("handler")ErrorHandlerBuilder errorHandlerBuilder) { 360 setErrorHandlerBuilder(errorHandlerBuilder); 361 return this; 362 } 363 364 /** 365 * Configures whether or not the error handler is inherited by every processing node (or just the top most one) 366 * 367 * @param condition the falg as to whether error handlers should be inherited or not 368 * @return the current builder 369 */ 370 @Fluent 371 public FromBuilder inheritErrorHandler(@FluentArg("condition")boolean condition) { 372 setInheritErrorHandler(condition); 373 return this; 374 } 375 376 @Fluent(nestedActions = true) 377 public InterceptorBuilder intercept() { 378 InterceptorBuilder answer = new InterceptorBuilder(this); 379 addProcessBuilder(answer); 380 return answer; 381 } 382 383 @Fluent 384 public FromBuilder intercept(@FluentArg("interceptor")DelegateProcessor interceptor) { 385 InterceptorBuilder answer = new InterceptorBuilder(this); 386 answer.add(interceptor); 387 addProcessBuilder(answer); 388 return answer.target(); 389 } 390 391 /** 392 * Trace logs the exchange before it goes to the next processing step using the {@link #DEFAULT_TRACE_CATEGORY} logging 393 * category. 394 * 395 * @return 396 */ 397 @Fluent 398 public FromBuilder trace() { 399 return trace(DEFAULT_TRACE_CATEGORY); 400 } 401 402 /** 403 * Trace logs the exchange before it goes to the next processing step using the specified logging 404 * category. 405 * 406 * @param category the logging category trace messages will sent to. 407 * @return 408 */ 409 @Fluent 410 public FromBuilder trace(@FluentArg("category")String category) { 411 final Log log = LogFactory.getLog(category); 412 return intercept(new DelegateProcessor() { 413 @Override 414 public void process(Exchange exchange) throws Exception { 415 log.trace(exchange); 416 processNext(exchange); 417 } 418 }); 419 } 420 421 @Fluent(nestedActions = true) 422 public PolicyBuilder policies() { 423 PolicyBuilder answer = new PolicyBuilder(this); 424 addProcessBuilder(answer); 425 return answer; 426 } 427 428 @Fluent 429 public FromBuilder policy(@FluentArg("policy")Policy policy) { 430 PolicyBuilder answer = new PolicyBuilder(this); 431 answer.add(policy); 432 addProcessBuilder(answer); 433 return answer.target(); 434 } 435 436 // Transformers 437 //------------------------------------------------------------------------- 438 439 /** 440 * Adds the custom processor to this destination which could be a final destination, or could be a transformation in a pipeline 441 */ 442 @Fluent 443 public FromBuilder process(@FluentArg("ref")Processor processor) { 444 addProcessorBuilder(processor); 445 return this; 446 } 447 448 /** 449 * Adds a processor which sets the body on the IN message 450 */ 451 @Fluent 452 public FromBuilder setBody(Expression expression) { 453 addProcessorBuilder(ProcessorBuilder.setBody(expression)); 454 return this; 455 } 456 457 /** 458 * Adds a processor which sets the body on the OUT message 459 */ 460 @Fluent 461 public FromBuilder setOutBody(Expression expression) { 462 addProcessorBuilder(ProcessorBuilder.setOutBody(expression)); 463 return this; 464 } 465 466 /** 467 * Adds a processor which sets the header on the IN message 468 */ 469 @Fluent 470 public FromBuilder setHeader(String name, Expression expression) { 471 addProcessorBuilder(ProcessorBuilder.setHeader(name, expression)); 472 return this; 473 } 474 475 /** 476 * Adds a processor which sets the header on the OUT message 477 */ 478 @Fluent 479 public FromBuilder setOutHeader(String name, Expression expression) { 480 addProcessorBuilder(ProcessorBuilder.setOutHeader(name, expression)); 481 return this; 482 } 483 484 /** 485 * Adds a processor which sets the exchange property 486 */ 487 @Fluent 488 public FromBuilder setProperty(String name, Expression expression) { 489 addProcessorBuilder(ProcessorBuilder.setProperty(name, expression)); 490 return this; 491 } 492 493 /** 494 * Converts the IN message body to the specified type 495 */ 496 @Fluent 497 public FromBuilder convertBodyTo(Class type) { 498 addProcessorBuilder(ProcessorBuilder.setBody(Builder.body().convertTo(type))); 499 return this; 500 } 501 502 /** 503 * Converts the OUT message body to the specified type 504 */ 505 @Fluent 506 public FromBuilder convertOutBodyTo(Class type) { 507 addProcessorBuilder(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type))); 508 return this; 509 } 510 511 // Properties 512 //------------------------------------------------------------------------- 513 public RouteBuilder getBuilder() { 514 return builder; 515 } 516 517 public Endpoint getFrom() { 518 return from; 519 } 520 521 public List<Processor> getProcessors() { 522 return processors; 523 } 524 525 public ProcessorFactory addProcessBuilder(ProcessorFactory processFactory) { 526 processFactories.add(processFactory); 527 return processFactory; 528 } 529 530 protected void addProcessorBuilder(Processor processor) { 531 addProcessBuilder(new ConstantProcessorBuilder(processor)); 532 } 533 534 public void addProcessor(Processor processor) { 535 processors.add(processor); 536 } 537 538 public Route createRoute() throws Exception { 539 if (routeBuilder != null) { 540 return routeBuilder.createRoute(); 541 } 542 Processor processor = createProcessor(); 543 if (processor == null) { 544 throw new IllegalArgumentException("No processor created for: " + this); 545 } 546 return new EventDrivenConsumerRoute(getFrom(), processor); 547 } 548 549 public Processor createProcessor() throws Exception { 550 List<Processor> answer = new ArrayList<Processor>(); 551 552 for (ProcessorFactory processFactory : processFactories) { 553 Processor processor = makeProcessor(processFactory); 554 if (processor == null) { 555 throw new IllegalArgumentException("No processor created for processBuilder: " + processFactory); 556 } 557 answer.add(processor); 558 } 559 if (answer.size() == 0) { 560 return null; 561 } 562 Processor processor = null; 563 if (answer.size() == 1) { 564 processor = answer.get(0); 565 } 566 else { 567 processor = new CompositeProcessor(answer); 568 } 569 return processor; 570 } 571 572 /** 573 * Creates the processor and wraps it in any necessary interceptors and error handlers 574 */ 575 protected Processor makeProcessor(ProcessorFactory processFactory) throws Exception { 576 Processor processor = processFactory.createProcessor(); 577 processor = wrapProcessor(processor); 578 return wrapInErrorHandler(processor); 579 } 580 581 /** 582 * A strategy method to allow newly created processors to be wrapped in an error handler. This feature 583 * could be disabled for child builders such as {@link IdempotentConsumerBuilder} which will rely on the 584 * {@link FromBuilder} to perform the error handling to avoid doubly-wrapped processors with 2 nested error handlers 585 */ 586 protected Processor wrapInErrorHandler(Processor processor) throws Exception { 587 return getErrorHandlerBuilder().createErrorHandler(processor); 588 } 589 590 /** 591 * A strategy method which allows derived classes to wrap the child processor in some kind of interceptor such as 592 * a filter for the {@link IdempotentConsumerBuilder}. 593 * 594 * @param processor the processor which can be wrapped 595 * @return the original processor or a new wrapped interceptor 596 */ 597 protected Processor wrapProcessor(Processor processor) { 598 return processor; 599 } 600 601 protected FromBuilder getRouteBuilder() { 602 return routeBuilder; 603 } 604 605 protected void setRouteBuilder(FromBuilder routeBuilder) { 606 this.routeBuilder = routeBuilder; 607 } 608 609 }