001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.jms; 018 019 import org.apache.activemq.ActiveMQConnection; 020 import org.apache.activemq.ActiveMQSession; 021 import org.apache.activemq.CustomDestination; 022 import org.apache.camel.CamelContext; 023 import org.apache.camel.CamelContextAware; 024 import org.apache.camel.Endpoint; 025 import org.apache.camel.component.jms.JmsBinding; 026 027 import javax.jms.JMSException; 028 import javax.jms.MessageConsumer; 029 import javax.jms.MessageProducer; 030 import javax.jms.QueueReceiver; 031 import javax.jms.QueueSender; 032 import javax.jms.TopicPublisher; 033 import javax.jms.TopicSubscriber; 034 035 /** 036 * @version $Revision: $ 037 */ 038 public class CamelDestination implements CustomDestination, CamelContextAware { 039 private String uri; 040 private Endpoint endpoint; 041 private CamelContext camelContext; 042 private JmsBinding binding = new JmsBinding(); 043 044 public CamelDestination() { 045 } 046 047 public CamelDestination(String uri) { 048 this.uri = uri; 049 } 050 051 public String toString() { 052 return uri.toString(); 053 } 054 055 // CustomDestination interface 056 //----------------------------------------------------------------------- 057 public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector) { 058 return createConsumer(session, messageSelector, false); 059 } 060 061 public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal) { 062 return new CamelMessageConsumer(this, resolveEndpoint(session), session, messageSelector, noLocal); 063 } 064 065 public TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal) { 066 return createDurableSubscriber(session, null, messageSelector, noLocal); 067 } 068 069 public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) { 070 throw new UnsupportedOperationException("This destination is not a Topic: " + this); 071 } 072 073 public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) { 074 throw new UnsupportedOperationException("This destination is not a Queue: " + this); 075 } 076 077 // Producers 078 //----------------------------------------------------------------------- 079 public MessageProducer createProducer(ActiveMQSession session) throws JMSException { 080 return new CamelMessageProducer(this, resolveEndpoint(session), session); 081 } 082 083 public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException { 084 throw new UnsupportedOperationException("This destination is not a Topic: " + this); 085 } 086 087 public QueueSender createSender(ActiveMQSession session) throws JMSException { 088 throw new UnsupportedOperationException("This destination is not a Queue: " + this); 089 } 090 091 // Properties 092 //----------------------------------------------------------------------- 093 094 public String getUri() { 095 return uri; 096 } 097 098 public void setUri(String uri) { 099 this.uri = uri; 100 } 101 102 public Endpoint getEndpoint() { 103 return endpoint; 104 } 105 106 public void setEndpoint(Endpoint endpoint) { 107 this.endpoint = endpoint; 108 } 109 110 public CamelContext getCamelContext() { 111 return camelContext; 112 } 113 114 public void setCamelContext(CamelContext camelContext) { 115 this.camelContext = camelContext; 116 } 117 118 public JmsBinding getBinding() { 119 return binding; 120 } 121 122 public void setBinding(JmsBinding binding) { 123 this.binding = binding; 124 } 125 126 // Implementation methods 127 //----------------------------------------------------------------------- 128 129 /** 130 * Resolves the Camel Endpoint for this destination 131 * 132 * @return 133 */ 134 protected Endpoint resolveEndpoint(ActiveMQSession session) { 135 Endpoint answer = getEndpoint(); 136 if (answer == null) { 137 answer = resolveCamelContext(session).getEndpoint(getUri()); 138 if (answer == null) { 139 throw new IllegalArgumentException("No endpoint could be found for URI: " + getUri()); 140 } 141 } 142 return answer; 143 } 144 145 protected CamelContext resolveCamelContext(ActiveMQSession session) { 146 CamelContext answer = getCamelContext(); 147 if (answer == null) { 148 ActiveMQConnection connection = session.getConnection(); 149 if (connection instanceof CamelConnection) { 150 CamelConnection camelConnection = (CamelConnection) connection; 151 answer = camelConnection.getCamelContext(); 152 } 153 } 154 if (answer == null) { 155 throw new IllegalArgumentException("No CamelContext has been configured"); 156 } 157 return answer; 158 } 159 }