001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.Endpoint;
020    import org.apache.camel.Exchange;
021    import org.apache.camel.ExchangePattern;
022    import org.apache.camel.Message;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Producer;
025    import org.apache.camel.ProducerCallback;
026    import org.apache.camel.impl.ProducerCache;
027    import org.apache.camel.impl.ServiceSupport;
028    import org.apache.camel.model.RoutingSlipDefinition;
029    import org.apache.camel.util.ExchangeHelper;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import static org.apache.camel.util.ObjectHelper.notNull;
033    
034    /**
035     * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a>
036     * pattern where the list of actual endpoints to send a message exchange to are
037     * dependent on the value of a message header.
038     */
039    public class RoutingSlip extends ServiceSupport implements Processor {
040        private static final transient Log LOG = LogFactory.getLog(RoutingSlip.class);
041        private ProducerCache producerCache;
042        private final String header;
043        private final String uriDelimiter;
044    
045        public RoutingSlip(String header) {
046            this(header, RoutingSlipDefinition.DEFAULT_DELIMITER);
047        }
048    
049        public RoutingSlip(String header, String uriDelimiter) {
050            notNull(header, "header");
051            notNull(uriDelimiter, "uriDelimiter");
052    
053            this.header = header;
054            this.uriDelimiter = uriDelimiter;
055        }
056    
057        @Override
058        public String toString() {
059            return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]";
060        }
061    
062        public void process(Exchange exchange) throws Exception {
063            Message message = exchange.getIn();
064            String[] recipients = recipients(message);
065            Exchange current = exchange;
066    
067            for (String nextRecipient : recipients) {
068                Endpoint endpoint = resolveEndpoint(exchange, nextRecipient);
069    
070                Exchange copy = current.newInstance();
071                updateRoutingSlip(current);
072                copyOutToIn(copy, current);
073    
074                getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
075                    public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception {
076                        producer.process(exchange);
077                        return exchange;
078                    }
079                });
080    
081                current = copy;
082            }
083            ExchangeHelper.copyResults(exchange, current);
084        }
085    
086        protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
087            // setup producer cache as we need to use the pluggable service pool defined on camel context
088            if (producerCache == null) {
089                this.producerCache = new ProducerCache(exchange.getContext().getProducerServicePool());
090                this.producerCache.start();
091            }
092            return this.producerCache;
093        }
094    
095        protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
096            return ExchangeHelper.resolveEndpoint(exchange, recipient);
097        }
098    
099        protected void doStart() throws Exception {
100            if (producerCache != null) {
101                producerCache.start();
102            }
103        }
104    
105        protected void doStop() throws Exception {
106            if (producerCache != null) {
107                producerCache.stop();
108            }
109        }
110    
111        private void updateRoutingSlip(Exchange current) {
112            Message message = getResultMessage(current);
113            String oldSlip = message.getHeader(header, String.class);
114            if (oldSlip != null) {
115                int delimiterIndex = oldSlip.indexOf(uriDelimiter);
116                String newSlip = delimiterIndex > 0 ? oldSlip.substring(delimiterIndex + 1) : "";
117                message.setHeader(header, newSlip);
118            }
119        }
120    
121        /**
122         * Returns the outbound message if available. Otherwise return the inbound
123         * message.
124         */
125        private Message getResultMessage(Exchange exchange) {
126            if (exchange.hasOut()) {
127                return exchange.getOut();
128            } else {
129                // if this endpoint had no out (like a mock endpoint) just take the in
130                return exchange.getIn();
131            }
132        }
133    
134        /**
135         * Return the list of recipients defined in the routing slip in the
136         * specified message.
137         */
138        private String[] recipients(Message message) {
139            Object headerValue = message.getHeader(header);
140            if (headerValue != null && !headerValue.equals("")) {
141                return headerValue.toString().split(uriDelimiter);
142            }
143            return new String[] {};
144        }
145    
146        /**
147         * Copy the outbound data in 'source' to the inbound data in 'result'.
148         */
149        private void copyOutToIn(Exchange result, Exchange source) {
150            result.setException(source.getException());
151    
152            if (source.hasFault()) {
153                result.getFault().copyFrom(source.getFault());
154            }
155    
156            result.setIn(getResultMessage(source));
157    
158            result.getProperties().clear();
159            result.getProperties().putAll(source.getProperties());
160        }
161    }