001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.jms;
018    
019    import org.apache.activemq.ActiveMQSession;
020    import org.apache.activemq.util.JMSExceptionSupport;
021    import org.apache.camel.Consumer;
022    import org.apache.camel.Endpoint;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.PollingConsumer;
025    import org.apache.camel.Processor;
026    
027    import javax.jms.*;
028    import javax.jms.IllegalStateException;
029    
030    /**
031     * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from a
032     * Camel {@link Endpoint}
033     *
034     * @version $Revision: $
035     */
036    public class CamelMessageConsumer implements MessageConsumer {
037        private final CamelDestination destination;
038        private final Endpoint endpoint;
039        private final ActiveMQSession session;
040        private final String messageSelector;
041        private final boolean noLocal;
042        private MessageListener messageListener;
043        private Consumer consumer;
044        private PollingConsumer pollingConsumer;
045        private boolean closed;
046    
047        public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) {
048            this.destination = destination;
049            this.endpoint = endpoint;
050            this.session = session;
051            this.messageSelector = messageSelector;
052            this.noLocal = noLocal;
053        }
054    
055        public void close() throws JMSException {
056            if (!closed) {
057                closed = true;
058                try {
059                    if (consumer != null) {
060                        consumer.stop();
061                    }
062                    if (pollingConsumer != null) {
063                        pollingConsumer.stop();
064                    }
065                }
066                catch (JMSException e) {
067                    throw e;
068                }
069                catch (Exception e) {
070                    throw JMSExceptionSupport.create(e);
071                }
072            }
073        }
074    
075        public MessageListener getMessageListener() throws JMSException {
076            return messageListener;
077        }
078    
079        public void setMessageListener(MessageListener messageListener) throws JMSException {
080            this.messageListener = messageListener;
081            if (messageListener != null && consumer == null) {
082                consumer = createConsumer();
083            }
084        }
085    
086        public Message receive() throws JMSException {
087            Exchange exchange = getPollingConsumer().receive();
088            return createMessage(exchange);
089        }
090    
091        public Message receive(long timeoutMillis) throws JMSException {
092            Exchange exchange = getPollingConsumer().receive(timeoutMillis);
093            return createMessage(exchange);
094        }
095    
096        public Message receiveNoWait() throws JMSException {
097            Exchange exchange = getPollingConsumer().receiveNoWait();
098            return createMessage(exchange);
099        }
100    
101        // Properties
102        //-----------------------------------------------------------------------
103    
104        public CamelDestination getDestination() {
105            return destination;
106        }
107    
108        public Endpoint getEndpoint() {
109            return endpoint;
110        }
111    
112        public String getMessageSelector() {
113            return messageSelector;
114        }
115    
116        public boolean isNoLocal() {
117            return noLocal;
118        }
119    
120        public ActiveMQSession getSession() {
121            return session;
122        }
123    
124        // Implementation methods
125        //-----------------------------------------------------------------------
126    
127        protected PollingConsumer getPollingConsumer() throws JMSException {
128            try {
129                if (pollingConsumer == null) {
130                    pollingConsumer = endpoint.createPollingConsumer();
131                    pollingConsumer.start();
132                }
133                return pollingConsumer;
134            }
135            catch (JMSException e) {
136                throw e;
137            }
138            catch (Exception e) {
139                throw JMSExceptionSupport.create(e);
140            }
141        }
142    
143        protected Message createMessage(Exchange exchange) throws JMSException {
144            if (exchange != null) {
145                Message message = destination.getBinding().makeJmsMessage(exchange, session);
146                return message;
147            }
148            else {
149                return null;
150            }
151        }
152    
153        protected Consumer createConsumer() throws JMSException {
154            try {
155                Consumer answer = endpoint.createConsumer(new Processor() {
156                    public void process(Exchange exchange) throws Exception {
157                        Message message = createMessage(exchange);
158                        getMessageListener().onMessage(message);
159                    }
160                });
161                answer.start();
162                return answer;
163            }
164            catch (JMSException e) {
165                throw e;
166            }
167            catch (Exception e) {
168                throw JMSExceptionSupport.create(e);
169            }
170        }
171    
172        protected void checkClosed() throws javax.jms.IllegalStateException {
173            if (closed) {
174                throw new IllegalStateException("The producer is closed");
175            }
176        }
177    }