Coverage Report - org.apache.camel.model.ProcessorType
 
Classes in this File Line Coverage Branch Coverage Complexity
ProcessorType
68% 
83% 
0
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.model;
 18  
 
 19  
 import org.apache.camel.Endpoint;
 20  
 import org.apache.camel.Exchange;
 21  
 import org.apache.camel.Expression;
 22  
 import org.apache.camel.Predicate;
 23  
 import org.apache.camel.Processor;
 24  
 import org.apache.camel.Route;
 25  
 import org.apache.camel.RuntimeCamelException;
 26  
 import org.apache.camel.builder.Builder;
 27  
 import org.apache.camel.builder.DeadLetterChannelBuilder;
 28  
 import org.apache.camel.builder.ErrorHandlerBuilder;
 29  
 import org.apache.camel.builder.NoErrorHandlerBuilder;
 30  
 import org.apache.camel.builder.ProcessorBuilder;
 31  
 import org.apache.camel.converter.ObjectConverter;
 32  
 import org.apache.camel.impl.RouteContext;
 33  
 import org.apache.camel.model.language.ExpressionType;
 34  
 import org.apache.camel.model.language.LanguageExpression;
 35  
 import org.apache.camel.processor.DelegateProcessor;
 36  
 import org.apache.camel.processor.MulticastProcessor;
 37  
 import org.apache.camel.processor.Pipeline;
 38  
 import org.apache.camel.processor.RecipientList;
 39  
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 40  
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
 41  
 import org.apache.camel.processor.idempotent.MessageIdRepository;
 42  
 import org.apache.camel.spi.Policy;
 43  
 import org.apache.commons.logging.Log;
 44  
 import org.apache.commons.logging.LogFactory;
 45  
 
 46  
 import javax.xml.bind.annotation.XmlAttribute;
 47  
 import javax.xml.bind.annotation.XmlTransient;
 48  
 import java.util.ArrayList;
 49  
 import java.util.Collection;
 50  
 import java.util.Collections;
 51  
 import java.util.List;
 52  
 
 53  
 /**
 54  
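  * Abstract base class for the nodes of a route model. It offers a fluent API
  * for adding child outputs and interceptors, and turns the model into runtime
  * {@link Processor} instances wrapped in interceptors and an error handler.
  *
  * @version $Revision: 1.1 $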
 55  
  */
 56  1047
 public abstract class ProcessorType {
 57  
     /** Logging category used by {@link #trace()} when no category is given. */
     public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
 58  
     /** Error handler builder; created on demand by {@link #getErrorHandlerBuilder()} when unset. */
     private ErrorHandlerBuilder errorHandlerBuilder;
 59  1047
     // TODO not sure how else to use an optional attribute in JAXB2
     private Boolean inheritErrorHandlerFlag = Boolean.TRUE;
 60  
     /** The interceptor most recently passed to {@link #intercept(DelegateProcessor)}. */
     private DelegateProcessor lastInterceptor;
 61  
 
 62  
 
 63  
 
 64  
 
 65  
 
 66  
     /**
      * Returns the child outputs of this node. {@link #addOutput(ProcessorType)}
      * appends to the returned list, so implementations need to return a list
      * that accepts additions.
      */
     public abstract List<ProcessorType> getOutputs();
 67  
 
 68  
     /**
      * Returns the interceptors configured on this node. The fluent
      * <code>interceptor(...)</code> and <code>intercept(...)</code> methods append
      * to the returned list, so implementations need to return a list that
      * accepts additions.
      */
     public abstract List<InterceptorType> getInterceptors();
 69  
 
 70  
     public Processor createProcessor(RouteContext routeContext) throws Exception {
 71  0
         throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
 72  
     }
 73  
 
 74  
     public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
 75  201
         Collection<ProcessorType> outputs = getOutputs();
 76  201
         return createOutputsProcessor(routeContext, outputs);
 77  
     }
 78  
 
 79  
     public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
 80  297
         Processor processor = makeProcessor(routeContext);
 81  294
         routeContext.addEventDrivenProcessor(processor);
 82  294
     }
 83  
 
 84  
     /**
 85  
      * Wraps the child processor in whatever necessary interceptors and error
 86  
      * handlers
 87  
      */
 88  
     public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
 89  294
         processor = wrapProcessorInInterceptors(routeContext, processor);
 90  294
         return wrapInErrorHandler(processor);
 91  
     }
 92  
 
 93  
     // Fluent API
 94  
     // -------------------------------------------------------------------------
 95  
 
 96  
     /**
 97  
      * Sends the exchange to the given endpoint URI
 98  
      */
 99  
     public ProcessorType to(String uri) {
 100  291
         addOutput(new ToType(uri));
 101  291
         return this;
 102  
     }
 103  
 
 104  
     /**
 105  
      * Sends the exchange to the given endpoint
 106  
      */
 107  
     public ProcessorType to(Endpoint endpoint) {
 108  0
         addOutput(new ToType(endpoint));
 109  0
         return this;
 110  
     }
 111  
 
 112  
     /**
 113  
      * Sends the exchange to a list of endpoints using the
 114  
      * {@link MulticastProcessor} pattern
 115  
      */
 116  
     public ProcessorType to(String... uris) {
 117  36
         for (String uri : uris) {
 118  27
             addOutput(new ToType(uri));
 119  
         }
 120  9
         return this;
 121  
     }
 122  
 
 123  
     /**
 124  
      * Sends the exchange to a list of endpoints using the
 125  
      * {@link MulticastProcessor} pattern
 126  
      */
 127  
     public ProcessorType to(Endpoint... endpoints) {
 128  0
         for (Endpoint endpoint : endpoints) {
 129  0
             addOutput(new ToType(endpoint));
 130  
         }
 131  0
         return this;
 132  
     }
 133  
 
 134  
     /**
 135  
      * Sends the exchange to a list of endpoint using the
 136  
      * {@link MulticastProcessor} pattern
 137  
      */
 138  
     public ProcessorType to(Collection<Endpoint> endpoints) {
 139  0
         for (Endpoint endpoint : endpoints) {
 140  0
             addOutput(new ToType(endpoint));
 141  0
         }
 142  0
         return this;
 143  
     }
 144  
 
 145  
     /**
 146  
      * Multicasts messages to all its child outputs; so that each processor and
 147  
      * destination gets a copy of the original message to avoid the processors
 148  
      * interfering with each other.
 149  
      */
 150  
     public MulticastType multicast() {
 151  3
         MulticastType answer = new MulticastType();
 152  3
         addOutput(answer);
 153  3
         return answer;
 154  
     }
 155  
 
 156  
     /**
 157  
      * Creates a {@link Pipeline} of the list of endpoints so that the message
 158  
      * will get processed by each endpoint in turn and for request/response the
 159  
      * output of one endpoint will be the input of the next endpoint
 160  
      */
 161  
     public ProcessorType pipeline(String... uris) {
 162  
         // TODO pipeline v mulicast
 163  3
         return to(uris);
 164  
     }
 165  
 
 166  
     /**
 167  
      * Creates a {@link Pipeline} of the list of endpoints so that the message
 168  
      * will get processed by each endpoint in turn and for request/response the
 169  
      * output of one endpoint will be the input of the next endpoint
 170  
      */
 171  
     public ProcessorType pipeline(Endpoint... endpoints) {
 172  
         // TODO pipeline v mulicast
 173  0
         return to(endpoints);
 174  
     }
 175  
 
 176  
     /**
 177  
      * Creates a {@link Pipeline} of the list of endpoints so that the message
 178  
      * will get processed by each endpoint in turn and for request/response the
 179  
      * output of one endpoint will be the input of the next endpoint
 180  
      */
 181  
     public ProcessorType pipeline(Collection<Endpoint> endpoints) {
 182  
         // TODO pipeline v mulicast
 183  0
         return to(endpoints);
 184  
     }
 185  
 
 186  
     /**
 187  
      * Creates an {@link IdempotentConsumer} to avoid duplicate messages
 188  
      */
 189  
     public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression,
 190  
                                                      MessageIdRepository messageIdRepository) {
 191  6
         IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository);
 192  6
         addOutput(answer);
 193  6
         return answer;
 194  
     }
 195  
 
 196  
     /**
 197  
      * Creates a predicate which is applied and only if it is true then the
 198  
      * exchange is forwarded to the destination
 199  
      * 
 200  
      * @return the builder for a predicate
 201  
      */
 202  
     public FilterType filter(Predicate predicate) {
 203  45
         FilterType filter = new FilterType(predicate);
 204  45
         addOutput(filter);
 205  45
         return filter;
 206  
     }
 207  
 
 208  
     /**
 209  
      * Creates a choice of one or more predicates with an otherwise clause
 210  
      * 
 211  
      * @return the builder for a choice expression
 212  
      */
 213  
     public ChoiceType choice() {
 214  33
         ChoiceType answer = new ChoiceType();
 215  33
         addOutput(answer);
 216  33
         return answer;
 217  
     }
 218  
 
 219  
     /**
 220  
      * Creates a try/catch block
 221  
      * 
 222  
      * @return the builder for a tryBlock expression
 223  
      */
 224  
     public TryType tryBlock() {
 225  6
         TryType answer = new TryType();
 226  6
         addOutput(answer);
 227  6
         return answer;
 228  
     }
 229  
 
 230  
     /**
 231  
      * Creates a dynamic <a
 232  
      * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
 233  
      * List</a> pattern.
 234  
      * 
 235  
      * @param receipients is the builder of the expression used in the
 236  
      *                {@link RecipientList} to decide the destinations
 237  
      */
 238  
     public ProcessorType recipientList(Expression receipients) {
 239  6
         RecipientListType answer = new RecipientListType(receipients);
 240  6
         addOutput(answer);
 241  6
         return this;
 242  
     }
 243  
 
 244  
     /**
 245  
      * A builder for the <a
 246  
      * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
 247  
      * pattern where an expression is evaluated to iterate through each of the
 248  
      * parts of a message and then each part is then send to some endpoint.
 249  
      * 
 250  
      * @param receipients the expression on which to split
 251  
      * @return the builder
 252  
      */
 253  
     public SplitterType splitter(Expression receipients) {
 254  6
         SplitterType answer = new SplitterType(receipients);
 255  6
         addOutput(answer);
 256  6
         return answer;
 257  
     }
 258  
 
 259  
     /**
 260  
      * A builder for the <a
 261  
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
 262  
      * pattern where an expression is evaluated to be able to compare the
 263  
      * message exchanges to reorder them. e.g. you may wish to sort by some
 264  
      * header
 265  
      * 
 266  
      * @param expression the expression on which to compare messages in order
 267  
      * @return the builder
 268  
      */
 269  
     public ResequencerType resequencer(Expression<Exchange> expression) {
 270  3
         return resequencer(Collections.<Expression> singletonList(expression));
 271  
     }
 272  
 
 273  
     /**
 274  
      * A builder for the <a
 275  
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
 276  
      * pattern where a list of expressions are evaluated to be able to compare
 277  
      * the message exchanges to reorder them. e.g. you may wish to sort by some
 278  
      * headers
 279  
      * 
 280  
      * @param expressions the expressions on which to compare messages in order
 281  
      * @return the builder
 282  
      */
 283  
     public ResequencerType resequencer(List<Expression> expressions) {
 284  3
         ResequencerType answer = new ResequencerType(expressions);
 285  3
         addOutput(answer);
 286  3
         return answer;
 287  
     }
 288  
 
 289  
     /**
 290  
      * A builder for the <a
 291  
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
 292  
      * pattern where a list of expressions are evaluated to be able to compare
 293  
      * the message exchanges to reorder them. e.g. you may wish to sort by some
 294  
      * headers
 295  
      * 
 296  
      * @param expressions the expressions on which to compare messages in order
 297  
      * @return the builder
 298  
      */
 299  
     public ResequencerType resequencer(Expression... expressions) {
 300  0
         List<Expression> list = new ArrayList<Expression>();
 301  0
         for (Expression expression : expressions) {
 302  0
             list.add(expression);
 303  
         }
 304  0
         return resequencer(list);
 305  
     }
 306  
 
 307  
     /**
 308  
      * A builder for the <a
 309  
      * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
 310  
      * pattern where a batch of messages are processed (up to a maximum amount
 311  
      * or until some timeout is reached) and messages for the same correlation
 312  
      * key are combined together using some kind of
 313  
      * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
 314  
      * into a smaller number of exchanges. <p/> A good example of this is stock
 315  
      * market data; you may be receiving 30,000 messages/second and you may want
 316  
      * to throttle it right down so that multiple messages for the same stock
 317  
      * are combined (or just the latest message is used and older prices are
 318  
      * discarded). Another idea is to combine line item messages together into a
 319  
      * single invoice message.
 320  
      * 
 321  
      * @param correlationExpression the expression used to calculate the
 322  
      *                correlation key. For a JMS message this could be the
 323  
      *                expression <code>header("JMSDestination")</code> or
 324  
      *                <code>header("JMSCorrelationID")</code>
 325  
      */
 326  
     public AggregatorType aggregator(Expression correlationExpression) {
 327  3
         AggregatorType answer = new AggregatorType(correlationExpression);
 328  3
         addOutput(answer);
 329  3
         return answer;
 330  
     }
 331  
 
 332  
     /**
 333  
      * A builder for the <a
 334  
      * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
 335  
      * pattern where a batch of messages are processed (up to a maximum amount
 336  
      * or until some timeout is reached) and messages for the same correlation
 337  
      * key are combined together using some kind of
 338  
      * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
 339  
      * into a smaller number of exchanges. <p/> A good example of this is stock
 340  
      * market data; you may be receiving 30,000 messages/second and you may want
 341  
      * to throttle it right down so that multiple messages for the same stock
 342  
      * are combined (or just the latest message is used and older prices are
 343  
      * discarded). Another idea is to combine line item messages together into a
 344  
      * single invoice message.
 345  
      * 
 346  
      * @param correlationExpression the expression used to calculate the
 347  
      *                correlation key. For a JMS message this could be the
 348  
      *                expression <code>header("JMSDestination")</code> or
 349  
      *                <code>header("JMSCorrelationID")</code>
 350  
      */
 351  
     public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
 352  0
         AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
 353  0
         addOutput(answer);
 354  0
         return answer;
 355  
     }
 356  
 
 357  
     /**
 358  
      * A builder for the <a
 359  
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
 360  
      * where an expression is used to calculate the time which the message will
 361  
      * be dispatched on
 362  
      * 
 363  
      * @param processAtExpression an expression to calculate the time at which
 364  
      *                the messages should be processed
 365  
      * @return the builder
 366  
      */
 367  
     public DelayerType delayer(Expression<Exchange> processAtExpression) {
 368  0
         return delayer(processAtExpression, 0L);
 369  
     }
 370  
 
 371  
     /**
 372  
      * A builder for the <a
 373  
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
 374  
      * where an expression is used to calculate the time which the message will
 375  
      * be dispatched on
 376  
      * 
 377  
      * @param processAtExpression an expression to calculate the time at which
 378  
      *                the messages should be processed
 379  
      * @param delay the delay in milliseconds which is added to the
 380  
      *                processAtExpression to determine the time the message
 381  
      *                should be processed
 382  
      * @return the builder
 383  
      */
 384  
     public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) {
 385  3
         DelayerType answer = new DelayerType(processAtExpression, delay);
 386  3
         addOutput(answer);
 387  3
         return answer;
 388  
     }
 389  
 
 390  
     /**
 391  
      * A builder for the <a
 392  
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
 393  
      * where a fixed amount of milliseconds are used to delay processing of a
 394  
      * message exchange
 395  
      * 
 396  
      * @param delay the default delay in milliseconds
 397  
      * @return the builder
 398  
      */
 399  
     public DelayerType delayer(long delay) {
 400  0
         return delayer(null, delay);
 401  
     }
 402  
 
 403  
     /**
 404  
      * A builder for the <a
 405  
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
 406  
      * where an expression is used to calculate the time which the message will
 407  
      * be dispatched on
 408  
      * 
 409  
      * @return the builder
 410  
      */
 411  
     public ThrottlerType throttler(long maximumRequestCount) {
 412  3
         ThrottlerType answer = new ThrottlerType(maximumRequestCount);
 413  3
         addOutput(answer);
 414  3
         return answer;
 415  
     }
 416  
 
 417  
     public ProcessorType interceptor(String ref) {
 418  6
         getInterceptors().add(new InterceptorRef(ref));
 419  6
         return this;
 420  
     }
 421  
 
 422  
     public InterceptType intercept() {
 423  0
         InterceptType answer = new InterceptType();
 424  0
         addOutput(answer);
 425  0
         return answer;
 426  
     }
 427  
 
 428  
     public ProcessorType proceed() {
 429  24
         addOutput(new ProceedType());
 430  24
         return this;
 431  
     }
 432  
 
 433  
     public ExceptionType exception(Class exceptionType) {
 434  0
         ExceptionType answer = new ExceptionType(exceptionType);
 435  0
         addOutput(answer);
 436  0
         return answer;
 437  
     }
 438  
 
 439  
     /**
 440  
      * Apply an interceptor route if the predicate is true
 441  
      */
 442  
     public OtherwiseType intercept(Predicate predicate) {
 443  0
         InterceptType answer = new InterceptType();
 444  0
         addOutput(answer);
 445  0
         return answer.when(predicate);
 446  
     }
 447  
 
 448  
     public ProcessorType interceptors(String... refs) {
 449  9
         for (String ref : refs) {
 450  6
             interceptor(ref);
 451  
         }
 452  3
         return this;
 453  
     }
 454  
 
 455  
     public FilterType filter(ExpressionType expression) {
 456  6
         FilterType filter = new FilterType();
 457  6
         filter.setExpression(expression);
 458  6
         addOutput(filter);
 459  6
         return filter;
 460  
     }
 461  
 
 462  
     public FilterType filter(String language, String expression) {
 463  3
         return filter(new LanguageExpression(language, expression));
 464  
     }
 465  
 
 466  
     /**
 467  
      * Trace logs the exchange before it goes to the next processing step using
 468  
      * the {@link #DEFAULT_TRACE_CATEGORY} logging category.
 469  
      * 
 470  
      * @return
 471  
      */
 472  
     public ProcessorType trace() {
 473  0
         return trace(DEFAULT_TRACE_CATEGORY);
 474  
     }
 475  
 
 476  
     /**
 477  
      * Trace logs the exchange before it goes to the next processing step using
 478  
      * the specified logging category.
 479  
      * 
 480  
      * @param category the logging category trace messages will sent to.
 481  
      * @return
 482  
      */
 483  
     public ProcessorType trace(String category) {
 484  0
         final Log log = LogFactory.getLog(category);
 485  0
         return intercept(new DelegateProcessor() {
 486  
             @Override
 487  0
             public void process(Exchange exchange) throws Exception {
 488  0
                 log.trace(exchange);
 489  0
                 processNext(exchange);
 490  0
             }
 491  
         });
 492  
     }
 493  
 
 494  
     public PolicyRef policies() {
 495  0
         PolicyRef answer = new PolicyRef();
 496  0
         addOutput(answer);
 497  0
         return answer;
 498  
     }
 499  
 
 500  
     public PolicyRef policy(Policy policy) {
 501  0
         PolicyRef answer = new PolicyRef(policy);
 502  0
         addOutput(answer);
 503  0
         return answer;
 504  
     }
 505  
 
 506  
     public ProcessorType intercept(DelegateProcessor interceptor) {
 507  36
         getInterceptors().add(new InterceptorRef(interceptor));
 508  36
         lastInterceptor = interceptor;
 509  36
         return this;
 510  
     }
 511  
 
 512  
     /**
 513  
      * Installs the given error handler builder
 514  
      * 
 515  
      * @param errorHandlerBuilder the error handler to be used by default for
 516  
      *                all child routes
 517  
      * @return the current builder with the error handler configured
 518  
      */
 519  
     public ProcessorType errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
 520  9
         setErrorHandlerBuilder(errorHandlerBuilder);
 521  9
         return this;
 522  
     }
 523  
 
 524  
     /**
 525  
      * Configures whether or not the error handler is inherited by every
 526  
      * processing node (or just the top most one)
 527  
      * 
 528  
      * @param condition the falg as to whether error handlers should be
 529  
      *                inherited or not
 530  
      * @return the current builder
 531  
      */
 532  
     public ProcessorType inheritErrorHandler(boolean condition) {
 533  3
         setInheritErrorHandlerFlag(condition);
 534  3
         return this;
 535  
     }
 536  
 
 537  
     // Transformers
 538  
     // -------------------------------------------------------------------------
 539  
 
 540  
     /**
 541  
      * Adds the custom processor to this destination which could be a final
 542  
      * destination, or could be a transformation in a pipeline
 543  
      */
 544  
     public ProcessorType process(Processor processor) {
 545  75
         ProcessorRef answer = new ProcessorRef(processor);
 546  75
         addOutput(answer);
 547  75
         return this;
 548  
     }
 549  
 
 550  
     /**
 551  
      * Adds a bean which is invoked which could be a final destination, or could
 552  
      * be a transformation in a pipeline
 553  
      */
 554  
     public ProcessorType beanRef(String ref) {
 555  0
         BeanRef answer = new BeanRef(ref);
 556  0
         addOutput(answer);
 557  0
         return this;
 558  
     }
 559  
 
 560  
     /**
 561  
      * Adds a bean and method which is invoked which could be a final
 562  
      * destination, or could be a transformation in a pipeline
 563  
      */
 564  
     public ProcessorType beanRef(String ref, String method) {
 565  0
         BeanRef answer = new BeanRef(ref, method);
 566  0
         addOutput(answer);
 567  0
         return this;
 568  
     }
 569  
 
 570  
     /**
 571  
      * Adds a processor which sets the body on the IN message
 572  
      */
 573  
     public ProcessorType setBody(Expression expression) {
 574  3
         return process(ProcessorBuilder.setBody(expression));
 575  
     }
 576  
 
 577  
     /**
 578  
      * Adds a processor which sets the body on the OUT message
 579  
      */
 580  
     public ProcessorType setOutBody(Expression expression) {
 581  0
         return process(ProcessorBuilder.setOutBody(expression));
 582  
     }
 583  
 
 584  
     /**
 585  
      * Adds a processor which sets the header on the IN message
 586  
      */
 587  
     public ProcessorType setHeader(String name, Expression expression) {
 588  0
         return process(ProcessorBuilder.setHeader(name, expression));
 589  
     }
 590  
 
 591  
     /**
 592  
      * Adds a processor which sets the header on the OUT message
 593  
      */
 594  
     public ProcessorType setOutHeader(String name, Expression expression) {
 595  0
         return process(ProcessorBuilder.setOutHeader(name, expression));
 596  
     }
 597  
 
 598  
     /**
 599  
      * Adds a processor which sets the exchange property
 600  
      */
 601  
     public ProcessorType setProperty(String name, Expression expression) {
 602  0
         return process(ProcessorBuilder.setProperty(name, expression));
 603  
     }
 604  
 
 605  
     /**
 606  
      * Converts the IN message body to the specified type
 607  
      */
 608  
     public ProcessorType convertBodyTo(Class type) {
 609  0
         return process(ProcessorBuilder.setBody(Builder.body().convertTo(type)));
 610  
     }
 611  
 
 612  
     /**
 613  
      * Converts the OUT message body to the specified type
 614  
      */
 615  
     public ProcessorType convertOutBodyTo(Class type) {
 616  0
         return process(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type)));
 617  
     }
 618  
 
 619  
     // Properties
 620  
     // -------------------------------------------------------------------------
 621  
 
 622  
     @XmlTransient
 623  
     public ErrorHandlerBuilder getErrorHandlerBuilder() {
 624  759
         if (errorHandlerBuilder == null) {
 625  321
             errorHandlerBuilder = createErrorHandlerBuilder();
 626  
         }
 627  759
         return errorHandlerBuilder;
 628  
     }
 629  
 
 630  
     /**
 631  
      * Sets the error handler to use with processors created by this builder
 632  
      */
 633  
     public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
 634  456
         this.errorHandlerBuilder = errorHandlerBuilder;
 635  456
     }
 636  
 
 637  
     @XmlTransient
 638  
     public boolean isInheritErrorHandler() {
 639  771
         return ObjectConverter.toBoolean(getInheritErrorHandlerFlag());
 640  
     }
 641  
 
 642  
     @XmlAttribute(name = "inheritErrorHandler", required = false)
 643  
     public Boolean getInheritErrorHandlerFlag() {
 644  789
         return inheritErrorHandlerFlag;
 645  
     }
 646  
 
 647  
     public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) {
 648  276
         this.inheritErrorHandlerFlag = inheritErrorHandlerFlag;
 649  276
     }
 650  
 
 651  
     // Implementation methods
 652  
     // -------------------------------------------------------------------------
 653  
 
 654  
     /**
 655  
      * Creates the processor and wraps it in any necessary interceptors and
 656  
      * error handlers
 657  
      */
 658  
     protected Processor makeProcessor(RouteContext routeContext) throws Exception {
 659  297
         Processor processor = createProcessor(routeContext);
 660  294
         return wrapProcessor(routeContext, processor);
 661  
     }
 662  
 
 663  
     /**
 664  
      * A strategy method which allows derived classes to wrap the child
 665  
      * processor in some kind of interceptor
 666  
      * 
 667  
      * @param routeContext
 668  
      * @param target the processor which can be wrapped
 669  
      * @return the original processor or a new wrapped interceptor
 670  
      */
 671  
     protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
 672  
         // The target is required.
 673  294
         if (target == null) {
 674  0
             throw new RuntimeCamelException("target provided.");
 675  
         }
 676  
 
 677  
         // Interceptors are optional
 678  294
         DelegateProcessor first = null;
 679  294
         DelegateProcessor last = null;
 680  294
         List<InterceptorType> interceptors = new ArrayList<InterceptorType>(routeContext.getRoute()
 681  
             .getInterceptors());
 682  294
         List<InterceptorType> list = getInterceptors();
 683  294
         for (InterceptorType interceptorType : list) {
 684  12
             if (!interceptors.contains(interceptorType)) {
 685  0
                 interceptors.add(interceptorType);
 686  
             }
 687  12
         }
 688  294
         for (InterceptorType interceptorRef : interceptors) {
 689  36
             DelegateProcessor p = interceptorRef.createInterceptor(routeContext);
 690  36
             if (first == null) {
 691  30
                 first = p;
 692  
             }
 693  36
             if (last != null) {
 694  6
                 last.setProcessor(p);
 695  
             }
 696  36
             last = p;
 697  36
         }
 698  
 
 699  294
         if (last != null) {
 700  30
             last.setProcessor(target);
 701  
         }
 702  294
         return first == null ? target : first;
 703  
     }
 704  
 
 705  
     /**
 706  
      * A strategy method to allow newly created processors to be wrapped in an
 707  
      * error handler.
 708  
      */
 709  
     protected Processor wrapInErrorHandler(Processor processor) throws Exception {
 710  294
         return getErrorHandlerBuilder().createErrorHandler(processor);
 711  
     }
 712  
 
 713  
     protected ErrorHandlerBuilder createErrorHandlerBuilder() {
 714  321
         if (isInheritErrorHandler()) {
 715  321
             return new DeadLetterChannelBuilder();
 716  
         } else {
 717  0
             return new NoErrorHandlerBuilder();
 718  
         }
 719  
     }
 720  
 
 721  
     protected void configureChild(ProcessorType output) {
 722  78
     }
 723  
 
 724  
     protected void addOutput(ProcessorType processorType) {
 725  528
         configureChild(processorType);
 726  528
         getOutputs().add(processorType);
 727  528
     }
 728  
 
 729  
     /**
 730  
      * Creates a new instance of some kind of composite processor which defaults
 731  
      * to using a {@link Pipeline} but derived classes could change the
 732  
      * behaviour
 733  
      */
 734  
     protected Processor createCompositeProcessor(List<Processor> list) {
 735  
         // return new MulticastProcessor(list);
 736  12
         return new Pipeline(list);
 737  
     }
 738  
 
 739  
     protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType> outputs)
 740  
         throws Exception {
 741  207
         List<Processor> list = new ArrayList<Processor>();
 742  207
         for (ProcessorType output : outputs) {
 743  225
             Processor processor = output.createProcessor(routeContext);
 744  225
             list.add(processor);
 745  225
         }
 746  207
         Processor processor = null;
 747  207
         if (!list.isEmpty()) {
 748  207
             if (list.size() == 1) {
 749  192
                 processor = list.get(0);
 750  192
             } else {
 751  15
                 processor = createCompositeProcessor(list);
 752  
             }
 753  
         }
 754  207
         return processor;
 755  
     }
 756  
 }