001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.jms; 018 019 import org.apache.activemq.ActiveMQSession; 020 import org.apache.activemq.util.JMSExceptionSupport; 021 import org.apache.camel.Consumer; 022 import org.apache.camel.Endpoint; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.PollingConsumer; 025 import org.apache.camel.Processor; 026 027 import javax.jms.*; 028 import javax.jms.IllegalStateException; 029 030 /** 031 * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from a 032 * Camel {@link Endpoint} 033 * 034 * @version $Revision: $ 035 */ 036 public class CamelMessageConsumer implements MessageConsumer { 037 private final CamelDestination destination; 038 private final Endpoint endpoint; 039 private final ActiveMQSession session; 040 private final String messageSelector; 041 private final boolean noLocal; 042 private MessageListener messageListener; 043 private Consumer consumer; 044 private PollingConsumer pollingConsumer; 045 private boolean closed; 046 047 public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) { 048 this.destination = destination; 049 this.endpoint = endpoint; 050 this.session = session; 051 this.messageSelector = messageSelector; 052 this.noLocal = noLocal; 053 } 054 055 public void close() throws JMSException { 056 if (!closed) { 057 closed = true; 058 try { 059 if (consumer != null) { 060 consumer.stop(); 061 } 062 if (pollingConsumer != null) { 063 pollingConsumer.stop(); 064 } 065 } 066 catch (JMSException e) { 067 throw e; 068 } 069 catch (Exception e) { 070 throw JMSExceptionSupport.create(e); 071 } 072 } 073 } 074 075 public MessageListener getMessageListener() throws JMSException { 076 return messageListener; 077 } 078 079 public void setMessageListener(MessageListener messageListener) throws JMSException { 080 this.messageListener = messageListener; 081 if (messageListener != null && consumer == null) { 082 consumer = createConsumer(); 083 } 084 } 085 086 public Message receive() throws JMSException { 087 Exchange exchange = getPollingConsumer().receive(); 088 return createMessage(exchange); 089 } 090 091 public Message receive(long timeoutMillis) throws JMSException { 092 Exchange exchange = getPollingConsumer().receive(timeoutMillis); 093 return createMessage(exchange); 094 } 095 096 public Message receiveNoWait() throws JMSException { 097 Exchange exchange = getPollingConsumer().receiveNoWait(); 098 return createMessage(exchange); 099 } 100 101 // Properties 102 //----------------------------------------------------------------------- 103 104 public CamelDestination getDestination() { 105 return destination; 106 } 107 108 public Endpoint getEndpoint() { 109 return endpoint; 110 } 111 112 public String getMessageSelector() { 113 return messageSelector; 114 } 115 116 public boolean isNoLocal() { 117 return noLocal; 118 } 119 120 public ActiveMQSession getSession() { 121 return session; 122 } 123 124 // Implementation methods 125 //----------------------------------------------------------------------- 126 127 protected PollingConsumer getPollingConsumer() throws JMSException { 128 try { 129 if (pollingConsumer == null) { 130 pollingConsumer = endpoint.createPollingConsumer(); 131 pollingConsumer.start(); 132 } 133 return pollingConsumer; 134 } 135 catch (JMSException e) { 136 throw e; 137 } 138 catch (Exception e) { 139 throw JMSExceptionSupport.create(e); 140 } 141 } 142 143 protected Message createMessage(Exchange exchange) throws JMSException { 144 if (exchange != null) { 145 Message message = destination.getBinding().makeJmsMessage(exchange, session); 146 return message; 147 } 148 else { 149 return null; 150 } 151 } 152 153 protected Consumer createConsumer() throws JMSException { 154 try { 155 Consumer answer = endpoint.createConsumer(new Processor() { 156 public void process(Exchange exchange) throws Exception { 157 Message message = createMessage(exchange); 158 getMessageListener().onMessage(message); 159 } 160 }); 161 answer.start(); 162 return answer; 163 } 164 catch (JMSException e) { 165 throw e; 166 } 167 catch (Exception e) { 168 throw JMSExceptionSupport.create(e); 169 } 170 } 171 172 protected void checkClosed() throws javax.jms.IllegalStateException { 173 if (closed) { 174 throw new IllegalStateException("The producer is closed"); 175 } 176 } 177 }