001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Arrays; 021 import java.util.Collection; 022 import java.util.Collections; 023 import java.util.Comparator; 024 import java.util.LinkedList; 025 import java.util.List; 026 027 import javax.xml.bind.annotation.XmlAccessType; 028 import javax.xml.bind.annotation.XmlAccessorType; 029 import javax.xml.bind.annotation.XmlAttribute; 030 import javax.xml.bind.annotation.XmlTransient; 031 032 import org.apache.camel.Channel; 033 import org.apache.camel.Endpoint; 034 import org.apache.camel.ExchangePattern; 035 import org.apache.camel.Expression; 036 import org.apache.camel.Predicate; 037 import org.apache.camel.Processor; 038 import org.apache.camel.Route; 039 import org.apache.camel.builder.DataFormatClause; 040 import org.apache.camel.builder.ErrorHandlerBuilder; 041 import org.apache.camel.builder.ErrorHandlerBuilderRef; 042 import org.apache.camel.builder.ExpressionBuilder; 043 import org.apache.camel.builder.ExpressionClause; 044 import org.apache.camel.builder.ProcessorBuilder; 045 import org.apache.camel.model.language.ConstantExpression; 046 import org.apache.camel.model.language.ExpressionDefinition; 047 import org.apache.camel.model.language.LanguageExpression; 048 import org.apache.camel.processor.DefaultChannel; 049 import org.apache.camel.processor.InterceptEndpointProcessor; 050 import org.apache.camel.processor.Pipeline; 051 import org.apache.camel.processor.aggregate.AggregationCollection; 052 import org.apache.camel.processor.aggregate.AggregationStrategy; 053 import org.apache.camel.processor.loadbalancer.LoadBalancer; 054 import org.apache.camel.spi.DataFormat; 055 import org.apache.camel.spi.IdempotentRepository; 056 import org.apache.camel.spi.InterceptStrategy; 057 import org.apache.camel.spi.Policy; 058 import org.apache.camel.spi.RouteContext; 059 import org.apache.camel.spi.TransactedPolicy; 060 import org.apache.commons.logging.Log; 061 import org.apache.commons.logging.LogFactory; 062 063 import static org.apache.camel.builder.Builder.body; 064 065 /** 066 * Base class for processor types that most XML types extend. 067 * 068 * @version $Revision: 782911 $ 069 */ 070 @XmlAccessorType(XmlAccessType.PROPERTY) 071 public abstract class ProcessorDefinition<Type extends ProcessorDefinition> extends OptionalIdentifiedType<Type> implements Block { 072 private static final transient Log LOG = LogFactory.getLog(ProcessorDefinition.class); 073 private ErrorHandlerBuilder errorHandlerBuilder; 074 private NodeFactory nodeFactory; 075 private final LinkedList<Block> blocks = new LinkedList<Block>(); 076 private ProcessorDefinition parent; 077 private String errorHandlerRef; 078 private final List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>(); 079 080 // else to use an optional attribute in JAXB2 081 public abstract List<ProcessorDefinition> getOutputs(); 082 083 084 public Processor createProcessor(RouteContext routeContext) throws Exception { 085 throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName()); 086 } 087 088 public Processor createOutputsProcessor(RouteContext routeContext) throws Exception { 089 Collection<ProcessorDefinition> outputs = getOutputs(); 090 return createOutputsProcessor(routeContext, outputs); 091 } 092 093 @SuppressWarnings("unchecked") 094 public void addOutput(ProcessorDefinition processorType) { 095 processorType.setParent(this); 096 configureChild(processorType); 097 if (blocks.isEmpty()) { 098 getOutputs().add(processorType); 099 } else { 100 Block block = blocks.getLast(); 101 block.addOutput(processorType); 102 } 103 } 104 105 public void clearOutput() { 106 getOutputs().clear(); 107 blocks.clear(); 108 } 109 110 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { 111 Processor processor = makeProcessor(routeContext); 112 if (processor == null) { 113 // no processor to add 114 return; 115 } 116 117 if (!routeContext.isRouteAdded()) { 118 boolean endpointInterceptor = false; 119 120 // are we routing to an endpoint interceptor, if so we should not add it as an event driven 121 // processor as we use the producer to trigger the interceptor 122 if (processor instanceof Channel) { 123 Channel channel = (Channel) processor; 124 Processor next = channel.getNextProcessor(); 125 if (next instanceof InterceptEndpointProcessor) { 126 endpointInterceptor = true; 127 } 128 } 129 130 // only add regular processors as event driven 131 if (endpointInterceptor) { 132 if (LOG.isDebugEnabled()) { 133 LOG.debug("Endpoint interceptor should not be added as an event driven consumer route: " + processor); 134 } 135 } else { 136 if (LOG.isTraceEnabled()) { 137 LOG.trace("Adding event driven processor: " + processor); 138 } 139 routeContext.addEventDrivenProcessor(processor); 140 } 141 142 } 143 } 144 145 /** 146 * Wraps the child processor in whatever necessary interceptors and error handlers 147 */ 148 public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception { 149 // dont double wrap 150 if (processor instanceof Channel) { 151 return processor; 152 } 153 return wrapChannel(routeContext, processor); 154 } 155 156 protected Processor wrapChannel(RouteContext routeContext, Processor processor) throws Exception { 157 // put a channel inbetween this and each output to control the route flow logic 158 Channel channel = createChannel(routeContext); 159 channel.setNextProcessor(processor); 160 161 // add interceptor strategies to the channel 162 channel.addInterceptStrategies(routeContext.getCamelContext().getInterceptStrategies()); 163 channel.addInterceptStrategies(routeContext.getInterceptStrategies()); 164 channel.addInterceptStrategies(this.getInterceptStrategies()); 165 166 // init the channel 167 channel.initChannel(this, routeContext); 168 169 // set the error handler, must be done after init as we can set the error handler as first in the chain 170 if (this instanceof TryDefinition || this instanceof CatchDefinition || this instanceof FinallyDefinition) { 171 // do not use error handler for try .. catch .. finally blocks as it will handle errors itself 172 return channel; 173 } else { 174 // regular definition so add the error handler 175 Processor errorHandler = getErrorHandlerBuilder().createErrorHandler(routeContext, channel.getOutput()); 176 channel.setErrorHandler(errorHandler); 177 return channel; 178 } 179 } 180 181 /** 182 * Creates a new instance of some kind of composite processor which defaults 183 * to using a {@link Pipeline} but derived classes could change the behaviour 184 */ 185 protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) { 186 return new Pipeline(list); 187 } 188 189 /** 190 * Creates a new instance of the {@link Channel}. 191 */ 192 protected Channel createChannel(RouteContext routeContext) { 193 return new DefaultChannel(); 194 } 195 196 protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorDefinition> outputs) throws Exception { 197 List<Processor> list = new ArrayList<Processor>(); 198 for (ProcessorDefinition output : outputs) { 199 Processor processor = output.createProcessor(routeContext); 200 if (output instanceof Channel && processor == null) { 201 continue; 202 } 203 204 Processor channel = wrapChannel(routeContext, processor); 205 list.add(channel); 206 } 207 208 // if more than one output wrap than in a composite processor else just keep it as is 209 Processor processor = null; 210 if (!list.isEmpty()) { 211 if (list.size() == 1) { 212 processor = list.get(0); 213 } else { 214 processor = createCompositeProcessor(routeContext, list); 215 } 216 } 217 218 return processor; 219 } 220 221 /** 222 * Creates the processor and wraps it in any necessary interceptors and error handlers 223 */ 224 protected Processor makeProcessor(RouteContext routeContext) throws Exception { 225 Processor processor = createProcessor(routeContext); 226 if (processor == null) { 227 // no processor to make 228 return null; 229 } 230 return wrapProcessor(routeContext, processor); 231 } 232 233 protected ErrorHandlerBuilder createErrorHandlerBuilder() { 234 if (errorHandlerRef != null) { 235 return new ErrorHandlerBuilderRef(errorHandlerRef); 236 } 237 238 // return a reference to the default error handler 239 return new ErrorHandlerBuilderRef(ErrorHandlerBuilderRef.DEFAULT_ERROR_HANDLER_BUILDER); 240 } 241 242 protected void configureChild(ProcessorDefinition output) { 243 output.setNodeFactory(getNodeFactory()); 244 output.setErrorHandlerBuilder(getErrorHandlerBuilder()); 245 } 246 247 // Fluent API 248 // ------------------------------------------------------------------------- 249 250 /** 251 * Sends the exchange to the given endpoint 252 * 253 * @param uri the endpoint to send to 254 * @return the builder 255 */ 256 @SuppressWarnings("unchecked") 257 public Type to(String uri) { 258 addOutput(new ToDefinition(uri)); 259 return (Type) this; 260 } 261 262 /** 263 * Sends the exchange to the given endpoint 264 * 265 * @param uri the String formatted endpoint uri to send to 266 * @param args arguments for the string formatting of the uri 267 * @return the builder 268 */ 269 @SuppressWarnings("unchecked") 270 public Type toF(String uri, Object... args) { 271 addOutput(new ToDefinition(String.format(uri, args))); 272 return (Type) this; 273 } 274 275 276 /** 277 * Sends the exchange to the given endpoint 278 * 279 * @param endpoint the endpoint to send to 280 * @return the builder 281 */ 282 @SuppressWarnings("unchecked") 283 public Type to(Endpoint endpoint) { 284 addOutput(new ToDefinition(endpoint)); 285 return (Type) this; 286 } 287 288 /** 289 * Sends the exchange with certain exchange pattern to the given endpoint 290 * 291 * @param pattern the pattern to use for the message exchange 292 * @param uri the endpoint to send to 293 * @return the builder 294 */ 295 @SuppressWarnings("unchecked") 296 public Type to(ExchangePattern pattern, String uri) { 297 addOutput(new ToDefinition(uri, pattern)); 298 return (Type) this; 299 } 300 301 302 /** 303 * Sends the exchange with certain exchange pattern to the given endpoint 304 * 305 * @param pattern the pattern to use for the message exchange 306 * @param endpoint the endpoint to send to 307 * @return the builder 308 */ 309 @SuppressWarnings("unchecked") 310 public Type to(ExchangePattern pattern, Endpoint endpoint) { 311 addOutput(new ToDefinition(endpoint, pattern)); 312 return (Type) this; 313 } 314 315 /** 316 * Sends the exchange to a list of endpoints 317 * 318 * @param uris list of endpoints to send to 319 * @return the builder 320 */ 321 @SuppressWarnings("unchecked") 322 public Type to(String... uris) { 323 for (String uri : uris) { 324 addOutput(new ToDefinition(uri)); 325 } 326 return (Type) this; 327 } 328 329 330 /** 331 * Sends the exchange to a list of endpoints 332 * 333 * @param endpoints list of endpoints to send to 334 * @return the builder 335 */ 336 @SuppressWarnings("unchecked") 337 public Type to(Endpoint... endpoints) { 338 for (Endpoint endpoint : endpoints) { 339 addOutput(new ToDefinition(endpoint)); 340 } 341 return (Type) this; 342 } 343 344 /** 345 * Sends the exchange to a list of endpoints 346 * 347 * @param endpoints list of endpoints to send to 348 * @return the builder 349 */ 350 @SuppressWarnings("unchecked") 351 public Type to(Iterable<Endpoint> endpoints) { 352 for (Endpoint endpoint : endpoints) { 353 addOutput(new ToDefinition(endpoint)); 354 } 355 return (Type) this; 356 } 357 358 359 /** 360 * Sends the exchange to a list of endpoints 361 * 362 * @param pattern the pattern to use for the message exchanges 363 * @param uris list of endpoints to send to 364 * @return the builder 365 */ 366 @SuppressWarnings("unchecked") 367 public Type to(ExchangePattern pattern, String... uris) { 368 for (String uri : uris) { 369 addOutput(new ToDefinition(uri, pattern)); 370 } 371 return (Type) this; 372 } 373 374 /** 375 * Sends the exchange to a list of endpoints 376 * 377 * @param pattern the pattern to use for the message exchanges 378 * @param endpoints list of endpoints to send to 379 * @return the builder 380 */ 381 @SuppressWarnings("unchecked") 382 public Type to(ExchangePattern pattern, Endpoint... endpoints) { 383 for (Endpoint endpoint : endpoints) { 384 addOutput(new ToDefinition(endpoint, pattern)); 385 } 386 return (Type) this; 387 } 388 389 /** 390 * Sends the exchange to a list of endpoints 391 * 392 * @param pattern the pattern to use for the message exchanges 393 * @param endpoints list of endpoints to send to 394 * @return the builder 395 */ 396 @SuppressWarnings("unchecked") 397 public Type to(ExchangePattern pattern, Iterable<Endpoint> endpoints) { 398 for (Endpoint endpoint : endpoints) { 399 addOutput(new ToDefinition(endpoint, pattern)); 400 } 401 return (Type) this; 402 } 403 404 405 /** 406 * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a> 407 * set the ExchangePattern {@link ExchangePattern} into the exchange 408 * 409 * @param exchangePattern instance of {@link ExchangePattern} 410 * @return the builder 411 */ 412 @SuppressWarnings("unchecked") 413 public Type setExchangePattern(ExchangePattern exchangePattern) { 414 addOutput(new SetExchangePatternDefinition(exchangePattern)); 415 return (Type) this; 416 } 417 418 /** 419 * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a> 420 * set the exchange's ExchangePattern {@link ExchangePattern} to be InOnly 421 * 422 * 423 * @return the builder 424 */ 425 public Type inOnly() { 426 return setExchangePattern(ExchangePattern.InOnly); 427 } 428 429 /** 430 * Sends the message to the given endpoint using an 431 * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 432 * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a> 433 * 434 * @param uri The endpoint uri which is used for sending the exchange 435 * @return the builder 436 */ 437 public Type inOnly(String uri) { 438 return to(ExchangePattern.InOnly, uri); 439 } 440 441 /** 442 * Sends the message to the given endpoint using an 443 * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 444 * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a> 445 * 446 * @param endpoint The endpoint which is used for sending the exchange 447 * @return the builder 448 */ 449 public Type inOnly(Endpoint endpoint) { 450 return to(ExchangePattern.InOnly, endpoint); 451 } 452 453 454 /** 455 * Sends the message to the given endpoints using an 456 * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 457 * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a> 458 * 459 * @param uris list of endpoints to send to 460 * @return the builder 461 */ 462 public Type inOnly(String... uris) { 463 return to(ExchangePattern.InOnly, uris); 464 } 465 466 467 /** 468 * Sends the message to the given endpoints using an 469 * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 470 * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a> 471 * 472 * @param endpoints list of endpoints to send to 473 * @return the builder 474 */ 475 public Type inOnly(Endpoint... endpoints) { 476 return to(ExchangePattern.InOnly, endpoints); 477 } 478 479 /** 480 * Sends the message to the given endpoints using an 481 * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 482 * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a> 483 * 484 * @param endpoints list of endpoints to send to 485 * @return the builder 486 */ 487 public Type inOnly(Iterable<Endpoint> endpoints) { 488 return to(ExchangePattern.InOnly, endpoints); 489 } 490 491 492 /** 493 * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a> 494 * set the exchange's ExchangePattern {@link ExchangePattern} to be InOut 495 * 496 * 497 * @return the builder 498 */ 499 public Type inOut() { 500 return setExchangePattern(ExchangePattern.InOut); 501 } 502 503 /** 504 * Sends the message to the given endpoint using an 505 * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or 506 * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a> 507 * 508 * @param uri The endpoint uri which is used for sending the exchange 509 * @return the builder 510 */ 511 public Type inOut(String uri) { 512 return to(ExchangePattern.InOut, uri); 513 } 514 515 516 /** 517 * Sends the message to the given endpoint using an 518 * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or 519 * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a> 520 * 521 * @param endpoint The endpoint which is used for sending the exchange 522 * @return the builder 523 */ 524 public Type inOut(Endpoint endpoint) { 525 return to(ExchangePattern.InOut, endpoint); 526 } 527 528 /** 529 * Sends the message to the given endpoints using an 530 * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or 531 * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a> 532 * 533 * @param uris list of endpoints to send to 534 * @return the builder 535 */ 536 public Type inOut(String... uris) { 537 return to(ExchangePattern.InOut, uris); 538 } 539 540 541 /** 542 * Sends the message to the given endpoints using an 543 * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or 544 * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a> 545 * 546 * @param endpoints list of endpoints to send to 547 * @return the builder 548 */ 549 public Type inOut(Endpoint... endpoints) { 550 return to(ExchangePattern.InOut, endpoints); 551 } 552 553 /** 554 * Sends the message to the given endpoints using an 555 * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or 556 * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a> 557 * 558 * @param endpoints list of endpoints to send to 559 * @return the builder 560 */ 561 public Type inOut(Iterable<Endpoint> endpoints) { 562 return to(ExchangePattern.InOut, endpoints); 563 } 564 565 /** 566 * Sets the id of this node 567 * 568 * @param id the id 569 * @return the builder 570 */ 571 @SuppressWarnings("unchecked") 572 public Type id(String id) { 573 if (getOutputs().isEmpty()) { 574 // set id on this 575 setId(id); 576 } else { 577 // set it on last output as this is what the user means to do 578 getOutputs().get(getOutputs().size() - 1).setId(id); 579 } 580 581 return (Type) this; 582 } 583 584 /** 585 * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a> 586 * Multicasts messages to all its child outputs; so that each processor and 587 * destination gets a copy of the original message to avoid the processors 588 * interfering with each other. 589 * 590 * @return the builder 591 */ 592 public MulticastDefinition multicast() { 593 MulticastDefinition answer = new MulticastDefinition(); 594 addOutput(answer); 595 return answer; 596 } 597 598 /** 599 * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a> 600 * Multicasts messages to all its child outputs; so that each processor and 601 * destination gets a copy of the original message to avoid the processors 602 * interfering with each other. 603 * 604 * @param aggregationStrategy the strategy used to aggregate responses for 605 * every part 606 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 607 * @return the builder 608 */ 609 public MulticastDefinition multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 610 MulticastDefinition answer = new MulticastDefinition(); 611 addOutput(answer); 612 answer.setAggregationStrategy(aggregationStrategy); 613 answer.setParallelProcessing(parallelProcessing); 614 return answer; 615 } 616 617 /** 618 * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a> 619 * Multicasts messages to all its child outputs; so that each processor and 620 * destination gets a copy of the original message to avoid the processors 621 * interfering with each other. 622 * 623 * @param aggregationStrategy the strategy used to aggregate responses for 624 * every part 625 * @return the builder 626 */ 627 public MulticastDefinition multicast(AggregationStrategy aggregationStrategy) { 628 MulticastDefinition answer = new MulticastDefinition(); 629 addOutput(answer); 630 answer.setAggregationStrategy(aggregationStrategy); 631 return answer; 632 } 633 634 /** 635 * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a> 636 * Creates a {@link Pipeline} of the list of endpoints so that the message 637 * will get processed by each endpoint in turn and for request/response the 638 * output of one endpoint will be the input of the next endpoint 639 * 640 * @param uris list of endpoints 641 * @return the builder 642 */ 643 public Type pipeline(String... uris) { 644 return to(uris); 645 } 646 647 /** 648 * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a> 649 * Creates a {@link Pipeline} of the list of endpoints so that the message 650 * will get processed by each endpoint in turn and for request/response the 651 * output of one endpoint will be the input of the next endpoint 652 * 653 * @param endpoints list of endpoints 654 * @return the builder 655 */ 656 public Type pipeline(Endpoint... endpoints) { 657 return to(endpoints); 658 } 659 660 /** 661 * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a> 662 * Creates a {@link Pipeline} of the list of endpoints so that the message 663 * will get processed by each endpoint in turn and for request/response the 664 * output of one endpoint will be the input of the next endpoint 665 * 666 * @param endpoints list of endpoints 667 * @return the builder 668 */ 669 public Type pipeline(Collection<Endpoint> endpoints) { 670 return to(endpoints); 671 } 672 673 /** 674 * Leverages a thread pool for multi threading processing exchanges. 675 * <p/> 676 * The caller thread will either wait for the async route 677 * to complete or imeddiately continue. If continue the OUT message will 678 * contain a {@link java.util.concurrent.Future} handle so you can get the real response 679 * later using this handle. 680 * <p/> 681 * Will default <tt>Always</tt> wait for the async route to complete, but this behavior can be overriden by: 682 * <ul> 683 * <li>Configuring the <tt>waitForTaskToComplete</tt> option</li> 684 * <li>Provide an IN header with the key {@link org.apache.camel.Exchange#ASYNC_WAIT} with the 685 * value containing a type {@link org.apache.camel.WaitForTaskToComplete}. The header will take precedence, if provided.</li> 686 * </ul> 687 * 688 * @return the builder 689 */ 690 public ThreadsDefinition threads() { 691 ThreadsDefinition answer = new ThreadsDefinition(); 692 addOutput(answer); 693 return answer; 694 } 695 696 /** 697 * Leverages a thread pool for multi threading processing exchanges. 698 * <p/> 699 * See {@link #threads()} for more details. 700 * 701 * @param poolSize the core pool size 702 * @return the builder 703 */ 704 public ThreadsDefinition threads(int poolSize) { 705 ThreadsDefinition answer = threads(); 706 answer.setPoolSize(poolSize); 707 return answer; 708 } 709 710 /** 711 * Ends the current block 712 * 713 * @return the builder 714 */ 715 @SuppressWarnings("unchecked") 716 public ProcessorDefinition<? extends ProcessorDefinition> end() { 717 if (blocks.isEmpty()) { 718 if (parent == null) { 719 throw new IllegalArgumentException("Root node with no active block"); 720 } 721 return parent; 722 } 723 popBlock(); 724 return this; 725 } 726 727 /** 728 * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a> 729 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer} 730 * to avoid duplicate messages 731 * 732 * @return the builder 733 */ 734 public IdempotentConsumerDefinition idempotentConsumer() { 735 IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(); 736 addOutput(answer); 737 return answer; 738 } 739 740 /** 741 * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a> 742 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer} 743 * to avoid duplicate messages 744 * 745 * @param messageIdExpression expression to test of duplicate messages 746 * @param idempotentRepository the repository to use for duplicate chedck 747 * @return the builder 748 */ 749 public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression, 750 IdempotentRepository idempotentRepository) { 751 IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(messageIdExpression, idempotentRepository); 752 addOutput(answer); 753 return answer; 754 } 755 756 /** 757 * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a> 758 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer} 759 * to avoid duplicate messages 760 * 761 * @param idempotentRepository the repository to use for duplicate chedck 762 * @return the builder used to create the expression 763 */ 764 public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer(IdempotentRepository idempotentRepository) { 765 IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(); 766 answer.setMessageIdRepository(idempotentRepository); 767 addOutput(answer); 768 return ExpressionClause.createAndSetExpression(answer); 769 } 770 771 /** 772 * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a> 773 * Creates a predicate expression which only if it is <tt>true</tt> then the 774 * exchange is forwarded to the destination 775 * 776 * @return the clause used to create the filter expression 777 */ 778 public ExpressionClause<FilterDefinition> filter() { 779 FilterDefinition filter = new FilterDefinition(); 780 addOutput(filter); 781 return ExpressionClause.createAndSetExpression(filter); 782 } 783 784 /** 785 * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a> 786 * Creates a predicate which is applied and only if it is <tt>true</tt> then the 787 * exchange is forwarded to the destination 788 * 789 * @param predicate predicate to use 790 * @return the builder 791 */ 792 public FilterDefinition filter(Predicate predicate) { 793 FilterDefinition filter = new FilterDefinition(predicate); 794 addOutput(filter); 795 return filter; 796 } 797 798 /** 799 * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a> 800 * Creates a predicate expression which only if it is <tt>true</tt> then the 801 * exchange is forwarded to the destination 802 * 803 * @param expression the predicate expression to use 804 * @return the builder 805 */ 806 public FilterDefinition filter(ExpressionDefinition expression) { 807 FilterDefinition filter = getNodeFactory().createFilter(); 808 filter.setExpression(expression); 809 addOutput(filter); 810 return filter; 811 } 812 813 /** 814 * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a> 815 * Creates a predicate language expression which only if it is <tt>true</tt> then the 816 * exchange is forwarded to the destination 817 * 818 * @param language language for expression 819 * @param expression the expression 820 * @return the builder 821 */ 822 public FilterDefinition filter(String language, String expression) { 823 return filter(new LanguageExpression(language, expression)); 824 } 825 826 /** 827 * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a> 828 * Creates a loadbalance 829 * 830 * @return the builder 831 */ 832 public LoadBalanceDefinition loadBalance() { 833 LoadBalanceDefinition answer = new LoadBalanceDefinition(); 834 addOutput(answer); 835 return answer; 836 } 837 838 /** 839 * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a> 840 * Creates a loadbalance 841 * 842 * @param loadBalancer a custom load balancer to use 843 * @return the builder 844 */ 845 public LoadBalanceDefinition loadBalance(LoadBalancer loadBalancer) { 846 LoadBalanceDefinition answer = new LoadBalanceDefinition(); 847 addOutput(answer); 848 return answer.loadBalance(loadBalancer); 849 } 850 851 /** 852 * <a href="http://camel.apache.org/content-based-router.html">Content Based Router EIP:</a> 853 * Creates a choice of one or more predicates with an otherwise clause 854 * 855 * @return the builder for a choice expression 856 */ 857 public ChoiceDefinition choice() { 858 ChoiceDefinition answer = new ChoiceDefinition(); 859 addOutput(answer); 860 return answer; 861 } 862 863 /** 864 * Creates a try/catch block 865 * 866 * @return the builder for a tryBlock expression 867 */ 868 public TryDefinition doTry() { 869 TryDefinition answer = new TryDefinition(); 870 addOutput(answer); 871 return answer; 872 } 873 874 /** 875 * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a> 876 * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients 877 * 878 * @param recipients expression to decide the destinations 879 * @return the builder 880 */ 881 @SuppressWarnings("unchecked") 882 public Type recipientList(Expression recipients) { 883 RecipientListDefinition answer = new RecipientListDefinition(recipients); 884 addOutput(answer); 885 return (Type) this; 886 } 887 888 /** 889 * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a> 890 * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients 891 * 892 * @return the expression clause to configure the expression to decide the destinations 893 */ 894 public ExpressionClause<ProcessorDefinition<Type>> recipientList() { 895 RecipientListDefinition answer = new RecipientListDefinition(); 896 addOutput(answer); 897 ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this); 898 answer.setExpression(clause); 899 return clause; 900 } 901 902 /** 903 * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a> 904 * Creates a routing slip allowing you to route a message consecutively through a series of processing 905 * steps where the sequence of steps is not known at design time and can vary for each message. 906 * 907 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip} 908 * class will look in for the list of URIs to route the message to. 909 * @param uriDelimiter is the delimiter that will be used to split up 910 * the list of URIs in the routing slip. 911 * @return the buiider 912 */ 913 @SuppressWarnings("unchecked") 914 public Type routingSlip(String header, String uriDelimiter) { 915 RoutingSlipDefinition answer = new RoutingSlipDefinition(header, uriDelimiter); 916 addOutput(answer); 917 return (Type) this; 918 } 919 920 /** 921 * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a> 922 * Creates a routing slip allowing you to route a message consecutively through a series of processing 923 * steps where the sequence of steps is not known at design time and can vary for each message. 924 * <p> 925 * The list of URIs will be split based on the default delimiter {@link RoutingSlipDefinition#DEFAULT_DELIMITER} 926 * 927 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip} 928 * class will look in for the list of URIs to route the message to. 929 * @return the builder 930 */ 931 @SuppressWarnings("unchecked") 932 public Type routingSlip(String header) { 933 RoutingSlipDefinition answer = new RoutingSlipDefinition(header); 934 addOutput(answer); 935 return (Type) this; 936 } 937 938 /** 939 * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a> 940 * Creates a splitter allowing you split a message into a number of pieces and process them individually. 941 * <p> 942 * This splitter responds with the latest message returned from destination 943 * endpoint. 944 * 945 * @return the expression clause builder for the expression on which to split 946 */ 947 public ExpressionClause<SplitDefinition> split() { 948 SplitDefinition answer = new SplitDefinition(); 949 addOutput(answer); 950 return ExpressionClause.createAndSetExpression(answer); 951 } 952 953 /** 954 * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a> 955 * Creates a splitter allowing you split a message into a number of pieces and process them individually. 956 * <p> 957 * This splitter responds with the latest message returned from destination 958 * endpoint. 959 * 960 * @param expression the expression on which to split the message 961 * @return the builder 962 */ 963 public SplitDefinition split(Expression expression) { 964 SplitDefinition answer = new SplitDefinition(expression); 965 addOutput(answer); 966 return answer; 967 } 968 969 /** 970 * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a> 971 * Creates a splitter allowing you split a message into a number of pieces and process them individually. 972 * <p> 973 * The splitter responds with the answer produced by the given {@link AggregationStrategy}. 974 * 975 * @param expression the expression on which to split 976 * @param aggregationStrategy the strategy used to aggregate responses for every part 977 * @return the builder 978 */ 979 public SplitDefinition split(Expression expression, AggregationStrategy aggregationStrategy) { 980 SplitDefinition answer = new SplitDefinition(expression); 981 addOutput(answer); 982 answer.setAggregationStrategy(aggregationStrategy); 983 return answer; 984 } 985 986 /** 987 * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a> 988 * Creates a resequencer allowing you to reorganize messages based on some comparator. 989 * 990 * @return the expression clause for the expressions on which to compare messages in order 991 */ 992 public ExpressionClause<ResequenceDefinition> resequence() { 993 ResequenceDefinition answer = new ResequenceDefinition(); 994 addOutput(answer); 995 ExpressionClause<ResequenceDefinition> clause = new ExpressionClause<ResequenceDefinition>(answer); 996 answer.expression(clause); 997 return clause; 998 } 999 1000 /** 1001 * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a> 1002 * Creates a resequencer allowing you to reorganize messages based on some comparator. 1003 * 1004 * @param expression the expression on which to compare messages in order 1005 * @return the builder 1006 */ 1007 public ResequenceDefinition resequence(Expression expression) { 1008 return resequence(Collections.<Expression>singletonList(expression)); 1009 } 1010 1011 /** 1012 * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a> 1013 * Creates a resequencer allowing you to reorganize messages based on some comparator. 1014 * 1015 * @param expressions the list of expressions on which to compare messages in order 1016 * @return the builder 1017 */ 1018 public ResequenceDefinition resequence(List<Expression> expressions) { 1019 ResequenceDefinition answer = new ResequenceDefinition(expressions); 1020 addOutput(answer); 1021 return answer; 1022 } 1023 1024 /** 1025 * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a> 1026 * Creates a splitter allowing you to reorganise messages based on some comparator. 1027 * 1028 * @param expressions the list of expressions on which to compare messages in order 1029 * @return the builder 1030 */ 1031 public ResequenceDefinition resequencer(Expression... expressions) { 1032 List<Expression> list = new ArrayList<Expression>(); 1033 list.addAll(Arrays.asList(expressions)); 1034 return resequence(list); 1035 } 1036 1037 /** 1038 * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a> 1039 * Creates an aggregator allowing you to combine a number of messages together into a single message. 1040 * 1041 * @return the expression clause to be used as builder to configure the correlation expression 1042 */ 1043 public ExpressionClause<AggregateDefinition> aggregate() { 1044 AggregateDefinition answer = new AggregateDefinition(); 1045 addOutput(answer); 1046 return answer.createAndSetExpression(); 1047 } 1048 1049 /** 1050 * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a> 1051 * Creates an aggregator allowing you to combine a number of messages together into a single message. 1052 * 1053 * @param aggregationStrategy the strategy used for the aggregation 1054 * @return the expression clause to be used as builder to configure the correlation expression 1055 */ 1056 public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) { 1057 AggregateDefinition answer = new AggregateDefinition(); 1058 answer.setAggregationStrategy(aggregationStrategy); 1059 addOutput(answer); 1060 return answer.createAndSetExpression(); 1061 } 1062 1063 /** 1064 * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a> 1065 * Creates an aggregator allowing you to combine a number of messages together into a single message. 1066 * 1067 * @param aggregationCollection the collection used to perform the aggregation 1068 * @return the builder 1069 */ 1070 public AggregateDefinition aggregate(AggregationCollection aggregationCollection) { 1071 AggregateDefinition answer = new AggregateDefinition(); 1072 answer.setAggregationCollection(aggregationCollection); 1073 addOutput(answer); 1074 return answer; 1075 } 1076 1077 /** 1078 * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a> 1079 * Creates an aggregator allowing you to combine a number of messages together into a single message. 1080 * 1081 * @param correlationExpression the expression used to calculate the 1082 * correlation key. For a JMS message this could be the 1083 * expression <code>header("JMSDestination")</code> or 1084 * <code>header("JMSCorrelationID")</code> 1085 * @return the builder 1086 */ 1087 public AggregateDefinition aggregate(Expression correlationExpression) { 1088 AggregateDefinition answer = new AggregateDefinition(correlationExpression); 1089 addOutput(answer); 1090 return answer; 1091 } 1092 1093 /** 1094 * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a> 1095 * Creates an aggregator allowing you to combine a number of messages together into a single message. 1096 * 1097 * @param correlationExpression the expression used to calculate the 1098 * correlation key. For a JMS message this could be the 1099 * expression <code>header("JMSDestination")</code> or 1100 * <code>header("JMSCorrelationID")</code> 1101 * @param aggregationStrategy the strategy used for the aggregation 1102 * @return the builder 1103 */ 1104 public AggregateDefinition aggregate(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 1105 AggregateDefinition answer = new AggregateDefinition(correlationExpression, aggregationStrategy); 1106 addOutput(answer); 1107 return answer; 1108 } 1109 1110 /** 1111 * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a> 1112 * Creates a delayer allowing you to delay the delivery of messages to some destination. 1113 * 1114 * @param delay an expression to calculate the delay time in millis 1115 * @return the builder 1116 */ 1117 public DelayDefinition delay(Expression delay) { 1118 DelayDefinition answer = new DelayDefinition(delay); 1119 addOutput(answer); 1120 return answer; 1121 } 1122 1123 /** 1124 * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a> 1125 * Creates a delayer allowing you to delay the delivery of messages to some destination. 1126 * 1127 * @return the expression clause to create the expression 1128 */ 1129 public ExpressionClause<DelayDefinition> delay() { 1130 DelayDefinition answer = new DelayDefinition(); 1131 addOutput(answer); 1132 return ExpressionClause.createAndSetExpression(answer); 1133 } 1134 1135 /** 1136 * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a> 1137 * Creates a delayer allowing you to delay the delivery of messages to some destination. 1138 * 1139 * @param delay the delay in millis 1140 * @return the builder 1141 */ 1142 public DelayDefinition delay(long delay) { 1143 return delay(ExpressionBuilder.constantExpression(delay)); 1144 } 1145 1146 /** 1147 * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a> 1148 * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded, 1149 * or that we don't exceed an agreed SLA with some external service. 1150 * <p/> 1151 * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 1152 * will default ensure at most 10 messages per second. 1153 * 1154 * @param maximumRequestCount the maximum messages 1155 * @return the builder 1156 */ 1157 public ThrottleDefinition throttle(long maximumRequestCount) { 1158 ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount); 1159 addOutput(answer); 1160 return answer; 1161 } 1162 1163 /** 1164 * <a href="http://camel.apache.org/loop.html">Loop EIP:</a> 1165 * Creates a loop allowing to process the a message a number of times and possibly process them 1166 * in a different way. Useful mostly for testing. 1167 * 1168 * @return the clause used to create the loop expression 1169 */ 1170 public ExpressionClause<LoopDefinition> loop() { 1171 LoopDefinition loop = new LoopDefinition(); 1172 addOutput(loop); 1173 return ExpressionClause.createAndSetExpression(loop); 1174 } 1175 1176 /** 1177 * <a href="http://camel.apache.org/loop.html">Loop EIP:</a> 1178 * Creates a loop allowing to process the a message a number of times and possibly process them 1179 * in a different way. Useful mostly for testing. 1180 * 1181 * @param expression the loop expression 1182 * @return the builder 1183 */ 1184 public LoopDefinition loop(Expression expression) { 1185 LoopDefinition loop = getNodeFactory().createLoop(); 1186 loop.setExpression(expression); 1187 addOutput(loop); 1188 return loop; 1189 } 1190 1191 /** 1192 * <a href="http://camel.apache.org/loop.html">Loop EIP:</a> 1193 * Creates a loop allowing to process the a message a number of times and possibly process them 1194 * in a different way. Useful mostly for testing. 1195 * 1196 * @param count the number of times 1197 * @return the builder 1198 */ 1199 public LoopDefinition loop(int count) { 1200 LoopDefinition loop = getNodeFactory().createLoop(); 1201 loop.setExpression(new ConstantExpression(Integer.toString(count))); 1202 addOutput(loop); 1203 return loop; 1204 } 1205 1206 /** 1207 * Sets the exception on the {@link org.apache.camel.Exchange} 1208 * 1209 * @param exception the exception to throw 1210 * @return the builder 1211 */ 1212 @SuppressWarnings("unchecked") 1213 public Type throwException(Exception exception) { 1214 ThrowExceptionDefinition answer = new ThrowExceptionDefinition(); 1215 answer.setException(exception); 1216 addOutput(answer); 1217 return (Type) this; 1218 } 1219 1220 /** 1221 * Marks the exchange for rollback only. 1222 * <p/> 1223 * This is done by setting a {@link org.apache.camel.RollbackExchangeException} on the Exchange 1224 * and mark it for rollback. 1225 * 1226 * @return the builder 1227 */ 1228 @SuppressWarnings("unchecked") 1229 public Type rollback() { 1230 return rollback(null); 1231 } 1232 1233 /** 1234 * Marks the exchange for rollback only. 1235 * <p/> 1236 * This is done by setting a {@link org.apache.camel.RollbackExchangeException} on the Exchange 1237 * and mark it for rollback. 1238 * 1239 * @param message an optional message used for logging purpose why the rollback was triggered 1240 * @return the builder 1241 */ 1242 @SuppressWarnings("unchecked") 1243 public Type rollback(String message) { 1244 RollbackDefinition answer = new RollbackDefinition(message); 1245 addOutput(answer); 1246 return (Type) this; 1247 } 1248 1249 /** 1250 * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> 1251 * Sends messages to all its child outputs; so that each processor and 1252 * destination gets a copy of the original message to avoid the processors 1253 * interfering with each other using {@link ExchangePattern#InOnly}. 1254 * 1255 * @return the builder 1256 */ 1257 @SuppressWarnings("unchecked") 1258 public Type wireTap(String uri) { 1259 WireTapDefinition answer = new WireTapDefinition(); 1260 answer.setUri(uri); 1261 addOutput(answer); 1262 return (Type) this; 1263 } 1264 1265 /** 1266 * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> 1267 * Sends a new {@link org.apache.camel.Exchange} to the destination 1268 * using {@link ExchangePattern#InOnly}. 1269 * 1270 * @param uri the destination 1271 * @param body expression that creates the body to send 1272 * @return the builder 1273 */ 1274 @SuppressWarnings("unchecked") 1275 public Type wireTap(String uri, Expression body) { 1276 WireTapDefinition answer = new WireTapDefinition(); 1277 answer.setUri(uri); 1278 answer.setNewExchangeExpression(body); 1279 addOutput(answer); 1280 return (Type) this; 1281 } 1282 1283 /** 1284 * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> 1285 * Sends a new {@link org.apache.camel.Exchange} to the destination 1286 * using {@link ExchangePattern#InOnly}. 1287 * 1288 * @param uri the destination 1289 * @param processor processor preparing the new exchange to send 1290 * @return the builder 1291 */ 1292 @SuppressWarnings("unchecked") 1293 public Type wireTap(String uri, Processor processor) { 1294 WireTapDefinition answer = new WireTapDefinition(); 1295 answer.setUri(uri); 1296 answer.setNewExchangeProcessor(processor); 1297 addOutput(answer); 1298 return (Type) this; 1299 } 1300 1301 /** 1302 * Pushes the given block on the stack as current block 1303 * @param block the block 1304 */ 1305 void pushBlock(Block block) { 1306 blocks.add(block); 1307 } 1308 1309 /** 1310 * Pops the block off the stack as current block 1311 * @return the block 1312 */ 1313 Block popBlock() { 1314 return blocks.isEmpty() ? null : blocks.removeLast(); 1315 } 1316 1317 /** 1318 * Stops continue routing the current {@link org.apache.camel.Exchange} and marks it as completed. 1319 * 1320 * @return the builder 1321 */ 1322 @SuppressWarnings("unchecked") 1323 public Type stop() { 1324 StopDefinition stop = new StopDefinition(); 1325 addOutput(stop); 1326 return (Type) this; 1327 } 1328 1329 /** 1330 * <a href="http://camel.apache.org/exception-clause.html">Exception clause</a> 1331 * for cathing certain exceptions and handling them. 1332 * 1333 * @param exceptionType the exception to catch 1334 * @return the exception builder to configure 1335 */ 1336 public OnExceptionDefinition onException(Class exceptionType) { 1337 OnExceptionDefinition answer = new OnExceptionDefinition(exceptionType); 1338 addOutput(answer); 1339 return answer; 1340 } 1341 1342 /** 1343 * Apply a {@link Policy}. 1344 * <p/> 1345 * Policy can be used for transactional policies. 1346 * 1347 * @param policy the policy to apply 1348 * @return the policy builder to configure 1349 */ 1350 public PolicyDefinition policy(Policy policy) { 1351 PolicyDefinition answer = new PolicyDefinition(policy); 1352 addOutput(answer); 1353 return answer; 1354 } 1355 1356 /** 1357 * Apply a {@link Policy}. 1358 * <p/> 1359 * Policy can be used for transactional policies. 1360 * 1361 * @param ref reference to lookup a policy in the registry 1362 * @return the policy builder to configure 1363 */ 1364 public PolicyDefinition policy(String ref) { 1365 PolicyDefinition answer = new PolicyDefinition(); 1366 answer.setRef(ref); 1367 addOutput(answer); 1368 return answer; 1369 } 1370 1371 /** 1372 * Marks this route as transacted and uses the default transacted policy found in the registry. 1373 * 1374 * @return the policy builder to configure 1375 */ 1376 public PolicyDefinition transacted() { 1377 PolicyDefinition answer = new PolicyDefinition(); 1378 answer.setType(TransactedPolicy.class); 1379 addOutput(answer); 1380 return answer; 1381 } 1382 1383 /** 1384 * Marks this route as transacted. 1385 * 1386 * @param ref reference to lookup a transacted policy in the registry 1387 * @return the policy builder to configure 1388 */ 1389 public PolicyDefinition transacted(String ref) { 1390 PolicyDefinition answer = new PolicyDefinition(); 1391 answer.setType(TransactedPolicy.class); 1392 answer.setRef(ref); 1393 addOutput(answer); 1394 return answer; 1395 } 1396 1397 /** 1398 * Installs the given <a href="http://camel.apache.org/error-handler.html">error handler</a> builder. 1399 * 1400 * @param errorHandlerBuilder the error handler to be used by default for all child routes 1401 * @return the current builder with the error handler configured 1402 */ 1403 @SuppressWarnings("unchecked") 1404 public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) { 1405 setErrorHandlerBuilder(errorHandlerBuilder); 1406 return (Type) this; 1407 } 1408 1409 // Transformers 1410 // ------------------------------------------------------------------------- 1411 1412 /** 1413 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1414 * Adds the custom processor to this destination which could be a final 1415 * destination, or could be a transformation in a pipeline 1416 * 1417 * @param processor the custom {@link Processor} 1418 * @return the builder 1419 */ 1420 @SuppressWarnings("unchecked") 1421 public Type process(Processor processor) { 1422 ProcessDefinition answer = new ProcessDefinition(processor); 1423 addOutput(answer); 1424 return (Type) this; 1425 } 1426 1427 /** 1428 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1429 * Adds the custom processor reference to this destination which could be a final 1430 * destination, or could be a transformation in a pipeline 1431 * 1432 * @param ref reference to a {@link Processor} to lookup in the registry 1433 * @return the builder 1434 */ 1435 @SuppressWarnings("unchecked") 1436 public Type processRef(String ref) { 1437 ProcessDefinition answer = new ProcessDefinition(); 1438 answer.setRef(ref); 1439 addOutput(answer); 1440 return (Type) this; 1441 } 1442 1443 /** 1444 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1445 * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline 1446 * 1447 * @param bean the bean to invoke 1448 * @return the builder 1449 */ 1450 @SuppressWarnings("unchecked") 1451 public Type bean(Object bean) { 1452 BeanDefinition answer = new BeanDefinition(); 1453 answer.setBean(bean); 1454 addOutput(answer); 1455 return (Type) this; 1456 } 1457 1458 /** 1459 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1460 * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline 1461 * 1462 * @param bean the bean to invoke 1463 * @param method the method name to invoke on the bean (can be used to avoid ambiguty) 1464 * @return the builder 1465 */ 1466 @SuppressWarnings("unchecked") 1467 public Type bean(Object bean, String method) { 1468 BeanDefinition answer = new BeanDefinition(); 1469 answer.setBean(bean); 1470 answer.setMethod(method); 1471 addOutput(answer); 1472 return (Type) this; 1473 } 1474 1475 /** 1476 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1477 * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline 1478 * 1479 * @param beanType the bean class, Camel will instantiate an object at runtime 1480 * @return the builder 1481 */ 1482 @SuppressWarnings("unchecked") 1483 public Type bean(Class beanType) { 1484 BeanDefinition answer = new BeanDefinition(); 1485 answer.setBeanType(beanType); 1486 addOutput(answer); 1487 return (Type) this; 1488 } 1489 1490 /** 1491 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1492 * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline 1493 * 1494 * @param beanType the bean class, Camel will instantiate an object at runtime 1495 * @param method the method name to invoke on the bean (can be used to avoid ambiguty) 1496 * @return the builder 1497 */ 1498 @SuppressWarnings("unchecked") 1499 public Type bean(Class beanType, String method) { 1500 BeanDefinition answer = new BeanDefinition(); 1501 answer.setBeanType(beanType); 1502 answer.setMethod(method); 1503 addOutput(answer); 1504 return (Type) this; 1505 } 1506 1507 /** 1508 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1509 * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline 1510 * 1511 * @param ref reference to a bean to lookup in the registry 1512 * @return the builder 1513 */ 1514 @SuppressWarnings("unchecked") 1515 public Type beanRef(String ref) { 1516 BeanDefinition answer = new BeanDefinition(ref); 1517 addOutput(answer); 1518 return (Type) this; 1519 } 1520 1521 /** 1522 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1523 * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline 1524 * 1525 * @param ref reference to a bean to lookup in the registry 1526 * @param method the method name to invoke on the bean (can be used to avoid ambiguty) 1527 * @return the builder 1528 */ 1529 @SuppressWarnings("unchecked") 1530 public Type beanRef(String ref, String method) { 1531 BeanDefinition answer = new BeanDefinition(ref, method); 1532 addOutput(answer); 1533 return (Type) this; 1534 } 1535 1536 /** 1537 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1538 * Adds a processor which sets the body on the IN message 1539 * 1540 * @return a expression builder clause to set the body 1541 */ 1542 public ExpressionClause<ProcessorDefinition<Type>> setBody() { 1543 ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this); 1544 SetBodyDefinition answer = new SetBodyDefinition(clause); 1545 addOutput(answer); 1546 return clause; 1547 } 1548 1549 /** 1550 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1551 * Adds a processor which sets the body on the IN message 1552 * 1553 * @param expression the expression used to set the body 1554 * @return the builder 1555 */ 1556 @SuppressWarnings("unchecked") 1557 public Type setBody(Expression expression) { 1558 SetBodyDefinition answer = new SetBodyDefinition(expression); 1559 addOutput(answer); 1560 return (Type) this; 1561 } 1562 1563 /** 1564 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1565 * Adds a processor which sets the body on the OUT message 1566 * 1567 * @param expression the expression used to set the body 1568 * @return the builder 1569 */ 1570 @SuppressWarnings("unchecked") 1571 public Type transform(Expression expression) { 1572 TransformDefinition answer = new TransformDefinition(expression); 1573 addOutput(answer); 1574 return (Type) this; 1575 } 1576 1577 /** 1578 * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a> 1579 * Adds a processor which sets the body on the OUT message 1580 * 1581 * @return a expression builder clause to set the body 1582 */ 1583 public ExpressionClause<ProcessorDefinition<Type>> transform() { 1584 ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>((Type) this); 1585 TransformDefinition answer = new TransformDefinition(clause); 1586 addOutput(answer); 1587 return clause; 1588 } 1589 1590 /** 1591 * Adds a processor which sets the body on the FAULT message 1592 * 1593 * @param expression the expression used to set the body 1594 * @return the builder 1595 */ 1596 public Type setFaultBody(Expression expression) { 1597 return process(ProcessorBuilder.setFaultBody(expression)); 1598 } 1599 1600 /** 1601 * Adds a processor which sets the header on the IN message 1602 * 1603 * @param name the header name 1604 * @return a expression builder clause to set the header 1605 */ 1606 public ExpressionClause<ProcessorDefinition<Type>> setHeader(String name) { 1607 ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this); 1608 SetHeaderDefinition answer = new SetHeaderDefinition(name, clause); 1609 addOutput(answer); 1610 return clause; 1611 } 1612 1613 /** 1614 * Adds a processor which sets the header on the IN message 1615 * 1616 * @param name the header name 1617 * @param expression the expression used to set the header 1618 * @return the builder 1619 */ 1620 @SuppressWarnings("unchecked") 1621 public Type setHeader(String name, Expression expression) { 1622 SetHeaderDefinition answer = new SetHeaderDefinition(name, expression); 1623 addOutput(answer); 1624 return (Type) this; 1625 } 1626 1627 /** 1628 * Adds a processor which sets the header on the OUT message 1629 * 1630 * @param name the header name 1631 * @return a expression builder clause to set the header 1632 */ 1633 public ExpressionClause<ProcessorDefinition<Type>> setOutHeader(String name) { 1634 ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this); 1635 SetOutHeaderDefinition answer = new SetOutHeaderDefinition(name, clause); 1636 addOutput(answer); 1637 return clause; 1638 } 1639 1640 /** 1641 * Adds a processor which sets the header on the OUT message 1642 * 1643 * @param name the header name 1644 * @param expression the expression used to set the header 1645 * @return the builder 1646 */ 1647 @SuppressWarnings("unchecked") 1648 public Type setOutHeader(String name, Expression expression) { 1649 SetOutHeaderDefinition answer = new SetOutHeaderDefinition(name, expression); 1650 addOutput(answer); 1651 return (Type) this; 1652 } 1653 1654 /** 1655 * Adds a processor which sets the header on the FAULT message 1656 * 1657 * @param name the header name 1658 * @param expression the expression used to set the header 1659 * @return the builder 1660 */ 1661 public Type setFaultHeader(String name, Expression expression) { 1662 return process(ProcessorBuilder.setFaultHeader(name, expression)); 1663 } 1664 1665 /** 1666 * Adds a processor which sets the exchange property 1667 * 1668 * @param name the property name 1669 * @param expression the expression used to set the property 1670 * @return the builder 1671 */ 1672 @SuppressWarnings("unchecked") 1673 public Type setProperty(String name, Expression expression) { 1674 SetPropertyDefinition answer = new SetPropertyDefinition(name, expression); 1675 addOutput(answer); 1676 return (Type) this; 1677 } 1678 1679 1680 /** 1681 * Adds a processor which sets the exchange property 1682 * 1683 * @param name the property name 1684 * @return a expression builder clause to set the property 1685 */ 1686 public ExpressionClause<ProcessorDefinition<Type>> setProperty(String name) { 1687 ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this); 1688 SetPropertyDefinition answer = new SetPropertyDefinition(name, clause); 1689 addOutput(answer); 1690 return clause; 1691 } 1692 1693 /** 1694 * Adds a processor which removes the header on the IN message 1695 * 1696 * @param name the header name 1697 * @return the builder 1698 */ 1699 @SuppressWarnings("unchecked") 1700 public Type removeHeader(String name) { 1701 RemoveHeaderDefinition answer = new RemoveHeaderDefinition(name); 1702 addOutput(answer); 1703 return (Type) this; 1704 } 1705 1706 /** 1707 * Adds a processor which removes the header on the FAULT message 1708 * 1709 * @param name the header name 1710 * @return the builder 1711 */ 1712 public Type removeFaultHeader(String name) { 1713 return process(ProcessorBuilder.removeFaultHeader(name)); 1714 } 1715 1716 /** 1717 * Adds a processor which removes the exchange property 1718 * 1719 * @param name the property name 1720 * @return the builder 1721 */ 1722 @SuppressWarnings("unchecked") 1723 public Type removeProperty(String name) { 1724 RemovePropertyDefinition answer = new RemovePropertyDefinition(name); 1725 addOutput(answer); 1726 return (Type) this; 1727 } 1728 1729 /** 1730 * Converts the IN message body to the specified type 1731 * 1732 * @param type the type to convert to 1733 * @return the builder 1734 */ 1735 @SuppressWarnings("unchecked") 1736 public Type convertBodyTo(Class type) { 1737 addOutput(new ConvertBodyDefinition(type)); 1738 return (Type) this; 1739 } 1740 1741 /** 1742 * Converts the IN message body to the specified type 1743 * 1744 * @param type the type to convert to 1745 * @param charset the charset to use by type converters (not all converters support specifc charset) 1746 * @return the builder 1747 */ 1748 @SuppressWarnings("unchecked") 1749 public Type convertBodyTo(Class type, String charset) { 1750 addOutput(new ConvertBodyDefinition(type, charset)); 1751 return (Type) this; 1752 } 1753 1754 /** 1755 * Sorts the IN message body using the given comparator. 1756 * The IN body mut be convertable to {@link List}. 1757 * 1758 * @param comparator the comparator to use for sorting 1759 * @return the builder 1760 */ 1761 @SuppressWarnings("unchecked") 1762 public Type sortBody(Comparator comparator) { 1763 addOutput(new SortDefinition(body(), comparator)); 1764 return (Type) this; 1765 } 1766 1767 /** 1768 * Sorts the IN message body using a default sorting based on toString representation. 1769 * The IN body mut be convertable to {@link List}. 1770 * 1771 * @return the builder 1772 */ 1773 public Type sortBody() { 1774 return sortBody(null); 1775 } 1776 1777 /** 1778 * Sorts the expression using the given comparator 1779 * 1780 * @param expression the expression, must be convertable to {@link List} 1781 * @param comparator the comparator to use for sorting 1782 * @return the builder 1783 */ 1784 @SuppressWarnings("unchecked") 1785 public Type sort(Expression expression, Comparator comparator) { 1786 addOutput(new SortDefinition(expression, comparator)); 1787 return (Type) this; 1788 } 1789 1790 /** 1791 * Sorts the expression using a default sorting based on toString representation. 1792 * 1793 * @param expression the expression, must be convertable to {@link List} 1794 * @return the builder 1795 */ 1796 public Type sort(Expression expression) { 1797 return sort(expression, null); 1798 } 1799 1800 /** 1801 * Enriches an exchange with additional data obtained from a 1802 * <code>resourceUri</code>. 1803 * 1804 * @param resourceUri URI of resource endpoint for obtaining additional data. 1805 * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. 1806 * @return the builder 1807 * @see org.apache.camel.processor.Enricher 1808 */ 1809 @SuppressWarnings("unchecked") 1810 public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy) { 1811 addOutput(new EnrichDefinition(aggregationStrategy, resourceUri)); 1812 return (Type) this; 1813 } 1814 1815 /** 1816 * Enriches an exchange with additional data obtained from a 1817 * <code>resourceUri</code>. 1818 * 1819 * @param resourceUri URI of resource endpoint for obtaining additional data. 1820 * @return the builder 1821 * @see org.apache.camel.processor.Enricher 1822 */ 1823 @SuppressWarnings("unchecked") 1824 public Type enrich(String resourceUri) { 1825 addOutput(new EnrichDefinition(resourceUri)); 1826 return (Type) this; 1827 } 1828 1829 /** 1830 * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as 1831 * a callback when the {@link org.apache.camel.Exchange} has finished being processed. 1832 * The hook invoke callbacks for either onComplete or onFailure. 1833 * <p/> 1834 * Will by default always trigger when the {@link org.apache.camel.Exchange} is complete 1835 * (either with success or failed). 1836 * <br/> 1837 * You can limit the callback to either onComplete or onFailure but invoking the nested 1838 * builder method. 1839 * <p/> 1840 * For onFailure the caused exception is stored as a property on the {@link org.apache.camel.Exchange} 1841 * with the key {@link org.apache.camel.Exchange#EXCEPTION_CAUGHT}. 1842 * 1843 * @return the builder 1844 */ 1845 @SuppressWarnings("unchecked") 1846 public OnCompletionDefinition onCompletion() { 1847 OnCompletionDefinition answer = new OnCompletionDefinition(); 1848 // we must remove all existing on completion definition (as they are global) 1849 // and thus we are the only one as route scoped should override any global scoped 1850 answer.removeAllOnCompletionDefinition(this); 1851 popBlock(); 1852 addOutput(answer); 1853 pushBlock(answer); 1854 return answer; 1855 } 1856 1857 // DataFormat support 1858 // ------------------------------------------------------------------------- 1859 1860 /** 1861 * <a href="http://camel.apache.org/data-format.html">DataFormat:</a> 1862 * Unmarshals the in body using a {@link DataFormat} expression to define 1863 * the format of the input message and the output will be set on the out message body. 1864 * 1865 * @return the expression to create the {@link DataFormat} 1866 */ 1867 public DataFormatClause<ProcessorDefinition<Type>> unmarshal() { 1868 return new DataFormatClause<ProcessorDefinition<Type>>(this, DataFormatClause.Operation.Unmarshal); 1869 } 1870 1871 /** 1872 * <a href="http://camel.apache.org/data-format.html">DataFormat:</a> 1873 * Unmarshals the in body using the specified {@link DataFormat} 1874 * and sets the output on the out message body. 1875 * 1876 * @param dataFormatType the dataformat 1877 * @return the builder 1878 */ 1879 @SuppressWarnings("unchecked") 1880 public Type unmarshal(DataFormatDefinition dataFormatType) { 1881 addOutput(new UnmarshalDefinition(dataFormatType)); 1882 return (Type) this; 1883 } 1884 1885 /** 1886 * <a href="http://camel.apache.org/data-format.html">DataFormat:</a> 1887 * Unmarshals the in body using the specified {@link DataFormat} 1888 * and sets the output on the out message body. 1889 * 1890 * @param dataFormat the dataformat 1891 * @return the builder 1892 */ 1893 public Type unmarshal(DataFormat dataFormat) { 1894 return unmarshal(new DataFormatDefinition(dataFormat)); 1895 } 1896 1897 /** 1898 * <a href="http://camel.apache.org/data-format.html">DataFormat:</a> 1899 * Unmarshals the in body using the specified {@link DataFormat} 1900 * reference in the {@link org.apache.camel.spi.Registry} and sets 1901 * the output on the out message body. 1902 * 1903 * @param dataTypeRef reference to a {@link DataFormat} to lookup in the registry 1904 * @return the builder 1905 */ 1906 @SuppressWarnings("unchecked") 1907 public Type unmarshal(String dataTypeRef) { 1908 addOutput(new UnmarshalDefinition(dataTypeRef)); 1909 return (Type) this; 1910 } 1911 1912 /** 1913 * <a href="http://camel.apache.org/data-format.html">DataFormat:</a> 1914 * Marshals the in body using a {@link DataFormat} expression to define 1915 * the format of the output which will be added to the out body. 1916 * 1917 * @return the expression to create the {@link DataFormat} 1918 */ 1919 public DataFormatClause<ProcessorDefinition<Type>> marshal() { 1920 return new DataFormatClause<ProcessorDefinition<Type>>(this, DataFormatClause.Operation.Marshal); 1921 } 1922 1923 /** 1924 * <a href="http://camel.apache.org/data-format.html">DataFormat:</a> 1925 * Marshals the in body using the specified {@link DataFormat} 1926 * and sets the output on the out message body. 1927 * 1928 * @param dataFormatType the dataformat 1929 * @return the builder 1930 */ 1931 @SuppressWarnings("unchecked") 1932 public Type marshal(DataFormatDefinition dataFormatType) { 1933 addOutput(new MarshalDefinition(dataFormatType)); 1934 return (Type) this; 1935 } 1936 1937 /** 1938 * <a href="http://camel.apache.org/data-format.html">DataFormat:</a> 1939 * Marshals the in body using the specified {@link DataFormat} 1940 * and sets the output on the out message body. 1941 * 1942 * @param dataFormat the dataformat 1943 * @return the builder 1944 */ 1945 public Type marshal(DataFormat dataFormat) { 1946 return marshal(new DataFormatDefinition(dataFormat)); 1947 } 1948 1949 /** 1950 * <a href="http://camel.apache.org/data-format.html">DataFormat:</a> 1951 * Marshals the in body the specified {@link DataFormat} 1952 * reference in the {@link org.apache.camel.spi.Registry} and sets 1953 * the output on the out message body. 1954 * 1955 * @param dataTypeRef reference to a {@link DataFormat} to lookup in the registry 1956 * @return the builder 1957 */ 1958 @SuppressWarnings("unchecked") 1959 public Type marshal(String dataTypeRef) { 1960 addOutput(new MarshalDefinition(dataTypeRef)); 1961 return (Type) this; 1962 } 1963 1964 // Properties 1965 // ------------------------------------------------------------------------- 1966 @XmlTransient 1967 @SuppressWarnings("unchecked") 1968 public ProcessorDefinition<? extends ProcessorDefinition> getParent() { 1969 return parent; 1970 } 1971 1972 public void setParent(ProcessorDefinition<? extends ProcessorDefinition> parent) { 1973 this.parent = parent; 1974 } 1975 1976 @XmlTransient 1977 public ErrorHandlerBuilder getErrorHandlerBuilder() { 1978 if (errorHandlerBuilder == null) { 1979 errorHandlerBuilder = createErrorHandlerBuilder(); 1980 } 1981 return errorHandlerBuilder; 1982 } 1983 1984 /** 1985 * Sets the error handler to use with processors created by this builder 1986 */ 1987 public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) { 1988 this.errorHandlerBuilder = errorHandlerBuilder; 1989 } 1990 1991 /** 1992 * Sets the error handler if one is not already set 1993 */ 1994 protected void setErrorHandlerBuilderIfNull(ErrorHandlerBuilder errorHandlerBuilder) { 1995 if (this.errorHandlerBuilder == null) { 1996 setErrorHandlerBuilder(errorHandlerBuilder); 1997 } 1998 } 1999 2000 public String getErrorHandlerRef() { 2001 return errorHandlerRef; 2002 } 2003 2004 /** 2005 * Sets the bean ref name of the error handler builder to use on this route 2006 */ 2007 @XmlAttribute(required = false) 2008 public void setErrorHandlerRef(String errorHandlerRef) { 2009 this.errorHandlerRef = errorHandlerRef; 2010 setErrorHandlerBuilder(new ErrorHandlerBuilderRef(errorHandlerRef)); 2011 } 2012 2013 @XmlTransient 2014 public NodeFactory getNodeFactory() { 2015 if (nodeFactory == null) { 2016 nodeFactory = new NodeFactory(); 2017 } 2018 return nodeFactory; 2019 } 2020 2021 public void setNodeFactory(NodeFactory nodeFactory) { 2022 this.nodeFactory = nodeFactory; 2023 } 2024 2025 @XmlTransient 2026 public List<InterceptStrategy> getInterceptStrategies() { 2027 return interceptStrategies; 2028 } 2029 2030 public void addInterceptStrategy(InterceptStrategy strategy) { 2031 this.interceptStrategies.add(strategy); 2032 } 2033 2034 /** 2035 * Returns a label to describe this node such as the expression if some kind of expression node 2036 */ 2037 public String getLabel() { 2038 return ""; 2039 } 2040 2041 2042 }