001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.Endpoint;
020    import org.apache.camel.Exchange;
021    import org.apache.camel.ExchangePattern;
022    import org.apache.camel.Message;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Producer;
025    import org.apache.camel.ProducerCallback;
026    import org.apache.camel.impl.ProducerCache;
027    import org.apache.camel.impl.ServiceSupport;
028    import org.apache.camel.model.RoutingSlipDefinition;
029    import org.apache.camel.util.ExchangeHelper;
030    
031    import static org.apache.camel.util.ObjectHelper.notNull;
032    
033    /**
034     * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a>
035     * pattern where the list of actual endpoints to send a message exchange to are
036     * dependent on the value of a message header.
037     */
038    public class RoutingSlip extends ServiceSupport implements Processor, Traceable {
039        private ProducerCache producerCache;
040        private final String header;
041        private final String uriDelimiter;
042    
043        public RoutingSlip(String header) {
044            this(header, RoutingSlipDefinition.DEFAULT_DELIMITER);
045        }
046    
047        public RoutingSlip(String header, String uriDelimiter) {
048            notNull(header, "header");
049            notNull(uriDelimiter, "uriDelimiter");
050    
051            this.header = header;
052            this.uriDelimiter = uriDelimiter;
053        }
054    
055        @Override
056        public String toString() {
057            return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]";
058        }
059    
060        public String getTraceLabel() {
061            return "RoutingSlip[" + header + "]";
062        }
063    
064        public void process(Exchange exchange) throws Exception {
065            Message message = exchange.getIn();
066            String[] recipients = recipients(message);
067            Exchange current = exchange;
068    
069            for (String nextRecipient : recipients) {
070                Endpoint endpoint = resolveEndpoint(exchange, nextRecipient);
071    
072                Exchange copy = current.newInstance();
073                updateRoutingSlip(current);
074                copyOutToIn(copy, current);
075    
076                getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
077                    public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception {
078                        producer.process(exchange);
079                        return exchange;
080                    }
081                });
082    
083                current = copy;
084            }
085            ExchangeHelper.copyResults(exchange, current);
086        }
087    
088        protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
089            // setup producer cache as we need to use the pluggable service pool defined on camel context
090            if (producerCache == null) {
091                this.producerCache = new ProducerCache(exchange.getContext().getProducerServicePool());
092                this.producerCache.start();
093            }
094            return this.producerCache;
095        }
096    
097        protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
098            return ExchangeHelper.resolveEndpoint(exchange, recipient);
099        }
100    
101        protected void doStart() throws Exception {
102            if (producerCache != null) {
103                producerCache.start();
104            }
105        }
106    
107        protected void doStop() throws Exception {
108            if (producerCache != null) {
109                producerCache.stop();
110            }
111        }
112    
113        private void updateRoutingSlip(Exchange current) {
114            Message message = getResultMessage(current);
115            String oldSlip = message.getHeader(header, String.class);
116            if (oldSlip != null) {
117                int delimiterIndex = oldSlip.indexOf(uriDelimiter);
118                String newSlip = delimiterIndex > 0 ? oldSlip.substring(delimiterIndex + 1) : "";
119                message.setHeader(header, newSlip);
120            }
121        }
122    
123        /**
124         * Returns the outbound message if available. Otherwise return the inbound
125         * message.
126         */
127        private Message getResultMessage(Exchange exchange) {
128            if (exchange.hasOut()) {
129                return exchange.getOut();
130            } else {
131                // if this endpoint had no out (like a mock endpoint) just take the in
132                return exchange.getIn();
133            }
134        }
135    
136        /**
137         * Return the list of recipients defined in the routing slip in the
138         * specified message.
139         */
140        private String[] recipients(Message message) {
141            Object headerValue = message.getHeader(header);
142            if (headerValue != null && !headerValue.equals("")) {
143                return headerValue.toString().split(uriDelimiter);
144            }
145            return new String[] {};
146        }
147    
148        /**
149         * Copy the outbound data in 'source' to the inbound data in 'result'.
150         */
151        private void copyOutToIn(Exchange result, Exchange source) {
152            result.setException(source.getException());
153    
154            if (source.hasFault()) {
155                result.getFault().copyFrom(source.getFault());
156            }
157    
158            result.setIn(getResultMessage(source));
159    
160            result.getProperties().clear();
161            result.getProperties().putAll(source.getProperties());
162        }
163    }