001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.Endpoint; 020 import org.apache.camel.Exchange; 021 import org.apache.camel.Message; 022 import org.apache.camel.Processor; 023 import org.apache.camel.Producer; 024 import org.apache.camel.impl.ProducerCache; 025 import org.apache.camel.impl.ServiceSupport; 026 import org.apache.camel.model.RoutingSlipDefinition; 027 import org.apache.camel.util.ExchangeHelper; 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 import static org.apache.camel.util.ObjectHelper.notNull; 031 032 /** 033 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a> 034 * pattern where the list of actual endpoints to send a message exchange to are 035 * dependent on the value of a message header. 036 */ 037 public class RoutingSlip extends ServiceSupport implements Processor { 038 private static final transient Log LOG = LogFactory.getLog(RoutingSlip.class); 039 private final String header; 040 private final String uriDelimiter; 041 042 private ProducerCache producerCache = new ProducerCache(); 043 044 public RoutingSlip(String header) { 045 this(header, RoutingSlipDefinition.DEFAULT_DELIMITER); 046 } 047 048 public RoutingSlip(String header, String uriDelimiter) { 049 notNull(header, "header"); 050 notNull(uriDelimiter, "uriDelimiter"); 051 052 this.header = header; 053 this.uriDelimiter = uriDelimiter; 054 } 055 056 @Override 057 public String toString() { 058 return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]"; 059 } 060 061 public void process(Exchange exchange) throws Exception { 062 Message message = exchange.getIn(); 063 String[] recipients = recipients(message); 064 Exchange current = exchange; 065 066 for (String nextRecipient : recipients) { 067 Endpoint endpoint = resolveEndpoint(exchange, nextRecipient); 068 Producer producer = producerCache.getProducer(endpoint); 069 Exchange ex = current.newInstance(); 070 071 updateRoutingSlip(current); 072 copyOutToIn(ex, current); 073 074 producer.process(ex); 075 076 current = ex; 077 } 078 ExchangeHelper.copyResults(exchange, current); 079 } 080 081 protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { 082 return ExchangeHelper.resolveEndpoint(exchange, recipient); 083 } 084 085 protected void doStop() throws Exception { 086 producerCache.stop(); 087 } 088 089 protected void doStart() throws Exception { 090 } 091 092 private void updateRoutingSlip(Exchange current) { 093 Message message = getResultMessage(current); 094 String oldSlip = message.getHeader(header, String.class); 095 if (oldSlip != null) { 096 int delimiterIndex = oldSlip.indexOf(uriDelimiter); 097 String newSlip = delimiterIndex > 0 ? oldSlip.substring(delimiterIndex + 1) : ""; 098 message.setHeader(header, newSlip); 099 } 100 } 101 102 /** 103 * Returns the outbound message if available. Otherwise return the inbound 104 * message. 105 */ 106 private Message getResultMessage(Exchange exchange) { 107 Message message = exchange.getOut(false); 108 // if this endpoint had no out (like a mock endpoint) 109 // just take the in 110 if (message == null) { 111 message = exchange.getIn(); 112 } 113 return message; 114 } 115 116 /** 117 * Return the list of recipients defined in the routing slip in the 118 * specified message. 119 */ 120 private String[] recipients(Message message) { 121 Object headerValue = message.getHeader(header); 122 if (headerValue != null && !headerValue.equals("")) { 123 return headerValue.toString().split(uriDelimiter); 124 } 125 return new String[] {}; 126 } 127 128 /** 129 * Copy the outbound data in 'source' to the inbound data in 'result'. 130 */ 131 private void copyOutToIn(Exchange result, Exchange source) { 132 result.setException(source.getException()); 133 134 Message fault = source.getFault(false); 135 if (fault != null) { 136 result.getFault(true).copyFrom(fault); 137 } 138 139 result.setIn(getResultMessage(source)); 140 141 result.getProperties().clear(); 142 result.getProperties().putAll(source.getProperties()); 143 } 144 }