001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.aggregate;
018    
019    import java.util.AbstractCollection;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.Map;
023    import java.util.concurrent.atomic.AtomicInteger;
024    
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Expression;
027    import org.apache.camel.util.ObjectHelper;
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     * A {@link java.util.Collection} which aggregates exchanges together using a correlation
033     * expression so that there is only a single message exchange sent for a single
034     * correlation key.
035     *
036     * @version $Revision: 783663 $
037     */
038    public class DefaultAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {
039    
040        private static final transient Log LOG = LogFactory.getLog(DefaultAggregationCollection.class);
041        private Expression correlationExpression;
042        private AggregationStrategy aggregationStrategy;
043        private final Map<Object, Exchange> aggregated = new LinkedHashMap<Object, Exchange>();
044        private final AtomicInteger counter = new AtomicInteger();
045    
046        public DefaultAggregationCollection() {
047        }
048    
049        public DefaultAggregationCollection(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
050            this.correlationExpression = correlationExpression;
051            this.aggregationStrategy = aggregationStrategy;
052        }
053    
054        protected Map<Object, Exchange> getAggregated() {
055            return aggregated;
056        }
057    
058        @Override
059        public boolean add(Exchange exchange) {
060            // do not add exchange if it was filtered
061            Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
062            if (filtered != null && filtered) {
063                if (LOG.isTraceEnabled()) {
064                    LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
065                }
066                return false;
067            }
068    
069            Object correlationKey = correlationExpression.evaluate(exchange, Object.class);
070            if (LOG.isTraceEnabled()) {
071                LOG.trace("Evaluated expression: " + correlationExpression + " as correlation key: " + correlationKey);
072            }
073    
074            // TODO: correlationKey evalutated to null should be skipped by default
075    
076            Exchange oldExchange = aggregated.get(correlationKey);
077            Exchange newExchange = exchange;
078    
079            Integer size = 1;
080            if (oldExchange != null) {
081                size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class);
082                ObjectHelper.notNull(size, Exchange.AGGREGATED_SIZE + " on " + oldExchange);
083                size++;
084            }
085            newExchange = aggregationStrategy.aggregate(oldExchange, newExchange);
086            newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
087    
088            // update the index counter
089            newExchange.setProperty(Exchange.AGGREGATED_INDEX, counter.getAndIncrement());
090    
091            // the strategy may just update the old exchange and return it
092            if (!newExchange.equals(oldExchange)) {
093                if (LOG.isTraceEnabled()) {
094                    LOG.trace("Put exchange:" + newExchange + " with correlation key:"  + correlationKey);
095                }
096                aggregated.put(correlationKey, newExchange);
097            }
098    
099            onAggregation(correlationKey, newExchange);
100    
101            return true;
102        }
103    
104        public Iterator<Exchange> iterator() {
105            return aggregated.values().iterator();
106        }
107    
108        public int size() {
109            return aggregated.size();
110        }
111    
112        @Override
113        public void clear() {
114            aggregated.clear();
115            counter.set(0);
116        }
117    
118        public void onAggregation(Object correlationKey, Exchange exchange) {
119        }
120    
121        public Expression getCorrelationExpression() {
122            return correlationExpression;
123        }
124    
125        public void setCorrelationExpression(Expression correlationExpression) {
126            this.correlationExpression = correlationExpression;
127        }
128    
129        public AggregationStrategy getAggregationStrategy() {
130            return aggregationStrategy;
131        }
132    
133        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
134            this.aggregationStrategy = aggregationStrategy;
135        }
136    }