001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.jms; 018 019 import org.apache.camel.Exchange; 020 import org.apache.camel.util.ExchangeHelper; 021 import org.apache.commons.logging.Log; 022 import org.apache.commons.logging.LogFactory; 023 024 import javax.jms.BytesMessage; 025 import javax.jms.Destination; 026 import javax.jms.JMSException; 027 import javax.jms.MapMessage; 028 import javax.jms.Message; 029 import javax.jms.ObjectMessage; 030 import javax.jms.Session; 031 import javax.jms.StreamMessage; 032 import javax.jms.TextMessage; 033 import java.io.Serializable; 034 import java.util.Enumeration; 035 import java.util.HashMap; 036 import java.util.Map; 037 import java.util.Set; 038 039 /** 040 * A Strategy used to convert between a Camel {@JmsExchange} and {@JmsMessage} 041 * to and from a JMS {@link Message} 042 * 043 * @version $Revision: 564568 $ 044 */ 045 public class JmsBinding { 046 private static final transient Log LOG = LogFactory.getLog(JmsBinding.class); 047 048 /** 049 * Extracts the body from the JMS message 050 * 051 * @param exchange 052 * @param message 053 */ 054 public Object extractBodyFromJms(JmsExchange exchange, Message message) { 055 try { 056 if (message instanceof ObjectMessage) { 057 ObjectMessage objectMessage = (ObjectMessage)message; 058 return objectMessage.getObject(); 059 } else if (message instanceof TextMessage) { 060 TextMessage textMessage = (TextMessage)message; 061 return textMessage.getText(); 062 } else if (message instanceof MapMessage) { 063 return createMapFromMapMessage((MapMessage)message); 064 } else if (message instanceof BytesMessage || message instanceof StreamMessage) { 065 // TODO we need a decoder to be able to process the message 066 return message; 067 } else { 068 return null; 069 } 070 } catch (JMSException e) { 071 throw new RuntimeJmsException("Failed to extract body due to: " + e + ". Message: " + message, e); 072 } 073 } 074 075 /** 076 * Creates a JMS message from the Camel exchange and message 077 * 078 * @param session the JMS session used to create the message 079 * @return a newly created JMS Message instance containing the 080 * @throws JMSException if the message could not be created 081 */ 082 public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException { 083 Message answer = createJmsMessage(exchange.getIn().getBody(), session); 084 appendJmsProperties(answer, exchange, session); 085 return answer; 086 } 087 088 /** 089 * Appends the JMS headers from the Camel {@link JmsMessage} 090 */ 091 protected void appendJmsProperties(Message jmsMessage, Exchange exchange, Session session) throws JMSException { 092 org.apache.camel.Message in = exchange.getIn(); 093 Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet(); 094 for (Map.Entry<String, Object> entry : entries) { 095 String headerName = entry.getKey(); 096 Object headerValue = entry.getValue(); 097 098 if (headerName.startsWith("JMS")) { 099 if (headerName.equals("JMSCorrelationID")) { 100 jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue)); 101 } 102 else if (headerName.equals("JMSCorrelationID")) { 103 jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue)); 104 } 105 else if (headerName.equals("JMSReplyTo")) { 106 jmsMessage.setJMSReplyTo(ExchangeHelper.convertToType(exchange, Destination.class, headerValue)); 107 } 108 else if (headerName.equals("JMSType")) { 109 jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue)); 110 } 111 else if (LOG.isDebugEnabled()) { 112 // The following properties are set by the MessageProducer 113 // JMSDeliveryMode, JMSDestination, JMSExpiration, JMSPriority, 114 // The following are set on the underlying JMS provider 115 // JMSMessageID, JMSTimestamp, JMSRedelivered 116 LOG.debug("Ignoring JMS header: " + headerName + " with value: " + headerValue); 117 } 118 } 119 else if (shouldOutputHeader(in, headerName, headerValue)) { 120 jmsMessage.setObjectProperty(headerName, headerValue); 121 } 122 } 123 } 124 125 protected Message createJmsMessage(Object body, Session session) throws JMSException { 126 if (body instanceof String) { 127 return session.createTextMessage((String)body); 128 } else if (body instanceof Serializable) { 129 return session.createObjectMessage((Serializable)body); 130 } else { 131 return session.createMessage(); 132 } 133 } 134 135 /** 136 * Extracts a {@link Map} from a {@link MapMessage} 137 */ 138 public Map<String, Object> createMapFromMapMessage(MapMessage message) throws JMSException { 139 Map<String, Object> answer = new HashMap<String, Object>(); 140 Enumeration names = message.getPropertyNames(); 141 while (names.hasMoreElements()) { 142 String name = names.nextElement().toString(); 143 Object value = message.getObject(name); 144 answer.put(name, value); 145 } 146 return answer; 147 } 148 149 /** 150 * Strategy to allow filtering of headers which are put on the JMS message 151 */ 152 protected boolean shouldOutputHeader(org.apache.camel.Message camelMessage, String headerName, Object headerValue) { 153 return true; 154 } 155 }