001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.ExchangePattern;
021    import org.apache.camel.PollingConsumer;
022    import org.apache.camel.Processor;
023    import org.apache.camel.impl.DefaultExchange;
024    import org.apache.camel.impl.ServiceSupport;
025    import org.apache.camel.processor.aggregate.AggregationStrategy;
026    import org.apache.camel.util.ExchangeHelper;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
030    
031    /**
032     * A content enricher that enriches input data by first obtaining additional
033     * data from a <i>resource</i> represented by an endpoint <code>producer</code>
034     * and second by aggregating input data and additional data. Aggregation of
035     * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy}
036     * object.
037     * <p/>
038     * Uses a {@link org.apache.camel.PollingConsumer} to obatin the additional data as opposed to {@link Enricher}
039     * that uses a {@link org.apache.camel.Producer}.
040     *
041     * @see Enricher
042     */
043    public class PollEnricher extends ServiceSupport implements Processor {
044    
045        private static final transient Log LOG = LogFactory.getLog(PollEnricher.class);
046        private AggregationStrategy aggregationStrategy;
047        private PollingConsumer consumer;
048        private long timeout;
049    
050        /**
051         * Creates a new {@link PollEnricher}. The default aggregation strategy is to
052         * copy the additional data obtained from the enricher's resource over the
053         * input data. When using the copy aggregation strategy the enricher
054         * degenerates to a normal transformer.
055         *
056         * @param consumer consumer to resource endpoint.
057         */
058        public PollEnricher(PollingConsumer consumer) {
059            this(defaultAggregationStrategy(), consumer, 0);
060        }
061    
062        /**
063         * Creates a new {@link PollEnricher}.
064         *
065         * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
066         * @param consumer consumer to resource endpoint.
067         */
068        public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
069            this.aggregationStrategy = aggregationStrategy;
070            this.consumer = consumer;
071            this.timeout = timeout;
072        }
073    
074        /**
075         * Sets the aggregation strategy for this poll enricher.
076         *
077         * @param aggregationStrategy the aggregationStrategy to set
078         */
079        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
080            this.aggregationStrategy = aggregationStrategy;
081        }
082    
083        /**
084         * Sets the default aggregation strategy for this poll enricher.
085         */
086        public void setDefaultAggregationStrategy() {
087            this.aggregationStrategy = defaultAggregationStrategy();
088        }
089    
090        /**
091         * Sets the timeout to use when polling.
092         * <p/>
093         * Use 0 or negative to not use timeout and block until data is available.
094         *
095         * @param timeout timeout in millis.
096         */
097        public void setTimeout(long timeout) {
098            this.timeout = timeout;
099        }
100    
101        /**
102         * Enriches the input data (<code>exchange</code>) by first obtaining
103         * additional data from an endpoint represented by an endpoint
104         * <code>producer</code> and second by aggregating input data and additional
105         * data. Aggregation of input data and additional data is delegated to an
106         * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the
107         * message exchange with the resource endpoint fails then no aggregation
108         * will be done and the failed exchange content is copied over to the
109         * original message exchange.
110         *
111         * @param exchange input data.
112         */
113        public void process(Exchange exchange) throws Exception {
114            Exchange resourceExchange;
115            if (timeout < 0) {
116                if (LOG.isDebugEnabled()) {
117                    LOG.debug("Consumer receive: " + consumer);
118                }
119                resourceExchange = consumer.receive();
120            } else if (timeout == 0) {
121                if (LOG.isDebugEnabled()) {
122                    LOG.debug("Consumer receiveNoWait: " + consumer);
123                }
124                resourceExchange = consumer.receiveNoWait();
125            } else {
126                if (LOG.isDebugEnabled()) {
127                    LOG.debug("Consumer receive with timeout: " + timeout + " ms. " + consumer);
128                }
129                resourceExchange = consumer.receive(timeout);
130            }
131    
132            if (resourceExchange != null && resourceExchange.isFailed()) {
133                // copy resource exchange onto original exchange (preserving pattern)
134                copyResultsPreservePattern(exchange, resourceExchange);
135            } else {
136                prepareResult(exchange);
137    
138                // aggregate original exchange and resource exchange
139                // but do not aggregate if the resource exchange was filtered
140                Boolean filtered = null;
141                if (resourceExchange != null) {
142                    filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
143                }
144                if (filtered == null || !filtered) {
145                    // prepare the exchanges for aggregation
146                    ExchangeHelper.prepareAggregation(exchange, resourceExchange);
147                    Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
148                    // copy aggregation result onto original exchange (preserving pattern)
149                    copyResultsPreservePattern(exchange, aggregatedExchange);
150                } else {
151                    if (LOG.isTraceEnabled()) {
152                        LOG.trace("Cannot aggregate exchange as its filtered: " + resourceExchange);
153                    }
154                }
155            }
156        }
157    
158        /**
159         * Creates a new {@link org.apache.camel.impl.DefaultExchange} instance from the given
160         * <code>exchange</code>. The resulting exchange's pattern is defined by
161         * <code>pattern</code>.
162         *
163         * @param source  exchange to copy from.
164         * @param pattern exchange pattern to set.
165         * @return created exchange.
166         */
167        protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
168            Exchange target = source.copy();
169            target.setPattern(pattern);
170            return target;
171        }
172    
173        private static void prepareResult(Exchange exchange) {
174            if (exchange.getPattern().isOutCapable()) {
175                exchange.getOut().copyFrom(exchange.getIn());
176            }
177        }
178    
179        private static AggregationStrategy defaultAggregationStrategy() {
180            return new CopyAggregationStrategy();
181        }
182    
183        @Override
184        public String toString() {
185            return "PollEnrich[" + consumer + "]";
186        }
187    
188        protected void doStart() throws Exception {
189            consumer.start();
190        }
191    
192        protected void doStop() throws Exception {
193            consumer.stop();
194        }
195    
196        private static class CopyAggregationStrategy implements AggregationStrategy {
197    
198            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
199                copyResultsPreservePattern(oldExchange, newExchange);
200                return oldExchange;
201            }
202    
203        }
204    
205    }