001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.jms;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.util.ExchangeHelper;
021    import org.apache.commons.logging.Log;
022    import org.apache.commons.logging.LogFactory;
023    
024    import javax.jms.BytesMessage;
025    import javax.jms.Destination;
026    import javax.jms.JMSException;
027    import javax.jms.MapMessage;
028    import javax.jms.Message;
029    import javax.jms.ObjectMessage;
030    import javax.jms.Session;
031    import javax.jms.StreamMessage;
032    import javax.jms.TextMessage;
033    import java.io.Serializable;
034    import java.util.Enumeration;
035    import java.util.HashMap;
036    import java.util.Map;
037    import java.util.Set;
038    
039    /**
040     * A Strategy used to convert between a Camel {@JmsExchange} and {@JmsMessage}
041     * to and from a JMS {@link Message}
042     * 
043     * @version $Revision: 564568 $
044     */
045    public class JmsBinding {
046        private static final transient Log LOG = LogFactory.getLog(JmsBinding.class);
047    
048        /**
049         * Extracts the body from the JMS message
050         * 
051         * @param exchange
052         * @param message
053         */
054        public Object extractBodyFromJms(JmsExchange exchange, Message message) {
055            try {
056                if (message instanceof ObjectMessage) {
057                    ObjectMessage objectMessage = (ObjectMessage)message;
058                    return objectMessage.getObject();
059                } else if (message instanceof TextMessage) {
060                    TextMessage textMessage = (TextMessage)message;
061                    return textMessage.getText();
062                } else if (message instanceof MapMessage) {
063                    return createMapFromMapMessage((MapMessage)message);
064                } else if (message instanceof BytesMessage || message instanceof StreamMessage) {
065                    // TODO we need a decoder to be able to process the message
066                    return message;
067                } else {
068                    return null;
069                }
070            } catch (JMSException e) {
071                throw new RuntimeJmsException("Failed to extract body due to: " + e + ". Message: " + message, e);
072            }
073        }
074    
075        /**
076         * Creates a JMS message from the Camel exchange and message
077         * 
078         * @param session the JMS session used to create the message
079         * @return a newly created JMS Message instance containing the
080         * @throws JMSException if the message could not be created
081         */
082        public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException {
083            Message answer = createJmsMessage(exchange.getIn().getBody(), session);
084            appendJmsProperties(answer, exchange, session);
085            return answer;
086        }
087    
088        /**
089         * Appends the JMS headers from the Camel {@link JmsMessage}
090         */
091        protected void appendJmsProperties(Message jmsMessage, Exchange exchange, Session session) throws JMSException {
092            org.apache.camel.Message in = exchange.getIn();
093            Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
094            for (Map.Entry<String, Object> entry : entries) {
095                String headerName = entry.getKey();
096                Object headerValue = entry.getValue();
097                
098                if (headerName.startsWith("JMS")) {
099                    if (headerName.equals("JMSCorrelationID")) {
100                        jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
101                    }
102                    else if (headerName.equals("JMSCorrelationID")) {
103                        jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
104                    }
105                    else if (headerName.equals("JMSReplyTo")) {
106                        jmsMessage.setJMSReplyTo(ExchangeHelper.convertToType(exchange, Destination.class, headerValue));
107                    }
108                    else if (headerName.equals("JMSType")) {
109                        jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
110                    }
111                    else if (LOG.isDebugEnabled()) {
112                        // The following properties are set by the MessageProducer
113                        //   JMSDeliveryMode, JMSDestination, JMSExpiration, JMSPriority,
114                        // The following are set on the underlying JMS provider
115                        //   JMSMessageID, JMSTimestamp, JMSRedelivered
116                        LOG.debug("Ignoring JMS header: " + headerName + " with value: " + headerValue);
117                    }
118                }
119                else if (shouldOutputHeader(in, headerName, headerValue)) {
120                    jmsMessage.setObjectProperty(headerName, headerValue);
121                }
122            }
123        }
124    
125        protected Message createJmsMessage(Object body, Session session) throws JMSException {
126            if (body instanceof String) {
127                return session.createTextMessage((String)body);
128            } else if (body instanceof Serializable) {
129                return session.createObjectMessage((Serializable)body);
130            } else {
131                return session.createMessage();
132            }
133        }
134    
135        /**
136         * Extracts a {@link Map} from a {@link MapMessage}
137         */
138        public Map<String, Object> createMapFromMapMessage(MapMessage message) throws JMSException {
139            Map<String, Object> answer = new HashMap<String, Object>();
140            Enumeration names = message.getPropertyNames();
141            while (names.hasMoreElements()) {
142                String name = names.nextElement().toString();
143                Object value = message.getObject(name);
144                answer.put(name, value);
145            }
146            return answer;
147        }
148    
149        /**
150         * Strategy to allow filtering of headers which are put on the JMS message
151         */
152        protected boolean shouldOutputHeader(org.apache.camel.Message camelMessage, String headerName, Object headerValue) {
153            return true;
154        }
155    }