/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.model;

import java.util.ArrayList;
import java.util.List;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;

import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.Aggregator;
import org.apache.camel.processor.aggregate.AggregationCollection;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.RouteContext;

/**
 * Represents an XML &lt;aggregate/&gt; element
 * <p/>
 * The definition carries an optional correlation expression, an optional completion
 * predicate, an aggregation strategy and/or aggregation collection (given directly or
 * as a registry reference) and a handful of batch options. {@link #createProcessor(RouteContext)}
 * turns these settings into an {@link Aggregator}.
 *
 * @version $Revision: 782668 $
 */
@XmlRootElement(name = "aggregate")
@XmlAccessorType(XmlAccessType.FIELD)
public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> {
    @XmlElement(name = "correlationExpression", required = false)
    private ExpressionSubElementDefinition correlationExpression;
    // resolved lazily from correlationExpression by getExpression() when not set directly
    @XmlTransient
    private ExpressionDefinition expression;
    @XmlElementRef
    private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
    @XmlTransient
    private AggregationStrategy aggregationStrategy;
    @XmlTransient
    private AggregationCollection aggregationCollection;
    @XmlAttribute(required = false)
    private Integer batchSize;
    @XmlAttribute(required = false)
    private Integer outBatchSize;
    @XmlAttribute(required = false)
    private Long batchTimeout;
    @XmlAttribute(required = false)
    private String strategyRef;
    @XmlAttribute(required = false)
    private String collectionRef;
    @XmlAttribute(required = false)
    private Boolean groupExchanges;
    @XmlAttribute(required = false)
    private Boolean batchSizeFromConsumer;
    @XmlElement(name = "completionPredicate", required = false)
    private ExpressionSubElementDefinition completionPredicate;

    public AggregateDefinition() {
    }

    /**
     * Creates a definition whose expression wraps the given predicate.
     *
     * @param predicate the predicate to wrap; when <tt>null</tt> no expression is set
     */
    public AggregateDefinition(Predicate predicate) {
        if (predicate != null) {
            setExpression(new ExpressionDefinition(predicate));
        }
    }

    /**
     * Creates a definition whose expression wraps the given correlation expression.
     *
     * @param correlationExpression the expression to wrap; when <tt>null</tt> no expression is set
     */
    public AggregateDefinition(Expression correlationExpression) {
        if (correlationExpression != null) {
            setExpression(new ExpressionDefinition(correlationExpression));
        }
    }

    /**
     * Creates a definition using the given expression definition as is.
     *
     * @param correlationExpression the expression definition, stored without any wrapping
     */
    public AggregateDefinition(ExpressionDefinition correlationExpression) {
        this.expression = correlationExpression;
    }

    /**
     * Creates a definition with both a correlation expression and an aggregation strategy.
     *
     * @param correlationExpression the expression to wrap; when <tt>null</tt> no expression is set
     * @param aggregationStrategy   the strategy to use
     */
    public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
        this(correlationExpression);
        this.aggregationStrategy = aggregationStrategy;
    }

    @Override
    public String toString() {
        String expressionString = (getExpression() != null) ? getExpression().getLabel() : "";
        return "Aggregate[" + expressionString + " -> " + getOutputs() + "]";
    }

    @Override
    public String getShortName() {
        return "aggregate";
    }

    @Override
    public Processor createProcessor(RouteContext routeContext) throws Exception {
        return createAggregator(routeContext);
    }

    /**
     * Creates a new expression clause bound to this definition and installs it as the expression.
     *
     * @return the clause, to be completed by the caller
     */
    public ExpressionClause<AggregateDefinition> createAndSetExpression() {
        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(this);
        this.setExpression(clause);
        return clause;
    }

    /**
     * Builds the {@link Aggregator} from the settings of this definition.
     * <p/>
     * If no aggregation collection has been set, one is looked up through <tt>collectionRef</tt>
     * (when given) and stored on this definition. With a collection available the aggregator is
     * built around it; otherwise it is built from the expression, the aggregation strategy and
     * the optional completion predicate. The batch options are applied in both cases.
     *
     * @param routeContext the route context used to create the child processor and to look up references
     * @return the configured aggregator
     * @throws IllegalArgumentException if neither an expression nor an aggregation collection is
     *                                  available, or if <tt>strategyRef</tt> / <tt>collectionRef</tt>
     *                                  is set but cannot be resolved
     * @throws Exception can be thrown while creating the processor, expression or predicate
     */
    protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
        final Processor processor = routeContext.createProcessor(this);

        final Aggregator aggregator;
        if (getAggregationCollection() == null) {
            setAggregationCollection(createAggregationCollection(routeContext));
        }

        if (aggregationCollection != null) {
            // create the aggregator using the collection
            // pre configure the collection if its expression and strategy is not set, then
            // use the ones that is pre configured with this type
            // NOTE: the completionPredicate of this definition is not consulted on this path
            if (aggregationCollection.getCorrelationExpression() == null) {
                aggregationCollection.setCorrelationExpression(getExpression());
            }
            if (aggregationCollection.getAggregationStrategy() == null) {
                AggregationStrategy strategy = createAggregationStrategy(routeContext);
                aggregationCollection.setAggregationStrategy(strategy);
            }
            aggregator = new Aggregator(processor, aggregationCollection);
        } else {
            // create the aggregator using a default collection
            AggregationStrategy strategy = createAggregationStrategy(routeContext);

            if (getExpression() == null) {
                throw new IllegalArgumentException("You need to specify an expression or "
                    + "aggregation collection for this aggregator: " + this);
            }

            Expression aggregateExpression = getExpression().createExpression(routeContext);

            Predicate predicate = null;
            if (getCompletionPredicate() != null) {
                predicate = getCompletionPredicate().createPredicate(routeContext);
            }
            if (predicate != null) {
                aggregator = new Aggregator(processor, aggregateExpression, strategy, predicate);
            } else {
                aggregator = new Aggregator(processor, aggregateExpression, strategy);
            }
        }

        configureBatchOptions(aggregator);
        return aggregator;
    }

    /**
     * Copies every batch option that has been explicitly set on this definition onto the aggregator;
     * options left as <tt>null</tt> are not touched.
     */
    private void configureBatchOptions(Aggregator aggregator) {
        if (batchSize != null) {
            aggregator.setBatchSize(batchSize);
        }
        if (batchTimeout != null) {
            aggregator.setBatchTimeout(batchTimeout);
        }
        if (outBatchSize != null) {
            aggregator.setOutBatchSize(outBatchSize);
        }
        if (groupExchanges != null) {
            aggregator.setGroupExchanges(groupExchanges);
        }
        if (batchSizeFromConsumer != null) {
            aggregator.setBatchConsumer(batchSizeFromConsumer);
        }
    }

    /**
     * Resolves the aggregation strategy: the explicitly set strategy wins, then the one referenced
     * by <tt>strategyRef</tt>, and only when neither is configured a {@link UseLatestAggregationStrategy}.
     *
     * @throws IllegalArgumentException if <tt>strategyRef</tt> is set but the lookup yields nothing
     */
    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
        AggregationStrategy strategy = getAggregationStrategy();
        if (strategy == null && strategyRef != null) {
            strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
            if (strategy == null) {
                // fail fast: silently falling back to use latest would hide the misconfigured reference
                throw new IllegalArgumentException("Cannot find AggregationStrategy with strategyRef: "
                    + strategyRef + " for this aggregator: " + this);
            }
        }
        if (strategy == null) {
            // fallback to use latest
            strategy = new UseLatestAggregationStrategy();
        }
        return strategy;
    }

    /**
     * Resolves the aggregation collection: the explicitly set collection wins, then the one referenced
     * by <tt>collectionRef</tt>.
     *
     * @return the collection, or <tt>null</tt> when neither a collection nor a reference is configured
     * @throws IllegalArgumentException if <tt>collectionRef</tt> is set but the lookup yields nothing
     */
    private AggregationCollection createAggregationCollection(RouteContext routeContext) {
        AggregationCollection collection = getAggregationCollection();
        if (collection == null && collectionRef != null) {
            collection = routeContext.lookup(collectionRef, AggregationCollection.class);
            if (collection == null) {
                // fail fast: otherwise the aggregator would silently be built without the requested collection
                throw new IllegalArgumentException("Cannot find AggregationCollection with collectionRef: "
                    + collectionRef + " for this aggregator: " + this);
            }
        }
        return collection;
    }

    public AggregationCollection getAggregationCollection() {
        return aggregationCollection;
    }

    public void setAggregationCollection(AggregationCollection aggregationCollection) {
        this.aggregationCollection = aggregationCollection;
    }

    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public Integer getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(Integer batchSize) {
        this.batchSize = batchSize;
    }

    public Integer getOutBatchSize() {
        return outBatchSize;
    }

    public void setOutBatchSize(Integer outBatchSize) {
        this.outBatchSize = outBatchSize;
    }

    public Long getBatchTimeout() {
        return batchTimeout;
    }

    public void setBatchTimeout(Long batchTimeout) {
        this.batchTimeout = batchTimeout;
    }

    public String getStrategyRef() {
        return strategyRef;
    }

    public void setStrategyRef(String strategyRef) {
        this.strategyRef = strategyRef;
    }

    public String getCollectionRef() {
        return collectionRef;
    }

    public void setCollectionRef(String collectionRef) {
        this.collectionRef = collectionRef;
    }

    public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
        this.completionPredicate = completionPredicate;
    }

    public ExpressionSubElementDefinition getCompletionPredicate() {
        return completionPredicate;
    }

    public Boolean getGroupExchanges() {
        return groupExchanges;
    }

    public void setGroupExchanges(Boolean groupExchanges) {
        this.groupExchanges = groupExchanges;
    }

    public Boolean getBatchSizeFromConsumer() {
        return batchSizeFromConsumer;
    }

    public void setBatchSizeFromConsumer(Boolean batchSizeFromConsumer) {
        this.batchSizeFromConsumer = batchSizeFromConsumer;
    }

    // Fluent API
    //-------------------------------------------------------------------------

    /**
     * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
     * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
     * as total by setting the exchange property {@link org.apache.camel.Exchange#BATCH_SIZE}.
     *
     * @return builder
     */
    public AggregateDefinition batchSizeFromConsumer() {
        setBatchSizeFromConsumer(true);
        return this;
    }

    /**
     * Sets the in batch size for number of exchanges received
     *
     * @param batchSize the batch size
     * @return builder
     */
    public AggregateDefinition batchSize(int batchSize) {
        setBatchSize(batchSize);
        return this;
    }

    /**
     * Sets the out batch size for number of exchanges sent
     *
     * @param batchSize the batch size
     * @return builder
     */
    public AggregateDefinition outBatchSize(int batchSize) {
        setOutBatchSize(batchSize);
        return this;
    }

    /**
     * Sets the batch timeout
     *
     * @param batchTimeout the timeout in millis
     * @return the builder
     */
    public AggregateDefinition batchTimeout(long batchTimeout) {
        setBatchTimeout(batchTimeout);
        return this;
    }

    /**
     * Sets the aggregate collection to use
     *
     * @param aggregationCollection the aggregate collection to use
     * @return the builder
     */
    public AggregateDefinition aggregationCollection(AggregationCollection aggregationCollection) {
        setAggregationCollection(aggregationCollection);
        return this;
    }

    /**
     * Sets the aggregate strategy to use
     *
     * @param aggregationStrategy the aggregate strategy to use
     * @return the builder
     */
    public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
        setAggregationStrategy(aggregationStrategy);
        return this;
    }

    /**
     * Sets the aggregate collection to use
     *
     * @param collectionRef reference to the aggregate collection to lookup in the registry
     * @return the builder
     */
    public AggregateDefinition collectionRef(String collectionRef) {
        setCollectionRef(collectionRef);
        return this;
    }

    /**
     * Sets the aggregate strategy to use
     *
     * @param strategyRef reference to the strategy to lookup in the registry
     * @return the builder
     */
    public AggregateDefinition strategyRef(String strategyRef) {
        setStrategyRef(strategyRef);
        return this;
    }

    /**
     * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
     * combined {@link org.apache.camel.impl.GroupedExchange} class holding all the aggregated exchanges.
     *
     * @return the builder
     */
    public AggregateDefinition groupExchanges() {
        setGroupExchanges(true);
        return this;
    }

    /**
     * Sets the predicate used to determine if the aggregation is completed
     *
     * @return the clause used to create the predicate
     * @throws IllegalArgumentException if a completion predicate is already defined
     */
    public ExpressionClause<AggregateDefinition> completionPredicate() {
        checkNoCompletedPredicate();
        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(this);
        // the cast selects the Expression based constructor of the sub element definition
        setCompletionPredicate(new ExpressionSubElementDefinition((Expression)clause));
        return clause;
    }

    /**
     * Sets the predicate used to determine if the aggregation is completed
     *
     * @param predicate the predicate
     * @return the builder
     * @throws IllegalArgumentException if a completion predicate is already defined
     */
    public AggregateDefinition completionPredicate(Predicate predicate) {
        checkNoCompletedPredicate();
        setCompletionPredicate(new ExpressionSubElementDefinition(predicate));
        return this;
    }

    /**
     * Guards the fluent completion predicate methods against defining the predicate twice.
     *
     * @throws IllegalArgumentException if a completion predicate is already defined
     */
    protected void checkNoCompletedPredicate() {
        if (getCompletionPredicate() != null) {
            throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
        }
    }

    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
        this.correlationExpression = correlationExpression;
    }

    public ExpressionSubElementDefinition getCorrelationExpression() {
        return correlationExpression;
    }

    // Section - Methods from ExpressionNode
    // Needed to copy methods from ExpressionNode here so that I could specify the
    // correlation expression as optional in JAXB

    /**
     * Gets the expression. When no expression has been set but a correlationExpression
     * sub element is present, the expression is taken from that sub element and remembered.
     *
     * @return the expression, or <tt>null</tt> if none is available
     */
    public ExpressionDefinition getExpression() {
        if (expression == null && correlationExpression != null) {
            expression = correlationExpression.getExpressionType();
        }
        return expression;
    }

    public void setExpression(ExpressionDefinition expression) {
        this.expression = expression;
    }

    public List<ProcessorDefinition> getOutputs() {
        return outputs;
    }

    public void setOutputs(List<ProcessorDefinition> outputs) {
        this.outputs = outputs;
    }

}