001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.aggregate; 018 019 import java.util.AbstractCollection; 020 import java.util.Iterator; 021 import java.util.LinkedHashMap; 022 import java.util.Map; 023 import java.util.concurrent.atomic.AtomicInteger; 024 025 import org.apache.camel.Exchange; 026 import org.apache.camel.Expression; 027 import org.apache.camel.util.ObjectHelper; 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 031 /** 032 * A {@link java.util.Collection} which aggregates exchanges together using a correlation 033 * expression so that there is only a single message exchange sent for a single 034 * correlation key. 035 * 036 * @version $Revision: 783663 $ 037 */ 038 public class DefaultAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection { 039 040 private static final transient Log LOG = LogFactory.getLog(DefaultAggregationCollection.class); 041 private Expression correlationExpression; 042 private AggregationStrategy aggregationStrategy; 043 private final Map<Object, Exchange> aggregated = new LinkedHashMap<Object, Exchange>(); 044 private final AtomicInteger counter = new AtomicInteger(); 045 046 public DefaultAggregationCollection() { 047 } 048 049 public DefaultAggregationCollection(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 050 this.correlationExpression = correlationExpression; 051 this.aggregationStrategy = aggregationStrategy; 052 } 053 054 protected Map<Object, Exchange> getAggregated() { 055 return aggregated; 056 } 057 058 @Override 059 public boolean add(Exchange exchange) { 060 // do not add exchange if it was filtered 061 Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class); 062 if (filtered != null && filtered) { 063 if (LOG.isTraceEnabled()) { 064 LOG.trace("Cannot aggregate exchange as its filtered: " + exchange); 065 } 066 return false; 067 } 068 069 Object correlationKey = correlationExpression.evaluate(exchange, Object.class); 070 if (LOG.isTraceEnabled()) { 071 LOG.trace("Evaluated expression: " + correlationExpression + " as correlation key: " + correlationKey); 072 } 073 074 // TODO: correlationKey evalutated to null should be skipped by default 075 076 Exchange oldExchange = aggregated.get(correlationKey); 077 Exchange newExchange = exchange; 078 079 Integer size = 1; 080 if (oldExchange != null) { 081 size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class); 082 ObjectHelper.notNull(size, Exchange.AGGREGATED_SIZE + " on " + oldExchange); 083 size++; 084 } 085 newExchange = aggregationStrategy.aggregate(oldExchange, newExchange); 086 newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); 087 088 // update the index counter 089 newExchange.setProperty(Exchange.AGGREGATED_INDEX, counter.getAndIncrement()); 090 091 // the strategy may just update the old exchange and return it 092 if (!newExchange.equals(oldExchange)) { 093 if (LOG.isTraceEnabled()) { 094 LOG.trace("Put exchange:" + newExchange + " with correlation key:" + correlationKey); 095 } 096 aggregated.put(correlationKey, newExchange); 097 } 098 099 onAggregation(correlationKey, newExchange); 100 101 return true; 102 } 103 104 public Iterator<Exchange> iterator() { 105 return aggregated.values().iterator(); 106 } 107 108 public int size() { 109 return aggregated.size(); 110 } 111 112 @Override 113 public void clear() { 114 aggregated.clear(); 115 counter.set(0); 116 } 117 118 public void onAggregation(Object correlationKey, Exchange exchange) { 119 } 120 121 public Expression getCorrelationExpression() { 122 return correlationExpression; 123 } 124 125 public void setCorrelationExpression(Expression correlationExpression) { 126 this.correlationExpression = correlationExpression; 127 } 128 129 public AggregationStrategy getAggregationStrategy() { 130 return aggregationStrategy; 131 } 132 133 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 134 this.aggregationStrategy = aggregationStrategy; 135 } 136 }