Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||||||
ProcessorType |
|
| 0.0;0 |
1 | /** |
|
2 | * Licensed to the Apache Software Foundation (ASF) under one or more |
|
3 | * contributor license agreements. See the NOTICE file distributed with |
|
4 | * this work for additional information regarding copyright ownership. |
|
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 |
|
6 | * (the "License"); you may not use this file except in compliance with |
|
7 | * the License. You may obtain a copy of the License at |
|
8 | * |
|
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
|
10 | * |
|
11 | * Unless required by applicable law or agreed to in writing, software |
|
12 | * distributed under the License is distributed on an "AS IS" BASIS, |
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 | * See the License for the specific language governing permissions and |
|
15 | * limitations under the License. |
|
16 | */ |
|
17 | package org.apache.camel.model; |
|
18 | ||
19 | import org.apache.camel.Endpoint; |
|
20 | import org.apache.camel.Exchange; |
|
21 | import org.apache.camel.Expression; |
|
22 | import org.apache.camel.Predicate; |
|
23 | import org.apache.camel.Processor; |
|
24 | import org.apache.camel.Route; |
|
25 | import org.apache.camel.RuntimeCamelException; |
|
26 | import org.apache.camel.builder.Builder; |
|
27 | import org.apache.camel.builder.DeadLetterChannelBuilder; |
|
28 | import org.apache.camel.builder.ErrorHandlerBuilder; |
|
29 | import org.apache.camel.builder.NoErrorHandlerBuilder; |
|
30 | import org.apache.camel.builder.ProcessorBuilder; |
|
31 | import org.apache.camel.converter.ObjectConverter; |
|
32 | import org.apache.camel.impl.RouteContext; |
|
33 | import org.apache.camel.model.language.ExpressionType; |
|
34 | import org.apache.camel.model.language.LanguageExpression; |
|
35 | import org.apache.camel.processor.DelegateProcessor; |
|
36 | import org.apache.camel.processor.MulticastProcessor; |
|
37 | import org.apache.camel.processor.Pipeline; |
|
38 | import org.apache.camel.processor.RecipientList; |
|
39 | import org.apache.camel.processor.aggregate.AggregationStrategy; |
|
40 | import org.apache.camel.processor.idempotent.IdempotentConsumer; |
|
41 | import org.apache.camel.processor.idempotent.MessageIdRepository; |
|
42 | import org.apache.camel.spi.Policy; |
|
43 | import org.apache.commons.logging.Log; |
|
44 | import org.apache.commons.logging.LogFactory; |
|
45 | ||
46 | import javax.xml.bind.annotation.XmlAttribute; |
|
47 | import javax.xml.bind.annotation.XmlTransient; |
|
48 | import java.util.ArrayList; |
|
49 | import java.util.Collection; |
|
50 | import java.util.Collections; |
|
51 | import java.util.List; |
|
52 | ||
53 | /** |
|
54 | * @version $Revision: 1.1 $ |
|
55 | */ |
|
56 | 1047 | public abstract class ProcessorType { |
57 | public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE"; |
|
58 | private ErrorHandlerBuilder errorHandlerBuilder; |
|
59 | 1047 | private Boolean inheritErrorHandlerFlag = Boolean.TRUE; // TODO not sure how |
60 | private DelegateProcessor lastInterceptor; |
|
61 | // else to use an |
|
62 | // optional |
|
63 | // attribute in |
|
64 | // JAXB2 |
|
65 | ||
66 | public abstract List<ProcessorType> getOutputs(); |
|
67 | ||
68 | public abstract List<InterceptorType> getInterceptors(); |
|
69 | ||
70 | public Processor createProcessor(RouteContext routeContext) throws Exception { |
|
71 | 0 | throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName()); |
72 | } |
|
73 | ||
74 | public Processor createOutputsProcessor(RouteContext routeContext) throws Exception { |
|
75 | 201 | Collection<ProcessorType> outputs = getOutputs(); |
76 | 201 | return createOutputsProcessor(routeContext, outputs); |
77 | } |
|
78 | ||
79 | public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { |
|
80 | 297 | Processor processor = makeProcessor(routeContext); |
81 | 294 | routeContext.addEventDrivenProcessor(processor); |
82 | 294 | } |
83 | ||
84 | /** |
|
85 | * Wraps the child processor in whatever necessary interceptors and error |
|
86 | * handlers |
|
87 | */ |
|
88 | public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception { |
|
89 | 294 | processor = wrapProcessorInInterceptors(routeContext, processor); |
90 | 294 | return wrapInErrorHandler(processor); |
91 | } |
|
92 | ||
93 | // Fluent API |
|
94 | // ------------------------------------------------------------------------- |
|
95 | ||
96 | /** |
|
97 | * Sends the exchange to the given endpoint URI |
|
98 | */ |
|
99 | public ProcessorType to(String uri) { |
|
100 | 291 | addOutput(new ToType(uri)); |
101 | 291 | return this; |
102 | } |
|
103 | ||
104 | /** |
|
105 | * Sends the exchange to the given endpoint |
|
106 | */ |
|
107 | public ProcessorType to(Endpoint endpoint) { |
|
108 | 0 | addOutput(new ToType(endpoint)); |
109 | 0 | return this; |
110 | } |
|
111 | ||
112 | /** |
|
113 | * Sends the exchange to a list of endpoints using the |
|
114 | * {@link MulticastProcessor} pattern |
|
115 | */ |
|
116 | public ProcessorType to(String... uris) { |
|
117 | 36 | for (String uri : uris) { |
118 | 27 | addOutput(new ToType(uri)); |
119 | } |
|
120 | 9 | return this; |
121 | } |
|
122 | ||
123 | /** |
|
124 | * Sends the exchange to a list of endpoints using the |
|
125 | * {@link MulticastProcessor} pattern |
|
126 | */ |
|
127 | public ProcessorType to(Endpoint... endpoints) { |
|
128 | 0 | for (Endpoint endpoint : endpoints) { |
129 | 0 | addOutput(new ToType(endpoint)); |
130 | } |
|
131 | 0 | return this; |
132 | } |
|
133 | ||
134 | /** |
|
135 | * Sends the exchange to a list of endpoint using the |
|
136 | * {@link MulticastProcessor} pattern |
|
137 | */ |
|
138 | public ProcessorType to(Collection<Endpoint> endpoints) { |
|
139 | 0 | for (Endpoint endpoint : endpoints) { |
140 | 0 | addOutput(new ToType(endpoint)); |
141 | 0 | } |
142 | 0 | return this; |
143 | } |
|
144 | ||
145 | /** |
|
146 | * Multicasts messages to all its child outputs; so that each processor and |
|
147 | * destination gets a copy of the original message to avoid the processors |
|
148 | * interfering with each other. |
|
149 | */ |
|
150 | public MulticastType multicast() { |
|
151 | 3 | MulticastType answer = new MulticastType(); |
152 | 3 | addOutput(answer); |
153 | 3 | return answer; |
154 | } |
|
155 | ||
156 | /** |
|
157 | * Creates a {@link Pipeline} of the list of endpoints so that the message |
|
158 | * will get processed by each endpoint in turn and for request/response the |
|
159 | * output of one endpoint will be the input of the next endpoint |
|
160 | */ |
|
161 | public ProcessorType pipeline(String... uris) { |
|
162 | // TODO pipeline v mulicast |
|
163 | 3 | return to(uris); |
164 | } |
|
165 | ||
166 | /** |
|
167 | * Creates a {@link Pipeline} of the list of endpoints so that the message |
|
168 | * will get processed by each endpoint in turn and for request/response the |
|
169 | * output of one endpoint will be the input of the next endpoint |
|
170 | */ |
|
171 | public ProcessorType pipeline(Endpoint... endpoints) { |
|
172 | // TODO pipeline v mulicast |
|
173 | 0 | return to(endpoints); |
174 | } |
|
175 | ||
176 | /** |
|
177 | * Creates a {@link Pipeline} of the list of endpoints so that the message |
|
178 | * will get processed by each endpoint in turn and for request/response the |
|
179 | * output of one endpoint will be the input of the next endpoint |
|
180 | */ |
|
181 | public ProcessorType pipeline(Collection<Endpoint> endpoints) { |
|
182 | // TODO pipeline v mulicast |
|
183 | 0 | return to(endpoints); |
184 | } |
|
185 | ||
186 | /** |
|
187 | * Creates an {@link IdempotentConsumer} to avoid duplicate messages |
|
188 | */ |
|
189 | public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression, |
|
190 | MessageIdRepository messageIdRepository) { |
|
191 | 6 | IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository); |
192 | 6 | addOutput(answer); |
193 | 6 | return answer; |
194 | } |
|
195 | ||
196 | /** |
|
197 | * Creates a predicate which is applied and only if it is true then the |
|
198 | * exchange is forwarded to the destination |
|
199 | * |
|
200 | * @return the builder for a predicate |
|
201 | */ |
|
202 | public FilterType filter(Predicate predicate) { |
|
203 | 45 | FilterType filter = new FilterType(predicate); |
204 | 45 | addOutput(filter); |
205 | 45 | return filter; |
206 | } |
|
207 | ||
208 | /** |
|
209 | * Creates a choice of one or more predicates with an otherwise clause |
|
210 | * |
|
211 | * @return the builder for a choice expression |
|
212 | */ |
|
213 | public ChoiceType choice() { |
|
214 | 33 | ChoiceType answer = new ChoiceType(); |
215 | 33 | addOutput(answer); |
216 | 33 | return answer; |
217 | } |
|
218 | ||
219 | /** |
|
220 | * Creates a try/catch block |
|
221 | * |
|
222 | * @return the builder for a tryBlock expression |
|
223 | */ |
|
224 | public TryType tryBlock() { |
|
225 | 6 | TryType answer = new TryType(); |
226 | 6 | addOutput(answer); |
227 | 6 | return answer; |
228 | } |
|
229 | ||
230 | /** |
|
231 | * Creates a dynamic <a |
|
232 | * href="http://activemq.apache.org/camel/recipient-list.html">Recipient |
|
233 | * List</a> pattern. |
|
234 | * |
|
235 | * @param receipients is the builder of the expression used in the |
|
236 | * {@link RecipientList} to decide the destinations |
|
237 | */ |
|
238 | public ProcessorType recipientList(Expression receipients) { |
|
239 | 6 | RecipientListType answer = new RecipientListType(receipients); |
240 | 6 | addOutput(answer); |
241 | 6 | return this; |
242 | } |
|
243 | ||
244 | /** |
|
245 | * A builder for the <a |
|
246 | * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> |
|
247 | * pattern where an expression is evaluated to iterate through each of the |
|
248 | * parts of a message and then each part is then send to some endpoint. |
|
249 | * |
|
250 | * @param receipients the expression on which to split |
|
251 | * @return the builder |
|
252 | */ |
|
253 | public SplitterType splitter(Expression receipients) { |
|
254 | 6 | SplitterType answer = new SplitterType(receipients); |
255 | 6 | addOutput(answer); |
256 | 6 | return answer; |
257 | } |
|
258 | ||
259 | /** |
|
260 | * A builder for the <a |
|
261 | * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> |
|
262 | * pattern where an expression is evaluated to be able to compare the |
|
263 | * message exchanges to reorder them. e.g. you may wish to sort by some |
|
264 | * header |
|
265 | * |
|
266 | * @param expression the expression on which to compare messages in order |
|
267 | * @return the builder |
|
268 | */ |
|
269 | public ResequencerType resequencer(Expression<Exchange> expression) { |
|
270 | 3 | return resequencer(Collections.<Expression> singletonList(expression)); |
271 | } |
|
272 | ||
273 | /** |
|
274 | * A builder for the <a |
|
275 | * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> |
|
276 | * pattern where a list of expressions are evaluated to be able to compare |
|
277 | * the message exchanges to reorder them. e.g. you may wish to sort by some |
|
278 | * headers |
|
279 | * |
|
280 | * @param expressions the expressions on which to compare messages in order |
|
281 | * @return the builder |
|
282 | */ |
|
283 | public ResequencerType resequencer(List<Expression> expressions) { |
|
284 | 3 | ResequencerType answer = new ResequencerType(expressions); |
285 | 3 | addOutput(answer); |
286 | 3 | return answer; |
287 | } |
|
288 | ||
289 | /** |
|
290 | * A builder for the <a |
|
291 | * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> |
|
292 | * pattern where a list of expressions are evaluated to be able to compare |
|
293 | * the message exchanges to reorder them. e.g. you may wish to sort by some |
|
294 | * headers |
|
295 | * |
|
296 | * @param expressions the expressions on which to compare messages in order |
|
297 | * @return the builder |
|
298 | */ |
|
299 | public ResequencerType resequencer(Expression... expressions) { |
|
300 | 0 | List<Expression> list = new ArrayList<Expression>(); |
301 | 0 | for (Expression expression : expressions) { |
302 | 0 | list.add(expression); |
303 | } |
|
304 | 0 | return resequencer(list); |
305 | } |
|
306 | ||
307 | /** |
|
308 | * A builder for the <a |
|
309 | * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> |
|
310 | * pattern where a batch of messages are processed (up to a maximum amount |
|
311 | * or until some timeout is reached) and messages for the same correlation |
|
312 | * key are combined together using some kind of |
|
313 | * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges |
|
314 | * into a smaller number of exchanges. <p/> A good example of this is stock |
|
315 | * market data; you may be receiving 30,000 messages/second and you may want |
|
316 | * to throttle it right down so that multiple messages for the same stock |
|
317 | * are combined (or just the latest message is used and older prices are |
|
318 | * discarded). Another idea is to combine line item messages together into a |
|
319 | * single invoice message. |
|
320 | * |
|
321 | * @param correlationExpression the expression used to calculate the |
|
322 | * correlation key. For a JMS message this could be the |
|
323 | * expression <code>header("JMSDestination")</code> or |
|
324 | * <code>header("JMSCorrelationID")</code> |
|
325 | */ |
|
326 | public AggregatorType aggregator(Expression correlationExpression) { |
|
327 | 3 | AggregatorType answer = new AggregatorType(correlationExpression); |
328 | 3 | addOutput(answer); |
329 | 3 | return answer; |
330 | } |
|
331 | ||
332 | /** |
|
333 | * A builder for the <a |
|
334 | * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> |
|
335 | * pattern where a batch of messages are processed (up to a maximum amount |
|
336 | * or until some timeout is reached) and messages for the same correlation |
|
337 | * key are combined together using some kind of |
|
338 | * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges |
|
339 | * into a smaller number of exchanges. <p/> A good example of this is stock |
|
340 | * market data; you may be receiving 30,000 messages/second and you may want |
|
341 | * to throttle it right down so that multiple messages for the same stock |
|
342 | * are combined (or just the latest message is used and older prices are |
|
343 | * discarded). Another idea is to combine line item messages together into a |
|
344 | * single invoice message. |
|
345 | * |
|
346 | * @param correlationExpression the expression used to calculate the |
|
347 | * correlation key. For a JMS message this could be the |
|
348 | * expression <code>header("JMSDestination")</code> or |
|
349 | * <code>header("JMSCorrelationID")</code> |
|
350 | */ |
|
351 | public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) { |
|
352 | 0 | AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy); |
353 | 0 | addOutput(answer); |
354 | 0 | return answer; |
355 | } |
|
356 | ||
357 | /** |
|
358 | * A builder for the <a |
|
359 | * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern |
|
360 | * where an expression is used to calculate the time which the message will |
|
361 | * be dispatched on |
|
362 | * |
|
363 | * @param processAtExpression an expression to calculate the time at which |
|
364 | * the messages should be processed |
|
365 | * @return the builder |
|
366 | */ |
|
367 | public DelayerType delayer(Expression<Exchange> processAtExpression) { |
|
368 | 0 | return delayer(processAtExpression, 0L); |
369 | } |
|
370 | ||
371 | /** |
|
372 | * A builder for the <a |
|
373 | * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern |
|
374 | * where an expression is used to calculate the time which the message will |
|
375 | * be dispatched on |
|
376 | * |
|
377 | * @param processAtExpression an expression to calculate the time at which |
|
378 | * the messages should be processed |
|
379 | * @param delay the delay in milliseconds which is added to the |
|
380 | * processAtExpression to determine the time the message |
|
381 | * should be processed |
|
382 | * @return the builder |
|
383 | */ |
|
384 | public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) { |
|
385 | 3 | DelayerType answer = new DelayerType(processAtExpression, delay); |
386 | 3 | addOutput(answer); |
387 | 3 | return answer; |
388 | } |
|
389 | ||
390 | /** |
|
391 | * A builder for the <a |
|
392 | * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern |
|
393 | * where a fixed amount of milliseconds are used to delay processing of a |
|
394 | * message exchange |
|
395 | * |
|
396 | * @param delay the default delay in milliseconds |
|
397 | * @return the builder |
|
398 | */ |
|
399 | public DelayerType delayer(long delay) { |
|
400 | 0 | return delayer(null, delay); |
401 | } |
|
402 | ||
403 | /** |
|
404 | * A builder for the <a |
|
405 | * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern |
|
406 | * where an expression is used to calculate the time which the message will |
|
407 | * be dispatched on |
|
408 | * |
|
409 | * @return the builder |
|
410 | */ |
|
411 | public ThrottlerType throttler(long maximumRequestCount) { |
|
412 | 3 | ThrottlerType answer = new ThrottlerType(maximumRequestCount); |
413 | 3 | addOutput(answer); |
414 | 3 | return answer; |
415 | } |
|
416 | ||
417 | public ProcessorType interceptor(String ref) { |
|
418 | 6 | getInterceptors().add(new InterceptorRef(ref)); |
419 | 6 | return this; |
420 | } |
|
421 | ||
422 | public InterceptType intercept() { |
|
423 | 0 | InterceptType answer = new InterceptType(); |
424 | 0 | addOutput(answer); |
425 | 0 | return answer; |
426 | } |
|
427 | ||
428 | public ProcessorType proceed() { |
|
429 | 24 | addOutput(new ProceedType()); |
430 | 24 | return this; |
431 | } |
|
432 | ||
433 | public ExceptionType exception(Class exceptionType) { |
|
434 | 0 | ExceptionType answer = new ExceptionType(exceptionType); |
435 | 0 | addOutput(answer); |
436 | 0 | return answer; |
437 | } |
|
438 | ||
439 | /** |
|
440 | * Apply an interceptor route if the predicate is true |
|
441 | */ |
|
442 | public OtherwiseType intercept(Predicate predicate) { |
|
443 | 0 | InterceptType answer = new InterceptType(); |
444 | 0 | addOutput(answer); |
445 | 0 | return answer.when(predicate); |
446 | } |
|
447 | ||
448 | public ProcessorType interceptors(String... refs) { |
|
449 | 9 | for (String ref : refs) { |
450 | 6 | interceptor(ref); |
451 | } |
|
452 | 3 | return this; |
453 | } |
|
454 | ||
455 | public FilterType filter(ExpressionType expression) { |
|
456 | 6 | FilterType filter = new FilterType(); |
457 | 6 | filter.setExpression(expression); |
458 | 6 | addOutput(filter); |
459 | 6 | return filter; |
460 | } |
|
461 | ||
462 | public FilterType filter(String language, String expression) { |
|
463 | 3 | return filter(new LanguageExpression(language, expression)); |
464 | } |
|
465 | ||
466 | /** |
|
467 | * Trace logs the exchange before it goes to the next processing step using |
|
468 | * the {@link #DEFAULT_TRACE_CATEGORY} logging category. |
|
469 | * |
|
470 | * @return |
|
471 | */ |
|
472 | public ProcessorType trace() { |
|
473 | 0 | return trace(DEFAULT_TRACE_CATEGORY); |
474 | } |
|
475 | ||
476 | /** |
|
477 | * Trace logs the exchange before it goes to the next processing step using |
|
478 | * the specified logging category. |
|
479 | * |
|
480 | * @param category the logging category trace messages will sent to. |
|
481 | * @return |
|
482 | */ |
|
483 | public ProcessorType trace(String category) { |
|
484 | 0 | final Log log = LogFactory.getLog(category); |
485 | 0 | return intercept(new DelegateProcessor() { |
486 | @Override |
|
487 | 0 | public void process(Exchange exchange) throws Exception { |
488 | 0 | log.trace(exchange); |
489 | 0 | processNext(exchange); |
490 | 0 | } |
491 | }); |
|
492 | } |
|
493 | ||
494 | public PolicyRef policies() { |
|
495 | 0 | PolicyRef answer = new PolicyRef(); |
496 | 0 | addOutput(answer); |
497 | 0 | return answer; |
498 | } |
|
499 | ||
500 | public PolicyRef policy(Policy policy) { |
|
501 | 0 | PolicyRef answer = new PolicyRef(policy); |
502 | 0 | addOutput(answer); |
503 | 0 | return answer; |
504 | } |
|
505 | ||
506 | public ProcessorType intercept(DelegateProcessor interceptor) { |
|
507 | 36 | getInterceptors().add(new InterceptorRef(interceptor)); |
508 | 36 | lastInterceptor = interceptor; |
509 | 36 | return this; |
510 | } |
|
511 | ||
512 | /** |
|
513 | * Installs the given error handler builder |
|
514 | * |
|
515 | * @param errorHandlerBuilder the error handler to be used by default for |
|
516 | * all child routes |
|
517 | * @return the current builder with the error handler configured |
|
518 | */ |
|
519 | public ProcessorType errorHandler(ErrorHandlerBuilder errorHandlerBuilder) { |
|
520 | 9 | setErrorHandlerBuilder(errorHandlerBuilder); |
521 | 9 | return this; |
522 | } |
|
523 | ||
524 | /** |
|
525 | * Configures whether or not the error handler is inherited by every |
|
526 | * processing node (or just the top most one) |
|
527 | * |
|
528 | * @param condition the falg as to whether error handlers should be |
|
529 | * inherited or not |
|
530 | * @return the current builder |
|
531 | */ |
|
532 | public ProcessorType inheritErrorHandler(boolean condition) { |
|
533 | 3 | setInheritErrorHandlerFlag(condition); |
534 | 3 | return this; |
535 | } |
|
536 | ||
537 | // Transformers |
|
538 | // ------------------------------------------------------------------------- |
|
539 | ||
540 | /** |
|
541 | * Adds the custom processor to this destination which could be a final |
|
542 | * destination, or could be a transformation in a pipeline |
|
543 | */ |
|
544 | public ProcessorType process(Processor processor) { |
|
545 | 75 | ProcessorRef answer = new ProcessorRef(processor); |
546 | 75 | addOutput(answer); |
547 | 75 | return this; |
548 | } |
|
549 | ||
550 | /** |
|
551 | * Adds a bean which is invoked which could be a final destination, or could |
|
552 | * be a transformation in a pipeline |
|
553 | */ |
|
554 | public ProcessorType beanRef(String ref) { |
|
555 | 0 | BeanRef answer = new BeanRef(ref); |
556 | 0 | addOutput(answer); |
557 | 0 | return this; |
558 | } |
|
559 | ||
560 | /** |
|
561 | * Adds a bean and method which is invoked which could be a final |
|
562 | * destination, or could be a transformation in a pipeline |
|
563 | */ |
|
564 | public ProcessorType beanRef(String ref, String method) { |
|
565 | 0 | BeanRef answer = new BeanRef(ref, method); |
566 | 0 | addOutput(answer); |
567 | 0 | return this; |
568 | } |
|
569 | ||
570 | /** |
|
571 | * Adds a processor which sets the body on the IN message |
|
572 | */ |
|
573 | public ProcessorType setBody(Expression expression) { |
|
574 | 3 | return process(ProcessorBuilder.setBody(expression)); |
575 | } |
|
576 | ||
577 | /** |
|
578 | * Adds a processor which sets the body on the OUT message |
|
579 | */ |
|
580 | public ProcessorType setOutBody(Expression expression) { |
|
581 | 0 | return process(ProcessorBuilder.setOutBody(expression)); |
582 | } |
|
583 | ||
584 | /** |
|
585 | * Adds a processor which sets the header on the IN message |
|
586 | */ |
|
587 | public ProcessorType setHeader(String name, Expression expression) { |
|
588 | 0 | return process(ProcessorBuilder.setHeader(name, expression)); |
589 | } |
|
590 | ||
591 | /** |
|
592 | * Adds a processor which sets the header on the OUT message |
|
593 | */ |
|
594 | public ProcessorType setOutHeader(String name, Expression expression) { |
|
595 | 0 | return process(ProcessorBuilder.setOutHeader(name, expression)); |
596 | } |
|
597 | ||
598 | /** |
|
599 | * Adds a processor which sets the exchange property |
|
600 | */ |
|
601 | public ProcessorType setProperty(String name, Expression expression) { |
|
602 | 0 | return process(ProcessorBuilder.setProperty(name, expression)); |
603 | } |
|
604 | ||
605 | /** |
|
606 | * Converts the IN message body to the specified type |
|
607 | */ |
|
608 | public ProcessorType convertBodyTo(Class type) { |
|
609 | 0 | return process(ProcessorBuilder.setBody(Builder.body().convertTo(type))); |
610 | } |
|
611 | ||
612 | /** |
|
613 | * Converts the OUT message body to the specified type |
|
614 | */ |
|
615 | public ProcessorType convertOutBodyTo(Class type) { |
|
616 | 0 | return process(ProcessorBuilder.setOutBody(Builder.outBody().convertTo(type))); |
617 | } |
|
618 | ||
619 | // Properties |
|
620 | // ------------------------------------------------------------------------- |
|
621 | ||
622 | @XmlTransient |
|
623 | public ErrorHandlerBuilder getErrorHandlerBuilder() { |
|
624 | 759 | if (errorHandlerBuilder == null) { |
625 | 321 | errorHandlerBuilder = createErrorHandlerBuilder(); |
626 | } |
|
627 | 759 | return errorHandlerBuilder; |
628 | } |
|
629 | ||
630 | /** |
|
631 | * Sets the error handler to use with processors created by this builder |
|
632 | */ |
|
633 | public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) { |
|
634 | 456 | this.errorHandlerBuilder = errorHandlerBuilder; |
635 | 456 | } |
636 | ||
637 | @XmlTransient |
|
638 | public boolean isInheritErrorHandler() { |
|
639 | 771 | return ObjectConverter.toBoolean(getInheritErrorHandlerFlag()); |
640 | } |
|
641 | ||
642 | @XmlAttribute(name = "inheritErrorHandler", required = false) |
|
643 | public Boolean getInheritErrorHandlerFlag() { |
|
644 | 789 | return inheritErrorHandlerFlag; |
645 | } |
|
646 | ||
647 | public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) { |
|
648 | 276 | this.inheritErrorHandlerFlag = inheritErrorHandlerFlag; |
649 | 276 | } |
650 | ||
651 | // Implementation methods |
|
652 | // ------------------------------------------------------------------------- |
|
653 | ||
654 | /** |
|
655 | * Creates the processor and wraps it in any necessary interceptors and |
|
656 | * error handlers |
|
657 | */ |
|
658 | protected Processor makeProcessor(RouteContext routeContext) throws Exception { |
|
659 | 297 | Processor processor = createProcessor(routeContext); |
660 | 294 | return wrapProcessor(routeContext, processor); |
661 | } |
|
662 | ||
663 | /** |
|
664 | * A strategy method which allows derived classes to wrap the child |
|
665 | * processor in some kind of interceptor |
|
666 | * |
|
667 | * @param routeContext |
|
668 | * @param target the processor which can be wrapped |
|
669 | * @return the original processor or a new wrapped interceptor |
|
670 | */ |
|
671 | protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception { |
|
672 | // The target is required. |
|
673 | 294 | if (target == null) { |
674 | 0 | throw new RuntimeCamelException("target provided."); |
675 | } |
|
676 | ||
677 | // Interceptors are optional |
|
678 | 294 | DelegateProcessor first = null; |
679 | 294 | DelegateProcessor last = null; |
680 | 294 | List<InterceptorType> interceptors = new ArrayList<InterceptorType>(routeContext.getRoute() |
681 | .getInterceptors()); |
|
682 | 294 | List<InterceptorType> list = getInterceptors(); |
683 | 294 | for (InterceptorType interceptorType : list) { |
684 | 12 | if (!interceptors.contains(interceptorType)) { |
685 | 0 | interceptors.add(interceptorType); |
686 | } |
|
687 | 12 | } |
688 | 294 | for (InterceptorType interceptorRef : interceptors) { |
689 | 36 | DelegateProcessor p = interceptorRef.createInterceptor(routeContext); |
690 | 36 | if (first == null) { |
691 | 30 | first = p; |
692 | } |
|
693 | 36 | if (last != null) { |
694 | 6 | last.setProcessor(p); |
695 | } |
|
696 | 36 | last = p; |
697 | 36 | } |
698 | ||
699 | 294 | if (last != null) { |
700 | 30 | last.setProcessor(target); |
701 | } |
|
702 | 294 | return first == null ? target : first; |
703 | } |
|
704 | ||
705 | /** |
|
706 | * A strategy method to allow newly created processors to be wrapped in an |
|
707 | * error handler. |
|
708 | */ |
|
709 | protected Processor wrapInErrorHandler(Processor processor) throws Exception { |
|
710 | 294 | return getErrorHandlerBuilder().createErrorHandler(processor); |
711 | } |
|
712 | ||
713 | protected ErrorHandlerBuilder createErrorHandlerBuilder() { |
|
714 | 321 | if (isInheritErrorHandler()) { |
715 | 321 | return new DeadLetterChannelBuilder(); |
716 | } else { |
|
717 | 0 | return new NoErrorHandlerBuilder(); |
718 | } |
|
719 | } |
|
720 | ||
721 | protected void configureChild(ProcessorType output) { |
|
722 | 78 | } |
723 | ||
724 | protected void addOutput(ProcessorType processorType) { |
|
725 | 528 | configureChild(processorType); |
726 | 528 | getOutputs().add(processorType); |
727 | 528 | } |
728 | ||
729 | /** |
|
730 | * Creates a new instance of some kind of composite processor which defaults |
|
731 | * to using a {@link Pipeline} but derived classes could change the |
|
732 | * behaviour |
|
733 | */ |
|
734 | protected Processor createCompositeProcessor(List<Processor> list) { |
|
735 | // return new MulticastProcessor(list); |
|
736 | 12 | return new Pipeline(list); |
737 | } |
|
738 | ||
739 | protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType> outputs) |
|
740 | throws Exception { |
|
741 | 207 | List<Processor> list = new ArrayList<Processor>(); |
742 | 207 | for (ProcessorType output : outputs) { |
743 | 225 | Processor processor = output.createProcessor(routeContext); |
744 | 225 | list.add(processor); |
745 | 225 | } |
746 | 207 | Processor processor = null; |
747 | 207 | if (!list.isEmpty()) { |
748 | 207 | if (list.size() == 1) { |
749 | 192 | processor = list.get(0); |
750 | 192 | } else { |
751 | 15 | processor = createCompositeProcessor(list); |
752 | } |
|
753 | } |
|
754 | 207 | return processor; |
755 | } |
|
756 | } |