001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.Endpoint; 020 import org.apache.camel.Exchange; 021 import org.apache.camel.ExchangePattern; 022 import org.apache.camel.Message; 023 import org.apache.camel.Processor; 024 import org.apache.camel.Producer; 025 import org.apache.camel.ProducerCallback; 026 import org.apache.camel.impl.ProducerCache; 027 import org.apache.camel.impl.ServiceSupport; 028 import org.apache.camel.model.RoutingSlipDefinition; 029 import org.apache.camel.util.ExchangeHelper; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 import static org.apache.camel.util.ObjectHelper.notNull; 033 034 /** 035 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a> 036 * pattern where the list of actual endpoints to send a message exchange to are 037 * dependent on the value of a message header. 038 */ 039 public class RoutingSlip extends ServiceSupport implements Processor { 040 private static final transient Log LOG = LogFactory.getLog(RoutingSlip.class); 041 private ProducerCache producerCache; 042 private final String header; 043 private final String uriDelimiter; 044 045 public RoutingSlip(String header) { 046 this(header, RoutingSlipDefinition.DEFAULT_DELIMITER); 047 } 048 049 public RoutingSlip(String header, String uriDelimiter) { 050 notNull(header, "header"); 051 notNull(uriDelimiter, "uriDelimiter"); 052 053 this.header = header; 054 this.uriDelimiter = uriDelimiter; 055 } 056 057 @Override 058 public String toString() { 059 return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]"; 060 } 061 062 public void process(Exchange exchange) throws Exception { 063 Message message = exchange.getIn(); 064 String[] recipients = recipients(message); 065 Exchange current = exchange; 066 067 for (String nextRecipient : recipients) { 068 Endpoint endpoint = resolveEndpoint(exchange, nextRecipient); 069 070 Exchange copy = current.newInstance(); 071 updateRoutingSlip(current); 072 copyOutToIn(copy, current); 073 074 getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() { 075 public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception { 076 producer.process(exchange); 077 return exchange; 078 } 079 }); 080 081 current = copy; 082 } 083 ExchangeHelper.copyResults(exchange, current); 084 } 085 086 protected ProducerCache getProducerCache(Exchange exchange) throws Exception { 087 // setup producer cache as we need to use the pluggable service pool defined on camel context 088 if (producerCache == null) { 089 this.producerCache = new ProducerCache(exchange.getContext().getProducerServicePool()); 090 this.producerCache.start(); 091 } 092 return this.producerCache; 093 } 094 095 protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { 096 return ExchangeHelper.resolveEndpoint(exchange, recipient); 097 } 098 099 protected void doStart() throws Exception { 100 if (producerCache != null) { 101 producerCache.start(); 102 } 103 } 104 105 protected void doStop() throws Exception { 106 if (producerCache != null) { 107 producerCache.stop(); 108 } 109 } 110 111 private void updateRoutingSlip(Exchange current) { 112 Message message = getResultMessage(current); 113 String oldSlip = message.getHeader(header, String.class); 114 if (oldSlip != null) { 115 int delimiterIndex = oldSlip.indexOf(uriDelimiter); 116 String newSlip = delimiterIndex > 0 ? oldSlip.substring(delimiterIndex + 1) : ""; 117 message.setHeader(header, newSlip); 118 } 119 } 120 121 /** 122 * Returns the outbound message if available. Otherwise return the inbound 123 * message. 124 */ 125 private Message getResultMessage(Exchange exchange) { 126 if (exchange.hasOut()) { 127 return exchange.getOut(); 128 } else { 129 // if this endpoint had no out (like a mock endpoint) just take the in 130 return exchange.getIn(); 131 } 132 } 133 134 /** 135 * Return the list of recipients defined in the routing slip in the 136 * specified message. 137 */ 138 private String[] recipients(Message message) { 139 Object headerValue = message.getHeader(header); 140 if (headerValue != null && !headerValue.equals("")) { 141 return headerValue.toString().split(uriDelimiter); 142 } 143 return new String[] {}; 144 } 145 146 /** 147 * Copy the outbound data in 'source' to the inbound data in 'result'. 148 */ 149 private void copyOutToIn(Exchange result, Exchange source) { 150 result.setException(source.getException()); 151 152 if (source.hasFault()) { 153 result.getFault().copyFrom(source.getFault()); 154 } 155 156 result.setIn(getResultMessage(source)); 157 158 result.getProperties().clear(); 159 result.getProperties().putAll(source.getProperties()); 160 } 161 }