001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 022 import javax.xml.bind.annotation.XmlAccessType; 023 import javax.xml.bind.annotation.XmlAccessorType; 024 import javax.xml.bind.annotation.XmlAttribute; 025 import javax.xml.bind.annotation.XmlElement; 026 import javax.xml.bind.annotation.XmlElementRef; 027 import javax.xml.bind.annotation.XmlRootElement; 028 import javax.xml.bind.annotation.XmlTransient; 029 030 import org.apache.camel.Expression; 031 import org.apache.camel.Predicate; 032 import org.apache.camel.Processor; 033 import org.apache.camel.builder.ExpressionClause; 034 import org.apache.camel.model.language.ExpressionDefinition; 035 import org.apache.camel.processor.Aggregator; 036 import org.apache.camel.processor.aggregate.AggregationCollection; 037 import org.apache.camel.processor.aggregate.AggregationStrategy; 038 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 039 import org.apache.camel.spi.RouteContext; 040 041 /** 042 * Represents an XML <aggregate/> element 043 * 044 * @version $Revision: 751373 $ 045 */ 046 @XmlRootElement(name = "aggregate") 047 @XmlAccessorType(XmlAccessType.FIELD) 048 public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> { 049 @XmlElement(name = "correlationExpression", required = false) 050 private ExpressionSubElementDefinition correlationExpression; 051 @XmlTransient 052 private ExpressionDefinition expression; 053 @XmlElementRef 054 private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>(); 055 @XmlTransient 056 private AggregationStrategy aggregationStrategy; 057 @XmlTransient 058 private AggregationCollection aggregationCollection; 059 @XmlAttribute(required = false) 060 private Integer batchSize; 061 @XmlAttribute(required = false) 062 private Integer outBatchSize; 063 @XmlAttribute(required = false) 064 private Long batchTimeout; 065 @XmlAttribute(required = false) 066 private String strategyRef; 067 @XmlAttribute(required = false) 068 private String collectionRef; 069 @XmlAttribute(required = false) 070 private Boolean groupExchanges; 071 @XmlElement(name = "completionPredicate", required = false) 072 private ExpressionSubElementDefinition completionPredicate; 073 074 public AggregateDefinition() { 075 } 076 077 public AggregateDefinition(Predicate predicate) { 078 if (predicate != null) { 079 setExpression(new ExpressionDefinition(predicate)); 080 } 081 } 082 083 public AggregateDefinition(Expression correlationExpression) { 084 if (correlationExpression != null) { 085 setExpression(new ExpressionDefinition(correlationExpression)); 086 } 087 } 088 089 public AggregateDefinition(ExpressionDefinition correlationExpression) { 090 this.expression = correlationExpression; 091 } 092 093 public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 094 this(correlationExpression); 095 this.aggregationStrategy = aggregationStrategy; 096 } 097 098 @Override 099 public String toString() { 100 String expressionString = (getExpression() != null) ? getExpression().getLabel() : ""; 101 return "Aggregate[" + expressionString + " -> " + getOutputs() + "]"; 102 } 103 104 @Override 105 public String getShortName() { 106 return "aggregate"; 107 } 108 109 @Override 110 public Processor createProcessor(RouteContext routeContext) throws Exception { 111 return createAggregator(routeContext); 112 } 113 114 public ExpressionClause<AggregateDefinition> createAndSetExpression() { 115 ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(this); 116 this.setExpression(clause); 117 return clause; 118 } 119 120 protected Aggregator createAggregator(RouteContext routeContext) throws Exception { 121 final Processor processor = routeContext.createProcessor(this); 122 123 final Aggregator aggregator; 124 if (getAggregationCollection() == null) { 125 setAggregationCollection(createAggregationCollection(routeContext)); 126 } 127 128 if (aggregationCollection != null) { 129 // create the aggregator using the collection 130 // pre configure the collection if its expression and strategy is not set, then 131 // use the ones that is pre configured with this type 132 if (aggregationCollection.getCorrelationExpression() == null) { 133 aggregationCollection.setCorrelationExpression(getExpression()); 134 } 135 if (aggregationCollection.getAggregationStrategy() == null) { 136 AggregationStrategy strategy = createAggregationStrategy(routeContext); 137 aggregationCollection.setAggregationStrategy(strategy); 138 } 139 aggregator = new Aggregator(processor, aggregationCollection); 140 } else { 141 // create the aggregator using a default collection 142 AggregationStrategy strategy = createAggregationStrategy(routeContext); 143 144 if (getExpression() == null) { 145 throw new IllegalArgumentException("You need to specify an expression or " 146 + "aggregation collection for this aggregator: " + this); 147 } 148 149 Expression aggregateExpression = getExpression().createExpression(routeContext); 150 151 Predicate predicate = null; 152 if (getCompletionPredicate() != null) { 153 predicate = getCompletionPredicate().createPredicate(routeContext); 154 } 155 if (predicate != null) { 156 aggregator = new Aggregator(processor, aggregateExpression, strategy, predicate); 157 } else { 158 aggregator = new Aggregator(processor, aggregateExpression, strategy); 159 } 160 } 161 162 if (batchSize != null) { 163 aggregator.setBatchSize(batchSize); 164 } 165 166 if (batchTimeout != null) { 167 aggregator.setBatchTimeout(batchTimeout); 168 } 169 170 if (outBatchSize != null) { 171 aggregator.setOutBatchSize(outBatchSize); 172 } 173 174 if (groupExchanges != null) { 175 aggregator.setGroupExchanges(groupExchanges); 176 } 177 178 return aggregator; 179 } 180 181 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 182 AggregationStrategy strategy = getAggregationStrategy(); 183 if (strategy == null && strategyRef != null) { 184 strategy = routeContext.lookup(strategyRef, AggregationStrategy.class); 185 } 186 if (strategy == null) { 187 // fallback to use latest 188 strategy = new UseLatestAggregationStrategy(); 189 } 190 return strategy; 191 } 192 193 private AggregationCollection createAggregationCollection(RouteContext routeContext) { 194 AggregationCollection collection = getAggregationCollection(); 195 if (collection == null && collectionRef != null) { 196 collection = routeContext.lookup(collectionRef, AggregationCollection.class); 197 } 198 return collection; 199 } 200 201 public AggregationCollection getAggregationCollection() { 202 return aggregationCollection; 203 } 204 205 public void setAggregationCollection(AggregationCollection aggregationCollection) { 206 this.aggregationCollection = aggregationCollection; 207 } 208 209 public AggregationStrategy getAggregationStrategy() { 210 return aggregationStrategy; 211 } 212 213 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 214 this.aggregationStrategy = aggregationStrategy; 215 } 216 217 public Integer getBatchSize() { 218 return batchSize; 219 } 220 221 public void setBatchSize(Integer batchSize) { 222 this.batchSize = batchSize; 223 } 224 225 public Integer getOutBatchSize() { 226 return outBatchSize; 227 } 228 229 public void setOutBatchSize(Integer outBatchSize) { 230 this.outBatchSize = outBatchSize; 231 } 232 233 public Long getBatchTimeout() { 234 return batchTimeout; 235 } 236 237 public void setBatchTimeout(Long batchTimeout) { 238 this.batchTimeout = batchTimeout; 239 } 240 241 public String getStrategyRef() { 242 return strategyRef; 243 } 244 245 public void setStrategyRef(String strategyRef) { 246 this.strategyRef = strategyRef; 247 } 248 249 public String getCollectionRef() { 250 return collectionRef; 251 } 252 253 public void setCollectionRef(String collectionRef) { 254 this.collectionRef = collectionRef; 255 } 256 257 public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) { 258 this.completionPredicate = completionPredicate; 259 } 260 261 public ExpressionSubElementDefinition getCompletionPredicate() { 262 return completionPredicate; 263 } 264 265 public Boolean getGroupExchanges() { 266 return groupExchanges; 267 } 268 269 public void setGroupExchanges(Boolean groupExchanges) { 270 this.groupExchanges = groupExchanges; 271 } 272 273 // Fluent API 274 //------------------------------------------------------------------------- 275 276 /** 277 * Sets the in batch size for number of exchanges received 278 * 279 * @param batchSize the batch size 280 * @return builder 281 */ 282 public AggregateDefinition batchSize(int batchSize) { 283 setBatchSize(batchSize); 284 return this; 285 } 286 287 /** 288 * Sets the out batch size for number of exchanges sent 289 * 290 * @param batchSize the batch size 291 * @return builder 292 */ 293 public AggregateDefinition outBatchSize(int batchSize) { 294 setOutBatchSize(batchSize); 295 return this; 296 } 297 298 /** 299 * Sets the batch timeout 300 * 301 * @param batchTimeout the timeout in millis 302 * @return the builder 303 */ 304 public AggregateDefinition batchTimeout(long batchTimeout) { 305 setBatchTimeout(batchTimeout); 306 return this; 307 } 308 309 /** 310 * Sets the aggregate collection to use 311 * 312 * @param aggregationCollection the aggregate collection to use 313 * @return the builder 314 */ 315 public AggregateDefinition aggregationCollection(AggregationCollection aggregationCollection) { 316 setAggregationCollection(aggregationCollection); 317 return this; 318 } 319 320 /** 321 * Sets the aggregate strategy to use 322 * 323 * @param aggregationStrategy the aggregate strategy to use 324 * @return the builder 325 */ 326 public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 327 setAggregationStrategy(aggregationStrategy); 328 return this; 329 } 330 331 /** 332 * Sets the aggregate collection to use 333 * 334 * @param collectionRef reference to the aggregate collection to lookup in the registry 335 * @return the builder 336 */ 337 public AggregateDefinition collectionRef(String collectionRef) { 338 setCollectionRef(collectionRef); 339 return this; 340 } 341 342 /** 343 * Sets the aggregate strategy to use 344 * 345 * @param strategyRef reference to the strategy to lookup in the registry 346 * @return the builder 347 */ 348 public AggregateDefinition strategyRef(String strategyRef) { 349 setStrategyRef(strategyRef); 350 return this; 351 } 352 353 /** 354 * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single 355 * combined {@link org.apache.camel.impl.GroupedExchange} class holding all the aggregated exchanges. 356 * 357 * @return the builder 358 */ 359 public AggregateDefinition groupExchanges() { 360 setGroupExchanges(true); 361 return this; 362 } 363 364 /** 365 * Sets the predicate used to determine if the aggregation is completed 366 * 367 * @return the clause used to create the predicate 368 */ 369 public ExpressionClause<AggregateDefinition> completionPredicate() { 370 checkNoCompletedPredicate(); 371 ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(this); 372 setCompletionPredicate(new ExpressionSubElementDefinition((Expression)clause)); 373 return clause; 374 } 375 376 /** 377 * Sets the predicate used to determine if the aggregation is completed 378 * 379 * @param predicate the predicate 380 */ 381 public AggregateDefinition completionPredicate(Predicate predicate) { 382 checkNoCompletedPredicate(); 383 setCompletionPredicate(new ExpressionSubElementDefinition(predicate)); 384 return this; 385 } 386 387 protected void checkNoCompletedPredicate() { 388 if (getCompletionPredicate() != null) { 389 throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this); 390 } 391 } 392 393 public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { 394 this.correlationExpression = correlationExpression; 395 } 396 397 public ExpressionSubElementDefinition getCorrelationExpression() { 398 return correlationExpression; 399 } 400 401 // Section - Methods from ExpressionNode 402 // Needed to copy methods from ExpressionNode here so that I could specify the 403 // correlation expression as optional in JAXB 404 405 public ExpressionDefinition getExpression() { 406 if (expression == null && correlationExpression != null) { 407 expression = correlationExpression.getExpressionType(); 408 } 409 return expression; 410 } 411 412 public void setExpression(ExpressionDefinition expression) { 413 this.expression = expression; 414 } 415 416 public List<ProcessorDefinition> getOutputs() { 417 return outputs; 418 } 419 420 public void setOutputs(List<ProcessorDefinition> outputs) { 421 this.outputs = outputs; 422 } 423 424 @Override 425 protected void configureChild(ProcessorDefinition output) { 426 super.configureChild(output); 427 if (isInheritErrorHandler()) { 428 output.setErrorHandlerBuilder(getErrorHandlerBuilder()); 429 } 430 } 431 }