001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.jms; 018 019 import org.apache.camel.CamelContext; 020 import org.apache.camel.Exchange; 021 import org.apache.camel.impl.DefaultEndpoint; 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 import org.springframework.jms.core.JmsOperations; 025 import org.springframework.jms.core.MessageCreator; 026 import org.springframework.jms.listener.AbstractMessageListenerContainer; 027 028 import javax.jms.JMSException; 029 import javax.jms.Message; 030 import javax.jms.MessageListener; 031 import javax.jms.Session; 032 033 /** 034 * @version $Revision: 520404 $ 035 */ 036 public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener { 037 private static final Log log = LogFactory.getLog(JmsEndpoint.class); 038 039 private JmsOperations template; 040 private AbstractMessageListenerContainer listenerContainer; 041 private String destination; 042 043 public JmsEndpoint(String endpointUri, CamelContext container, String destination, JmsOperations template, AbstractMessageListenerContainer listenerContainer) { 044 super(endpointUri, container); 045 this.destination = destination; 046 this.template = template; 047 this.listenerContainer = listenerContainer; 048 this.listenerContainer.setMessageListener(this); 049 } 050 051 public void onMessage(Message message) { 052 if (log.isDebugEnabled()) { 053 log.debug(JmsEndpoint.this + " receiving JMS message: " + message); 054 } 055 JmsExchange exchange = createExchange(message); 056 getInboundProcessor().onExchange(exchange); 057 } 058 059 060 public void send(Exchange exchange) { 061 // lets convert to the type of an exchange 062 JmsExchange jmsExchange = convertTo(JmsExchange.class, exchange); 063 onExchange(jmsExchange); 064 } 065 066 public void onExchange(final JmsExchange exchange) { 067 template.send(destination, new MessageCreator() { 068 public Message createMessage(Session session) throws JMSException { 069 Message message = exchange.createMessage(session); 070 if (log.isDebugEnabled()) { 071 log.debug(JmsEndpoint.this + " sending JMS message: " + message); 072 } 073 return message; 074 } 075 }); 076 } 077 078 public JmsOperations getTemplate() { 079 return template; 080 } 081 082 public JmsExchange createExchange() { 083 return new DefaultJmsExchange(getContext()); 084 } 085 086 087 public JmsExchange createExchange(Message message) { 088 return new DefaultJmsExchange(getContext(), message); 089 } 090 091 092 protected void doActivate() { 093 super.doActivate(); 094 listenerContainer.afterPropertiesSet(); 095 listenerContainer.initialize(); 096 listenerContainer.start(); 097 } 098 099 protected void doDeactivate() { 100 listenerContainer.stop(); 101 listenerContainer.destroy(); 102 super.doDeactivate(); 103 } 104 }