001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import org.apache.camel.Endpoint; 020 import org.apache.camel.Exchange; 021 import org.apache.camel.Expression; 022 import org.apache.camel.Predicate; 023 import org.apache.camel.Processor; 024 import org.apache.camel.Route; 025 import org.apache.camel.RuntimeCamelException; 026 import org.apache.camel.builder.Builder; 027 import org.apache.camel.builder.DeadLetterChannelBuilder; 028 import org.apache.camel.builder.ErrorHandlerBuilder; 029 import org.apache.camel.builder.NoErrorHandlerBuilder; 030 import org.apache.camel.builder.ProcessorBuilder; 031 import org.apache.camel.converter.ObjectConverter; 032 import org.apache.camel.impl.RouteContext; 033 import org.apache.camel.model.language.ExpressionType; 034 import org.apache.camel.model.language.LanguageExpression; 035 import org.apache.camel.processor.DelegateProcessor; 036 import org.apache.camel.processor.MulticastProcessor; 037 import org.apache.camel.processor.Pipeline; 038 import org.apache.camel.processor.RecipientList; 039 import org.apache.camel.processor.aggregate.AggregationStrategy; 040 import org.apache.camel.processor.idempotent.IdempotentConsumer; 041 import org.apache.camel.processor.idempotent.MessageIdRepository; 042 import org.apache.camel.spi.Policy; 043 import org.apache.commons.logging.Log; 044 import org.apache.commons.logging.LogFactory; 045 046 import javax.xml.bind.annotation.XmlAttribute; 047 import javax.xml.bind.annotation.XmlTransient; 048 import java.util.ArrayList; 049 import java.util.Collection; 050 import java.util.Collections; 051 import java.util.List; 052 053 /** 054 * @version $Revision: 1.1 $ 055 */ 056 public abstract class ProcessorType { 057 public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE"; 058 private ErrorHandlerBuilder errorHandlerBuilder; 059 private Boolean inheritErrorHandlerFlag = Boolean.TRUE; // TODO not sure how 060 private DelegateProcessor lastInterceptor; 061 // else to use an 062 // optional 063 // attribute in 064 // JAXB2 065 066 public abstract List<ProcessorType> getOutputs(); 067 068 public abstract List<InterceptorType> getInterceptors(); 069 070 public Processor createProcessor(RouteContext routeContext) throws Exception { 071 throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName()); 072 } 073 074 public Processor createOutputsProcessor(RouteContext routeContext) throws Exception { 075 Collection<ProcessorType> outputs = getOutputs(); 076 return createOutputsProcessor(routeContext, outputs); 077 } 078 079 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { 080 Processor processor = makeProcessor(routeContext); 081 routeContext.addEventDrivenProcessor(processor); 082 } 083 084 /** 085 * Wraps the child processor in whatever necessary interceptors and error 086 * handlers 087 */ 088 public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception { 089 processor = wrapProcessorInInterceptors(routeContext, processor); 090 return wrapInErrorHandler(processor); 091 } 092 093 // Fluent API 094 // ------------------------------------------------------------------------- 095 096 /** 097 * Sends the exchange to the given endpoint URI 098 */ 099 public ProcessorType to(String uri) { 100 addOutput(new ToType(uri)); 101 return this; 102 } 103 104 /** 105 * Sends the exchange to the given endpoint 106 */ 107 public ProcessorType to(Endpoint endpoint) { 108 addOutput(new ToType(endpoint)); 109 return this; 110 } 111 112 /** 113 * Sends the exchange to a list of endpoints using the 114 * {@link MulticastProcessor} pattern 115 */ 116 public ProcessorType to(String... uris) { 117 for (String uri : uris) { 118 addOutput(new ToType(uri)); 119 } 120 return this; 121 } 122 123 /** 124 * Sends the exchange to a list of endpoints using the 125 * {@link MulticastProcessor} pattern 126 */ 127 public ProcessorType to(Endpoint... endpoints) { 128 for (Endpoint endpoint : endpoints) { 129 addOutput(new ToType(endpoint)); 130 } 131 return this; 132 } 133 134 /** 135 * Sends the exchange to a list of endpoint using the 136 * {@link MulticastProcessor} pattern 137 */ 138 public ProcessorType to(Collection<Endpoint> endpoints) { 139 for (Endpoint endpoint : endpoints) { 140 addOutput(new ToType(endpoint)); 141 } 142 return this; 143 } 144 145 /** 146 * Multicasts messages to all its child outputs; so that each processor and 147 * destination gets a copy of the original message to avoid the processors 148 * interfering with each other. 149 */ 150 public MulticastType multicast() { 151 MulticastType answer = new MulticastType(); 152 addOutput(answer); 153 return answer; 154 } 155 156 /** 157 * Creates a {@link Pipeline} of the list of endpoints so that the message 158 * will get processed by each endpoint in turn and for request/response the 159 * output of one endpoint will be the input of the next endpoint 160 */ 161 public ProcessorType pipeline(String... uris) { 162 // TODO pipeline v mulicast 163 return to(uris); 164 } 165 166 /** 167 * Creates a {@link Pipeline} of the list of endpoints so that the message 168 * will get processed by each endpoint in turn and for request/response the 169 * output of one endpoint will be the input of the next endpoint 170 */ 171 public ProcessorType pipeline(Endpoint... endpoints) { 172 // TODO pipeline v mulicast 173 return to(endpoints); 174 } 175 176 /** 177 * Creates a {@link Pipeline} of the list of endpoints so that the message 178 * will get processed by each endpoint in turn and for request/response the 179 * output of one endpoint will be the input of the next endpoint 180 */ 181 public ProcessorType pipeline(Collection<Endpoint> endpoints) { 182 // TODO pipeline v mulicast 183 return to(endpoints); 184 } 185 186 /** 187 * Creates an {@link IdempotentConsumer} to avoid duplicate messages 188 */ 189 public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression, 190 MessageIdRepository messageIdRepository) { 191 IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository); 192 addOutput(answer); 193 return answer; 194 } 195 196 /** 197 * Creates a predicate which is applied and only if it is true then the 198 * exchange is forwarded to the destination 199 * 200 * @return the builder for a predicate 201 */ 202 public FilterType filter(Predicate predicate) { 203 FilterType filter = new FilterType(predicate); 204 addOutput(filter); 205 return filter; 206 } 207 208 /** 209 * Creates a choice of one or more predicates with an otherwise clause 210 * 211 * @return the builder for a choice expression 212 */ 213 public ChoiceType choice() { 214 ChoiceType answer = new ChoiceType(); 215 addOutput(answer); 216 return answer; 217 } 218 219 /** 220 * Creates a try/catch block 221 * 222 * @return the builder for a tryBlock expression 223 */ 224 public TryType tryBlock() { 225 TryType answer = new TryType(); 226 addOutput(answer); 227 return answer; 228 } 229 230 /** 231 * Creates a dynamic <a 232 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient 233 * List</a> pattern. 234 * 235 * @param receipients is the builder of the expression used in the 236 * {@link RecipientList} to decide the destinations 237 */ 238 public ProcessorType recipientList(Expression receipients) { 239 RecipientListType answer = new RecipientListType(receipients); 240 addOutput(answer); 241 return this; 242 } 243 244 /** 245 * A builder for the <a 246 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 247 * pattern where an expression is evaluated to iterate through each of the 248 * parts of a message and then each part is then send to some endpoint. 249 * 250 * @param receipients the expression on which to split 251 * @return the builder 252 */ 253 public SplitterType splitter(Expression receipients) { 254 SplitterType answer = new SplitterType(receipients); 255 addOutput(answer); 256 return answer; 257 } 258 259 /** 260 * A builder for the <a 261 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 262 * pattern where an expression is evaluated to be able to compare the 263 * message exchanges to reorder them. e.g. you may wish to sort by some 264 * header 265 * 266 * @param expression the expression on which to compare messages in order 267 * @return the builder 268 */ 269 public ResequencerType resequencer(Expression<Exchange> expression) { 270 return resequencer(Collections.<Expression> singletonList(expression)); 271 } 272 273 /** 274 * A builder for the <a 275 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 276 * pattern where a list of expressions are evaluated to be able to compare 277 * the message exchanges to reorder them. e.g. you may wish to sort by some 278 * headers 279 * 280 * @param expressions the expressions on which to compare messages in order 281 * @return the builder 282 */ 283 public ResequencerType resequencer(List<Expression> expressions) { 284 ResequencerType answer = new ResequencerType(expressions); 285 addOutput(answer); 286 return answer; 287 } 288 289 /** 290 * A builder for the <a 291 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 292 * pattern where a list of expressions are evaluated to be able to compare 293 * the message exchanges to reorder them. e.g. you may wish to sort by some 294 * headers 295 * 296 * @param expressions the expressions on which to compare messages in order 297 * @return the builder 298 */ 299 public ResequencerType resequencer(Expression... expressions) { 300 List<Expression> list = new ArrayList<Expression>(); 301 for (Expression expression : expressions) { 302 list.add(expression); 303 } 304 return resequencer(list); 305 } 306 307 /** 308 * A builder for the <a 309 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 310 * pattern where a batch of messages are processed (up to a maximum amount 311 * or until some timeout is reached) and messages for the same correlation 312 * key are combined together using some kind of 313 * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges 314 * into a smaller number of exchanges. <p/> A good example of this is stock 315 * market data; you may be receiving 30,000 messages/second and you may want 316 * to throttle it right down so that multiple messages for the same stock 317 * are combined (or just the latest message is used and older prices are 318 * discarded). Another idea is to combine line item messages together into a 319 * single invoice message. 320 * 321 * @param correlationExpression the expression used to calculate the 322 * correlation key. For a JMS message this could be the 323 * expression <code>header("JMSDestination")</code> or 324 * <code>header("JMSCorrelationID")</code> 325 */ 326 public AggregatorType aggregator(Expression correlationExpression) { 327 AggregatorType answer = new AggregatorType(correlationExpression); 328 addOutput(answer); 329 return answer; 330 } 331 332 /** 333 * A builder for the <a 334 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 335 * pattern where a batch of messages are processed (up to a maximum amount 336 * or until some timeout is reached) and messages for the same correlation 337 * key are combined together using some kind of 338 * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges 339 * into a smaller number of exchanges. <p/> A good example of this is stock 340 * market data; you may be receiving 30,000 messages/second and you may want 341 * to throttle it right down so that multiple messages for the same stock 342 * are combined (or just the latest message is used and older prices are 343 * discarded). Another idea is to combine line item messages together into a 344 * single invoice message. 345 * 346 * @param correlationExpression the expression used to calculate the 347 * correlation key. For a JMS message this could be the 348 * expression <code>header("JMSDestination")</code> or 349 * <code>header("JMSCorrelationID")</code> 350 */ 351 public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 352 AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy); 353 addOutput(answer); 354 return answer; 355 } 356 357 /** 358 * A builder for the <a 359 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 360 * where an expression is used to calculate the time which the message will 361 * be dispatched on 362 * 363 * @param processAtExpression an expression to calculate the time at which 364 * the messages should be processed 365 * @return the builder 366 */ 367 public DelayerType delayer(Expression<Exchange> processAtExpression) { 368 return delayer(processAtExpression, 0L); 369 } 370 371 /** 372 * A builder for the <a 373 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 374 * where an expression is used to calculate the time which the message will 375 * be dispatched on 376 * 377 * @param processAtExpression an expression to calculate the time at which 378 * the messages should be processed 379 * @param delay the delay in milliseconds which is added to the 380 * processAtExpression to determine the time the message 381 * should be processed 382 * @return the builder 383 */ 384 public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) { 385 DelayerType answer = new DelayerType(processAtExpression, delay); 386 addOutput(answer); 387 return answer; 388 } 389 390 /** 391 * A builder for the <a 392 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 393 * where a fixed amount of milliseconds are used to delay processing of a 394 * message exchange 395 * 396 * @param delay the default delay in milliseconds 397 * @return the builder 398 */ 399 public DelayerType delayer(long delay) { 400 return delayer(null, delay); 401 } 402 403 /** 404 * A builder for the <a 405 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 406 * where an expression is used to calculate the time which the message will 407 * be dispatched on 408 * 409 * @return the builder 410 */ 411 public ThrottlerType throttler(long maximumRequestCount) { 412 ThrottlerType answer = new ThrottlerType(maximumRequestCount); 413 addOutput(answer); 414 return answer; 415 } 416 417 public ProcessorType interceptor(String ref) { 418 getInterceptors().add(new InterceptorRef(ref)); 419 return this; 420 } 421 422 public InterceptType intercept() { 423 InterceptType answer = new InterceptType(); 424 addOutput(answer); 425 return answer; 426 } 427 428 public ProcessorType proceed() { 429 addOutput(new ProceedType()); 430 return this; 431 } 432 433 public ExceptionType exception(Class exceptionType) { 434 ExceptionType answer = new ExceptionType(exceptionType); 435 addOutput(answer); 436 return answer; 437 } 438 439 /** 440 * Apply an interceptor route if the predicate is true 441 */ 442 public OtherwiseType intercept(Predicate predicate) { 443 InterceptType answer = new InterceptType(); 444 addOutput(answer); 445 return answer.when(predicate); 446 } 447 448 public ProcessorType interceptors(String... refs) { 449 for (String ref : refs) { 450 interceptor(ref); 451 } 452 return this; 453 } 454 455 public FilterType filter(ExpressionType expression) { 456 FilterType filter = new FilterType(); 457 filter.setExpression(expression); 458 addOutput(filter); 459 return filter; 460 } 461 462 public FilterType filter(String language, String expression) { 463 return filter(new LanguageExpression(language, expression)); 464 } 465 466 /** 467 * Trace logs the exchange before it goes to the next processing step using 468 * the {@link #DEFAULT_TRACE_CATEGORY} logging category. 469 * 470 * @return 471 */ 472 public ProcessorType trace() { 473 return trace(DEFAULT_TRACE_CATEGORY); 474 } 475 476 /** 477 * Trace logs the exchange before it goes to the next processing step using 478 * the specified logging category. 479 * 480 * @param category the logging category trace messages will sent to. 481 * @return 482 */ 483 public ProcessorType trace(String category) { 484 final Log log = LogFactory.getLog(category); 485 return intercept(new DelegateProcessor() { 486 @Override 487 public void process(Exchange exchange) throws Exception { 488 log.trace(exchange); 489 processNext(exchange); 490 } 491 }); 492 } 493 494 public PolicyRef policies() { 495 PolicyRef answer = new PolicyRef(); 496 addOutput(answer); 497 return answer; 498 } 499 500 public PolicyRef policy(Policy policy) { 501 PolicyRef answer = new PolicyRef(policy); 502 addOutput(answer); 503 return answer; 504 } 505 506 public ProcessorType intercept(DelegateProcessor interceptor) { 507 getInterceptors().add(new InterceptorRef(interceptor)); 508 lastInterceptor = interceptor; 509 return this; 510 } 511 512 /** 513 * Installs the given error handler builder 514 * 515 * @param errorHandlerBuilder the error handler to be used by default for 516 * all child routes 517 * @return the current builder with the error handler configured 518 */ 519 public ProcessorType errorHandler(ErrorHandlerBuilder errorHandlerBuilder) { 520 setErrorHandlerBuilder(errorHandlerBuilder); 521 return this; 522 } 523 524 /** 525 * Configures whether or not the error handler is inherited by every 526 * processing node (or just the top most one) 527 * 528 * @param condition the falg as to whether error handlers should be 529 * inherited or not 530 * @return the current builder 531 */ 532 public ProcessorType inheritErrorHandler(boolean condition) { 533 setInheritErrorHandlerFlag(condition); 534 return this; 535 } 536 537 // Transformers 538 // ------------------------------------------------------------------------- 539 540 /** 541 * Adds the custom processor to this destination which could be a final 542 * destination, or could be a transformation in a pipeline 543 */ 544 public ProcessorType process(Processor processor) { 545 ProcessorRef answer = new ProcessorRef(processor); 546 addOutput(answer); 547 return this; 548 } 549 550 /** 551 * Adds a bean which is invoked which could be a final destination, or could 552 * be a transformation in a pipeline 553 */ 554 public ProcessorType beanRef(String ref) { 555 BeanRef answer = new BeanRef(ref); 556 addOutput(answer); 557 return this; 558 } 559 560 /** 561 * Adds a bean and method which is invoked which could be a final 562 * destination, or could be a transformation in a pipeline 563 */ 564 public ProcessorType beanRef(String ref, String method) { 565 BeanRef answer = new BeanRef(ref, method); 566 addOutput(answer); 567 return this; 568 } 569 570 /** 571 * Adds a processor which sets the body on the IN message 572 */ 573 public ProcessorType setBody(Expression expression) { 574 return process(ProcessorBuilder.setBody(expression)); 575 } 576 577 /** 578 * Adds a processor which sets the body on the OUT message 579 */ 580 public ProcessorType setOutBody(Expression expression) { 581 return process(ProcessorBuilder.setOutBody(expression)); 582 } 583 584 /** 585 * Adds a processor which sets the header on the IN message 586 */ 587 public ProcessorType setHeader(String name, Expression expression) { 588 return process(ProcessorBuilder.setHeader(name, expression)); 589 } 590 591 /** 592 * Adds a processor which sets the header on the OUT message 593 */ 594 public ProcessorType setOutHeader(String name, Expression expression) { 595 return process(ProcessorBuilder.setOutHeader(name, expression)); 596 } 597 598 /** 599 * Adds a processor which sets the exchange property 600 */ 601 public ProcessorType setProperty(String name, Expression expression) { 602 return process(ProcessorBuilder.setProperty(name, expression)); 603 } 604 605 /** 606 * Converts the IN message body to the specified type 607 */ 608 public ProcessorType convertBodyTo(Class type) { 609 return process(ProcessorBuilder.setBody(Builder.body().convertTo(type))); 610 } 611 612 /** 613 * Converts the OUT message body to the specified type 614 */ 615 public ProcessorType convertOutBodyTo(Class type) { 616 return process(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type))); 617 } 618 619 // Properties 620 // ------------------------------------------------------------------------- 621 622 @XmlTransient 623 public ErrorHandlerBuilder getErrorHandlerBuilder() { 624 if (errorHandlerBuilder == null) { 625 errorHandlerBuilder = createErrorHandlerBuilder(); 626 } 627 return errorHandlerBuilder; 628 } 629 630 /** 631 * Sets the error handler to use with processors created by this builder 632 */ 633 public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) { 634 this.errorHandlerBuilder = errorHandlerBuilder; 635 } 636 637 @XmlTransient 638 public boolean isInheritErrorHandler() { 639 return ObjectConverter.toBoolean(getInheritErrorHandlerFlag()); 640 } 641 642 @XmlAttribute(name = "inheritErrorHandler", required = false) 643 public Boolean getInheritErrorHandlerFlag() { 644 return inheritErrorHandlerFlag; 645 } 646 647 public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) { 648 this.inheritErrorHandlerFlag = inheritErrorHandlerFlag; 649 } 650 651 // Implementation methods 652 // ------------------------------------------------------------------------- 653 654 /** 655 * Creates the processor and wraps it in any necessary interceptors and 656 * error handlers 657 */ 658 protected Processor makeProcessor(RouteContext routeContext) throws Exception { 659 Processor processor = createProcessor(routeContext); 660 return wrapProcessor(routeContext, processor); 661 } 662 663 /** 664 * A strategy method which allows derived classes to wrap the child 665 * processor in some kind of interceptor 666 * 667 * @param routeContext 668 * @param target the processor which can be wrapped 669 * @return the original processor or a new wrapped interceptor 670 */ 671 protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception { 672 // The target is required. 673 if (target == null) { 674 throw new RuntimeCamelException("target provided."); 675 } 676 677 // Interceptors are optional 678 DelegateProcessor first = null; 679 DelegateProcessor last = null; 680 List<InterceptorType> interceptors = new ArrayList<InterceptorType>(routeContext.getRoute() 681 .getInterceptors()); 682 List<InterceptorType> list = getInterceptors(); 683 for (InterceptorType interceptorType : list) { 684 if (!interceptors.contains(interceptorType)) { 685 interceptors.add(interceptorType); 686 } 687 } 688 for (InterceptorType interceptorRef : interceptors) { 689 DelegateProcessor p = interceptorRef.createInterceptor(routeContext); 690 if (first == null) { 691 first = p; 692 } 693 if (last != null) { 694 last.setProcessor(p); 695 } 696 last = p; 697 } 698 699 if (last != null) { 700 last.setProcessor(target); 701 } 702 return first == null ? target : first; 703 } 704 705 /** 706 * A strategy method to allow newly created processors to be wrapped in an 707 * error handler. 708 */ 709 protected Processor wrapInErrorHandler(Processor processor) throws Exception { 710 return getErrorHandlerBuilder().createErrorHandler(processor); 711 } 712 713 protected ErrorHandlerBuilder createErrorHandlerBuilder() { 714 if (isInheritErrorHandler()) { 715 return new DeadLetterChannelBuilder(); 716 } else { 717 return new NoErrorHandlerBuilder(); 718 } 719 } 720 721 protected void configureChild(ProcessorType output) { 722 } 723 724 protected void addOutput(ProcessorType processorType) { 725 configureChild(processorType); 726 getOutputs().add(processorType); 727 } 728 729 /** 730 * Creates a new instance of some kind of composite processor which defaults 731 * to using a {@link Pipeline} but derived classes could change the 732 * behaviour 733 */ 734 protected Processor createCompositeProcessor(List<Processor> list) { 735 // return new MulticastProcessor(list); 736 return new Pipeline(list); 737 } 738 739 protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType> outputs) 740 throws Exception { 741 List<Processor> list = new ArrayList<Processor>(); 742 for (ProcessorType output : outputs) { 743 Processor processor = output.createProcessor(routeContext); 744 list.add(processor); 745 } 746 Processor processor = null; 747 if (!list.isEmpty()) { 748 if (list.size() == 1) { 749 processor = list.get(0); 750 } else { 751 processor = createCompositeProcessor(list); 752 } 753 } 754 return processor; 755 } 756 }