/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.builder;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.processor.Aggregator;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;

import java.util.List;

/**
 * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
 * where a batch of messages is processed (up to a maximum amount or until some timeout is reached)
 * and messages for the same correlation key are combined together using some kind of
 * {@link AggregationStrategy} (by default the latest message is used) to compress many message exchanges
 * into a smaller number of exchanges.
 * <p/>
 * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
 * throttle it right down so that multiple messages for the same stock are combined (or just the latest
 * message is used and older prices are discarded). Another idea is to combine line item messages together
 * into a single invoice message.
 *
 * @version $Revision: 1.1 $
 */
public class AggregatorBuilder extends FromBuilder {
    /** Expression handed to the {@link Aggregator} as its correlation expression. */
    private final Expression correlationExpression;
    /** Batch timeout applied to the aggregator; defaults to 1000 (presumably milliseconds - TODO confirm). */
    private long batchTimeout = 1000L;
    /** Batch size applied to the aggregator; defaults to 50000. */
    private int batchSize = 50000;
    /** Strategy handed to the {@link Aggregator}; defaults to {@link UseLatestAggregationStrategy}. */
    private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();

    /**
     * Creates an aggregator builder on top of an existing builder.
     *
     * @param builder               the parent builder, passed to the {@link FromBuilder} constructor
     * @param correlationExpression the expression passed to the {@link Aggregator} as its correlation expression
     */
    public AggregatorBuilder(FromBuilder builder, Expression correlationExpression) {
        super(builder);
        this.correlationExpression = correlationExpression;
    }

    /**
     * Creates the route for this builder: the processor built by the parent class is wrapped in an
     * {@link Aggregator} configured with this builder's correlation expression, aggregation strategy,
     * batch size and batch timeout.
     *
     * @return a route from {@link #getFrom()} whose service is the configured aggregator
     * @throws Exception if the processor, the aggregator or the route cannot be created
     */
    @Override
    public Route createRoute() throws Exception {
        final Processor processor = super.createProcessor();
        final Aggregator service = new Aggregator(getFrom(), processor, correlationExpression, aggregationStrategy);

        // Apply the batch settings configured on this builder; previously they were stored here
        // but never handed to the aggregator, so batchSize(..)/batchTimeout(..) had no effect.
        // NOTE(review): relies on Aggregator exposing setBatchSize/setBatchTimeout (expected to be
        // inherited from its batch-processing base class) - confirm against this revision.
        service.setBatchSize(batchSize);
        service.setBatchTimeout(batchTimeout);

        return new Route<Exchange>(getFrom(), service) {

            @Override
            public String toString() {
                return "AggregatorRoute[" + getEndpoint() + " -> " + processor + "]";
            }
        };
    }

    // Builder methods
    //-------------------------------------------------------------------------

    /**
     * Sets the aggregation strategy.
     *
     * @param aggregationStrategy the strategy to pass to the aggregator
     * @return this builder, to allow call chaining
     */
    public AggregatorBuilder aggregationStrategy(AggregationStrategy aggregationStrategy) {
        setAggregationStrategy(aggregationStrategy);
        return this;
    }

    /**
     * Sets the batch size.
     *
     * @param batchSize the batch size to apply to the aggregator
     * @return this builder, to allow call chaining
     */
    public AggregatorBuilder batchSize(int batchSize) {
        setBatchSize(batchSize);
        return this;
    }

    /**
     * Sets the batch timeout from an {@code int} value. Kept for compatibility with existing callers;
     * see {@link #batchTimeout(long)} for the full range accepted by {@link #setBatchTimeout(long)}.
     *
     * @param batchTimeout the batch timeout to apply to the aggregator
     * @return this builder, to allow call chaining
     */
    public AggregatorBuilder batchTimeout(int batchTimeout) {
        setBatchTimeout(batchTimeout);
        return this;
    }

    /**
     * Sets the batch timeout, accepting the same {@code long} range as {@link #setBatchTimeout(long)}.
     *
     * @param batchTimeout the batch timeout to apply to the aggregator
     * @return this builder, to allow call chaining
     */
    public AggregatorBuilder batchTimeout(long batchTimeout) {
        setBatchTimeout(batchTimeout);
        return this;
    }

    // Properties
    //-------------------------------------------------------------------------

    /**
     * @return the aggregation strategy passed to the aggregator
     */
    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    /**
     * @param aggregationStrategy the aggregation strategy to pass to the aggregator
     */
    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    /**
     * @return the batch size applied to the aggregator
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * @param batchSize the batch size to apply to the aggregator
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * @return the batch timeout applied to the aggregator
     */
    public long getBatchTimeout() {
        return batchTimeout;
    }

    /**
     * @param batchTimeout the batch timeout to apply to the aggregator
     */
    public void setBatchTimeout(long batchTimeout) {
        this.batchTimeout = batchTimeout;
    }

}