001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.Exchange; 020 import org.apache.camel.ExchangePattern; 021 import org.apache.camel.Processor; 022 import org.apache.camel.Producer; 023 import org.apache.camel.impl.DefaultExchange; 024 import org.apache.camel.impl.ServiceSupport; 025 import org.apache.camel.processor.aggregate.AggregationStrategy; 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 029 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; 030 031 /** 032 * A content enricher that enriches input data by first obtaining additional 033 * data from a <i>resource</i> represented by an endpoint <code>producer</code> 034 * and second by aggregating input data and additional data. Aggregation of 035 * input data and additional data is delegated to an {@link AggregationStrategy} 036 * object. 037 */ 038 public class Enricher extends ServiceSupport implements Processor { 039 040 private static final transient Log LOG = LogFactory.getLog(Enricher.class); 041 private AggregationStrategy aggregationStrategy; 042 private Producer producer; 043 044 /** 045 * Creates a new {@link Enricher}. The default aggregation strategy is to 046 * copy the additional data obtained from the enricher's resource over the 047 * input data. When using the copy aggregation strategy the enricher 048 * degenerates to a normal transformer. 049 * 050 * @param producer producer to resource endpoint. 051 */ 052 public Enricher(Producer producer) { 053 this(defaultAggregationStrategy(), producer); 054 } 055 056 /** 057 * Creates a new {@link Enricher}. 058 * 059 * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. 060 * @param producer producer to resource endpoint. 061 */ 062 public Enricher(AggregationStrategy aggregationStrategy, Producer producer) { 063 this.aggregationStrategy = aggregationStrategy; 064 this.producer = producer; 065 } 066 067 /** 068 * Sets the aggregation strategy for this enricher. 069 * 070 * @param aggregationStrategy the aggregationStrategy to set 071 */ 072 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 073 this.aggregationStrategy = aggregationStrategy; 074 } 075 076 /** 077 * Sets the default aggregation strategy for this enricher. 078 */ 079 public void setDefaultAggregationStrategy() { 080 this.aggregationStrategy = defaultAggregationStrategy(); 081 } 082 083 /** 084 * Enriches the input data (<code>exchange</code>) by first obtaining 085 * additional data from an endpoint represented by an endpoint 086 * <code>producer</code> and second by aggregating input data and additional 087 * data. Aggregation of input data and additional data is delegated to an 088 * {@link AggregationStrategy} object set at construction time. If the 089 * message exchange with the resource endpoint fails then no aggregation 090 * will be done and the failed exchange content is copied over to the 091 * original message exchange. 092 * 093 * @param exchange input data. 094 */ 095 public void process(Exchange exchange) throws Exception { 096 Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); 097 producer.process(resourceExchange); 098 099 if (resourceExchange.isFailed()) { 100 // copy resource exchange onto original exchange (preserving pattern) 101 copyResultsPreservePattern(exchange, resourceExchange); 102 } else { 103 prepareResult(exchange); 104 105 // aggregate original exchange and resource exchange 106 // but do not aggregate if the resource exchange was filtered 107 Boolean filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class); 108 if (filtered == null || !filtered) { 109 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 110 // copy aggregation result onto original exchange (preserving pattern) 111 copyResultsPreservePattern(exchange, aggregatedExchange); 112 } else { 113 if (LOG.isTraceEnabled()) { 114 LOG.trace("Cannot aggregate exchange as its filtered: " + resourceExchange); 115 } 116 } 117 } 118 } 119 120 /** 121 * Creates a new {@link DefaultExchange} instance from the given 122 * <code>exchange</code>. The resulting exchange's pattern is defined by 123 * <code>pattern</code>. 124 * 125 * @param source exchange to copy from. 126 * @param pattern exchange pattern to set. 127 * @return created exchange. 128 */ 129 protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) { 130 DefaultExchange target = new DefaultExchange(source.getContext()); 131 target.copyFrom(source); 132 target.setPattern(pattern); 133 return target; 134 } 135 136 private static void prepareResult(Exchange exchange) { 137 if (exchange.getPattern().isOutCapable()) { 138 exchange.getOut().copyFrom(exchange.getIn()); 139 } 140 } 141 142 private static AggregationStrategy defaultAggregationStrategy() { 143 return new CopyAggregationStrategy(); 144 } 145 146 protected void doStart() throws Exception { 147 producer.start(); 148 } 149 150 protected void doStop() throws Exception { 151 producer.stop(); 152 } 153 154 private static class CopyAggregationStrategy implements AggregationStrategy { 155 156 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 157 copyResultsPreservePattern(oldExchange, newExchange); 158 return oldExchange; 159 } 160 161 } 162 163 }