/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor.aggregate;

import java.util.AbstractCollection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link java.util.Collection} which aggregates exchanges together using a correlation
 * expression so that there is only a single message exchange sent for a single
 * correlation key.
 * <p/>
 * Aggregated exchanges are held in a {@link LinkedHashMap} keyed by correlation key, so
 * iteration follows the order in which keys were first stored. This class adds no
 * synchronization of its own around that map.
 *
 * @version $Revision: 787563 $
 */
public class DefaultAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {

    private static final Log LOG = LogFactory.getLog(DefaultAggregationCollection.class);
    private Expression correlationExpression;
    private AggregationStrategy aggregationStrategy;
    private final Map<Object, Exchange> aggregated = new LinkedHashMap<Object, Exchange>();
    private final AtomicInteger counter = new AtomicInteger();

    /**
     * Creates an unconfigured collection. Both the correlation expression and the
     * aggregation strategy must be supplied through their setters before
     * {@link #add(Exchange)} is called with a non-filtered exchange.
     */
    public DefaultAggregationCollection() {
    }

    /**
     * Creates a collection that is ready to aggregate.
     *
     * @param correlationExpression expression evaluated on each added exchange to obtain its correlation key
     * @param aggregationStrategy   strategy used to merge an added exchange with the one already stored for its key
     */
    public DefaultAggregationCollection(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
        this.correlationExpression = correlationExpression;
        this.aggregationStrategy = aggregationStrategy;
    }

    /**
     * Gives subclasses direct access to the backing map of correlation key to aggregated exchange.
     *
     * @return the live backing map (not a copy)
     */
    protected Map<Object, Exchange> getAggregated() {
        return aggregated;
    }

    /**
     * Aggregates the given exchange into the entry stored for its correlation key.
     * <p/>
     * An exchange whose {@link Exchange#FILTERED} property is <tt>true</tt> is ignored.
     * Otherwise the exchange is merged with any previously stored exchange for the same key,
     * the result is stamped with {@link Exchange#AGGREGATED_SIZE} and
     * {@link Exchange#AGGREGATED_INDEX}, stored unless it equals the previously stored
     * exchange, and finally passed to {@link #onAggregation(Object, Exchange)}.
     * <p/>
     * A missing correlation expression, a missing aggregation strategy, a stored exchange without
     * an {@link Exchange#AGGREGATED_SIZE} property, or a strategy that returns <tt>null</tt> is
     * rejected through <tt>ObjectHelper.notNull</tt>.
     *
     * @param exchange the exchange to aggregate
     * @return <tt>false</tt> if the exchange was filtered and therefore skipped, <tt>true</tt> otherwise
     */
    @Override
    public boolean add(Exchange exchange) {
        // do not add exchange if it was filtered
        Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
        if (filtered != null && filtered) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
            }
            return false;
        }

        // report a missing configuration explicitly, and before any exchange has been touched,
        // instead of failing later on a bare null dereference
        ObjectHelper.notNull(correlationExpression, "correlationExpression");
        ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");

        Object correlationKey = correlationExpression.evaluate(exchange, Object.class);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Evaluated expression: " + correlationExpression + " as correlation key: " + correlationKey);
        }

        // TODO: correlationKey evaluated to null should be skipped by default

        Exchange oldExchange = aggregated.get(correlationKey);

        // the first exchange for a key counts as 1, every later one is previous size + 1
        int size = 1;
        if (oldExchange != null) {
            Integer previousSize = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class);
            ObjectHelper.notNull(previousSize, Exchange.AGGREGATED_SIZE + " on " + oldExchange);
            size = previousSize + 1;
        }

        // prepare the exchanges for aggregation
        ExchangeHelper.prepareAggregation(oldExchange, exchange);
        Exchange newExchange = aggregationStrategy.aggregate(oldExchange, exchange);
        ObjectHelper.notNull(newExchange, "exchange returned by " + aggregationStrategy);
        newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);

        // update the index counter
        newExchange.setProperty(Exchange.AGGREGATED_INDEX, counter.getAndIncrement());

        // the strategy may just update the old exchange and return it
        if (!newExchange.equals(oldExchange)) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Put exchange:" + newExchange + " with correlation key:" + correlationKey);
            }
            aggregated.put(correlationKey, newExchange);
        }

        onAggregation(correlationKey, newExchange);

        return true;
    }

    /**
     * Iterates over the currently stored aggregated exchanges, one per correlation key.
     *
     * @return an iterator backed directly by the values of the internal map
     */
    @Override
    public Iterator<Exchange> iterator() {
        return aggregated.values().iterator();
    }

    /**
     * @return the number of correlation keys that currently have a stored exchange
     */
    @Override
    public int size() {
        return aggregated.size();
    }

    /**
     * Removes every stored exchange and resets the aggregation index counter to zero.
     */
    @Override
    public void clear() {
        aggregated.clear();
        counter.set(0);
    }

    /**
     * Callback invoked by {@link #add(Exchange)} after each non-filtered exchange has been
     * aggregated. This implementation does nothing.
     *
     * @param correlationKey the key the exchange was aggregated under
     * @param exchange       the exchange returned by the aggregation strategy
     */
    public void onAggregation(Object correlationKey, Exchange exchange) {
    }

    /**
     * @return the expression used to compute correlation keys, or <tt>null</tt> if none has been set
     */
    public Expression getCorrelationExpression() {
        return correlationExpression;
    }

    /**
     * @param correlationExpression the expression used to compute correlation keys
     */
    public void setCorrelationExpression(Expression correlationExpression) {
        this.correlationExpression = correlationExpression;
    }

    /**
     * @return the strategy used to merge exchanges, or <tt>null</tt> if none has been set
     */
    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    /**
     * @param aggregationStrategy the strategy used to merge exchanges
     */
    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }
}