001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.jms;
018    
019    import javax.jms.Message;
020    
021    import org.apache.camel.PollingConsumer;
022    import org.apache.camel.Processor;
023    import org.apache.camel.impl.DefaultEndpoint;
024    
025    import org.springframework.jms.core.JmsOperations;
026    import org.springframework.jms.core.JmsTemplate;
027    import org.springframework.jms.listener.AbstractMessageListenerContainer;
028    
029    /**
030     * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
031     * 
032     * @version $Revision:520964 $
033     */
034    public class JmsEndpoint extends DefaultEndpoint<JmsExchange> {
035        private JmsBinding binding;
036        private String destination;
037        private final boolean pubSubDomain;
038        private String selector;
039        private JmsConfiguration configuration;
040    
041        public JmsEndpoint(String uri, JmsComponent component, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
042            super(uri, component);
043            this.configuration = configuration;
044            this.destination = destination;
045            this.pubSubDomain = pubSubDomain;
046        }
047    
048        public JmsProducer createProducer() throws Exception {
049            JmsOperations template = createJmsOperations();
050            return createProducer(template);
051        }
052    
053        /**
054         * Creates a producer using the given template
055         */
056        public JmsProducer createProducer(JmsOperations template) throws Exception {
057            if (template instanceof JmsTemplate) {
058                JmsTemplate jmsTemplate = (JmsTemplate)template;
059                jmsTemplate.setPubSubDomain(pubSubDomain);
060                jmsTemplate.setDefaultDestinationName(destination);
061            }
062            return new JmsProducer(this, template);
063        }
064    
065        public JmsConsumer createConsumer(Processor processor) throws Exception {
066            AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer();
067            return createConsumer(processor, listenerContainer);
068        }
069    
070        /**
071         * Creates a consumer using the given processor and listener container
072         * 
073         * @param processor the processor to use to process the messages
074         * @param listenerContainer the listener container
075         * @return a newly created consumer
076         * @throws Exception if the consumer cannot be created
077         */
078        public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
079            listenerContainer.setDestinationName(destination);
080            listenerContainer.setPubSubDomain(pubSubDomain);
081            if (selector != null) {
082                listenerContainer.setMessageSelector(selector);
083            }
084            return new JmsConsumer(this, processor, listenerContainer);
085        }
086    
087        @Override
088        public PollingConsumer<JmsExchange> createPollingConsumer() throws Exception {
089            JmsOperations template = createJmsOperations();
090            return new JmsPollingConsumer(this, template);
091        }
092    
093        public JmsExchange createExchange() {
094            return new JmsExchange(getContext(), getBinding());
095        }
096    
097        public JmsExchange createExchange(Message message) {
098            return new JmsExchange(getContext(), getBinding(), message);
099        }
100    
101        // Properties
102        // -------------------------------------------------------------------------
103        public JmsBinding getBinding() {
104            if (binding == null) {
105                binding = new JmsBinding();
106            }
107            return binding;
108        }
109    
110        /**
111         * Sets the binding used to convert from a Camel message to and from a JMS
112         * message
113         * 
114         * @param binding the binding to use
115         */
116        public void setBinding(JmsBinding binding) {
117            this.binding = binding;
118        }
119    
120        public String getDestination() {
121            return destination;
122        }
123    
124        public JmsConfiguration getConfiguration() {
125            return configuration;
126        }
127    
128        public String getSelector() {
129            return selector;
130        }
131    
132        /**
133         * Sets the JMS selector to use
134         */
135        public void setSelector(String selector) {
136            this.selector = selector;
137        }
138    
139        public boolean isSingleton() {
140            return false;
141        }
142    
143        protected JmsOperations createJmsOperations() {
144            return configuration.createJmsOperations(pubSubDomain, destination);
145        }
146    
147    }