001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.jms;
018    
019    import org.apache.activemq.ActiveMQMessageProducerSupport;
020    import org.apache.activemq.ActiveMQSession;
021    import org.apache.activemq.command.ActiveMQMessage;
022    import org.apache.activemq.util.JMSExceptionSupport;
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Producer;
025    import org.apache.camel.component.jms.JmsExchange;
026    import org.apache.camel.util.ObjectHelper;
027    
028    import javax.jms.Destination;
029    import javax.jms.IllegalStateException;
030    import javax.jms.JMSException;
031    import javax.jms.Message;
032    
033    /**
034     * A JMS {@link javax.jms.MessageProducer} which sends message exchanges to a
035     * Camel {@link Endpoint}
036     * 
037     * @version $Revision: $
038     */
039    public class CamelMessageProducer extends ActiveMQMessageProducerSupport {
040        private final CamelDestination destination;
041        private final Endpoint endpoint;
042        protected Producer producer;
043        private boolean closed;
044    
045        public CamelMessageProducer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
046            super(session);
047            this.destination = destination;
048            this.endpoint = endpoint;
049            try {
050                this.producer = endpoint.createProducer();
051            }
052            catch (JMSException e) {
053                throw e;
054            }
055            catch (Exception e) {
056                throw JMSExceptionSupport.create(e);
057            }
058        }
059    
060        public CamelDestination getDestination() throws JMSException {
061            return destination;
062        }
063    
064        public Endpoint getEndpoint() {
065            return endpoint;
066        }
067    
068        public void close() throws JMSException {
069            if (!closed) {
070                closed = true;
071                try {
072                    producer.stop();
073                }
074                catch (JMSException e) {
075                    throw e;
076                }
077                catch (Exception e) {
078                    throw JMSExceptionSupport.create(e);
079                }
080            }
081        }
082    
083        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
084            CamelDestination camelDestination = null;
085            if (ObjectHelper.equals(destination, this.destination)) {
086                camelDestination = this.destination;
087            }
088            else {
089                // TODO support any CamelDestination?
090                throw new IllegalArgumentException("Invalid destination setting: " + destination + " when expected: " + this.destination);
091            }
092            try {
093                JmsExchange exchange = new JmsExchange(endpoint.getContext(), camelDestination.getBinding(), message);
094                producer.process(exchange);
095            }
096            catch (JMSException e) {
097                throw e;
098            }
099            catch (Exception e) {
100                throw JMSExceptionSupport.create(e);
101            }
102        }
103    
104        protected void checkClosed() throws IllegalStateException {
105            if (closed) {
106                throw new IllegalStateException("The producer is closed");
107            }
108        }
109    }