/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.jms;

import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.Session;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.util.IntrospectionSupport;

import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.listener.serversession.ServerSessionFactory;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.transaction.PlatformTransactionManager;

import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;

/**
 * A <a href="http://activemq.apache.org/jms.html">JMS Component</a>
 * <p>
 * The component holds a default {@link JmsConfiguration}; every endpoint it
 * creates receives its own copy of that configuration, which can then be
 * customized through the endpoint URI parameters.
 *
 * @version $Revision:520964 $
 */
public class JmsComponent extends DefaultComponent<JmsExchange> {
    /** URI prefix selecting a queue destination. */
    public static final String QUEUE_PREFIX = "queue:";
    /** URI prefix selecting a topic destination. */
    public static final String TOPIC_PREFIX = "topic:";

    /** Default configuration for endpoints; created lazily by {@link #getConfiguration()}. */
    private JmsConfiguration configuration;

    /**
     * Creates a component without a configuration; one is created on demand
     * by {@link #getConfiguration()}.
     */
    public JmsComponent() {
    }

    /**
     * Creates a component using the given default configuration.
     *
     * @param configuration the configuration to use by default for endpoints
     */
    public JmsComponent(JmsConfiguration configuration) {
        this.configuration = configuration;
    }

    /**
     * Creates a component bound to the given context.
     *
     * @param context the context passed to the superclass constructor
     */
    public JmsComponent(CamelContext context) {
        super(context);
    }

    /**
     * Static builder method
     *
     * @return a new component with no explicit configuration
     */
    public static JmsComponent jmsComponent() {
        return new JmsComponent();
    }

    /**
     * Static builder method
     *
     * @param configuration the configuration to use by default for endpoints
     * @return a new component using the given configuration
     */
    public static JmsComponent jmsComponent(JmsConfiguration configuration) {
        return new JmsComponent(configuration);
    }

    /**
     * Static builder method
     *
     * @param connectionFactory the connection factory used to build the configuration
     * @return a new component whose configuration wraps the given connection factory
     */
    public static JmsComponent jmsComponent(ConnectionFactory connectionFactory) {
        return jmsComponent(new JmsConfiguration(connectionFactory));
    }

    /**
     * Static builder method creating a component whose configuration uses
     * {@link Session#CLIENT_ACKNOWLEDGE} as acknowledgement mode.
     *
     * @param connectionFactory the connection factory used to build the configuration
     * @return the new component
     */
    public static JmsComponent jmsComponentClientAcknowledge(ConnectionFactory connectionFactory) {
        JmsConfiguration template = new JmsConfiguration(connectionFactory);
        template.setAcknowledgementMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsComponent(template);
    }

    /**
     * Static builder method creating a component whose configuration uses
     * {@link Session#AUTO_ACKNOWLEDGE} as acknowledgement mode.
     *
     * @param connectionFactory the connection factory used to build the configuration
     * @return the new component
     */
    public static JmsComponent jmsComponentAutoAcknowledge(ConnectionFactory connectionFactory) {
        JmsConfiguration template = new JmsConfiguration(connectionFactory);
        template.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
        return jmsComponent(template);
    }

    /**
     * Static builder method creating a component whose configuration has the
     * transacted flag enabled.
     *
     * @param connectionFactory the connection factory used to build the configuration
     * @return the new component
     */
    public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory) {
        JmsConfiguration template = new JmsConfiguration(connectionFactory);
        template.setTransacted(true);
        return jmsComponent(template);
    }

    /**
     * Static builder method creating a component whose configuration has the
     * transacted flag enabled and uses the given transaction manager.
     *
     * @param connectionFactory the connection factory used to build the configuration
     * @param transactionManager the transaction manager set on the configuration
     * @return the new component
     */
    public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
        JmsConfiguration template = new JmsConfiguration(connectionFactory);
        template.setTransactionManager(transactionManager);
        template.setTransacted(true);
        return jmsComponent(template);
    }

    /**
     * Creates a {@link JmsEndpoint} for the given URI.
     * <p>
     * A leading {@link #QUEUE_PREFIX} or {@link #TOPIC_PREFIX} in
     * <code>remaining</code> selects the destination domain; without either
     * prefix the endpoint is created with the pub/sub flag off. The
     * <code>selector</code> parameter, if present, is removed from the map and
     * set on the endpoint; all other parameters are applied as properties to
     * the endpoint's own copy of the configuration.
     *
     * @param uri the full endpoint URI
     * @param remaining the URI part naming the destination, optionally prefixed
     * @param parameters the URI parameters; the <code>selector</code> entry is removed
     * @return the newly created endpoint
     * @throws Exception if the endpoint could not be created or configured
     */
    @Override
    protected Endpoint<JmsExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {

        // destinations are treated as queues unless the topic prefix is given
        boolean pubSubDomain = false;
        if (remaining.startsWith(QUEUE_PREFIX)) {
            remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
        } else if (remaining.startsWith(TOPIC_PREFIX)) {
            pubSubDomain = true;
            remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
        }

        final String subject = convertPathToActualDestination(remaining);

        // lets make sure we copy the configuration as each endpoint can
        // customize its own version
        JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());

        // the selector belongs to the endpoint itself, so take it out before
        // the remaining parameters are applied to the configuration
        String selector = (String)parameters.remove("selector");
        if (selector != null) {
            endpoint.setSelector(selector);
        }
        IntrospectionSupport.setProperties(endpoint.getConfiguration(), parameters);
        return endpoint;
    }

    /**
     * Returns the default configuration, creating it through
     * {@link #createConfiguration()} on first access if none has been set.
     * The lazy creation is not synchronized.
     *
     * @return the configuration used by default for endpoints
     */
    public JmsConfiguration getConfiguration() {
        if (configuration == null) {
            configuration = createConfiguration();
        }
        return configuration;
    }

    /**
     * Sets the JMS configuration
     *
     * @param configuration the configuration to use by default for endpoints
     */
    public void setConfiguration(JmsConfiguration configuration) {
        this.configuration = configuration;
    }

    // Convenience setters
    // -------------------------------------------------------------------------
    // Each setter below forwards its argument to the same-named setter on
    // getConfiguration(), creating the default configuration first if needed.

    public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
        getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
    }

    public void setAcknowledgementMode(int consumerAcknowledgementMode) {
        getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode);
    }

    public void setAcknowledgementModeName(String consumerAcknowledgementMode) {
        getConfiguration().setAcknowledgementModeName(consumerAcknowledgementMode);
    }

    public void setAutoStartup(boolean autoStartup) {
        getConfiguration().setAutoStartup(autoStartup);
    }

    public void setCacheLevel(int cacheLevel) {
        getConfiguration().setCacheLevel(cacheLevel);
    }

    public void setCacheLevelName(String cacheName) {
        getConfiguration().setCacheLevelName(cacheName);
    }

    public void setClientId(String consumerClientId) {
        getConfiguration().setClientId(consumerClientId);
    }

    public void setConcurrentConsumers(int concurrentConsumers) {
        getConfiguration().setConcurrentConsumers(concurrentConsumers);
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        getConfiguration().setConnectionFactory(connectionFactory);
    }

    public void setConsumerType(ConsumerType consumerType) {
        getConfiguration().setConsumerType(consumerType);
    }

    public void setDeliveryPersistent(boolean deliveryPersistent) {
        getConfiguration().setDeliveryPersistent(deliveryPersistent);
    }

    public void setDurableSubscriptionName(String durableSubscriptionName) {
        getConfiguration().setDurableSubscriptionName(durableSubscriptionName);
    }

    public void setExceptionListener(ExceptionListener exceptionListener) {
        getConfiguration().setExceptionListener(exceptionListener);
    }

    public void setExplicitQosEnabled(boolean explicitQosEnabled) {
        getConfiguration().setExplicitQosEnabled(explicitQosEnabled);
    }

    public void setExposeListenerSession(boolean exposeListenerSession) {
        getConfiguration().setExposeListenerSession(exposeListenerSession);
    }

    public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
        getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit);
    }

    public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
        getConfiguration().setMaxConcurrentConsumers(maxConcurrentConsumers);
    }

    public void setMaxMessagesPerTask(int maxMessagesPerTask) {
        getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask);
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        getConfiguration().setMessageConverter(messageConverter);
    }

    public void setMessageIdEnabled(boolean messageIdEnabled) {
        getConfiguration().setMessageIdEnabled(messageIdEnabled);
    }

    public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
        getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled);
    }

    public void setPriority(int priority) {
        getConfiguration().setPriority(priority);
    }

    public void setPubSubNoLocal(boolean pubSubNoLocal) {
        getConfiguration().setPubSubNoLocal(pubSubNoLocal);
    }

    public void setReceiveTimeout(long receiveTimeout) {
        getConfiguration().setReceiveTimeout(receiveTimeout);
    }

    public void setRecoveryInterval(long recoveryInterval) {
        getConfiguration().setRecoveryInterval(recoveryInterval);
    }

    public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) {
        getConfiguration().setServerSessionFactory(serverSessionFactory);
    }

    public void setSubscriptionDurable(boolean subscriptionDurable) {
        getConfiguration().setSubscriptionDurable(subscriptionDurable);
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        getConfiguration().setTaskExecutor(taskExecutor);
    }

    public void setTimeToLive(long timeToLive) {
        getConfiguration().setTimeToLive(timeToLive);
    }

    public void setTransacted(boolean consumerTransacted) {
        getConfiguration().setTransacted(consumerTransacted);
    }

    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        getConfiguration().setTransactionManager(transactionManager);
    }

    public void setTransactionName(String transactionName) {
        getConfiguration().setTransactionName(transactionName);
    }

    public void setTransactionTimeout(int transactionTimeout) {
        getConfiguration().setTransactionTimeout(transactionTimeout);
    }

    public void setUseVersion102(boolean useVersion102) {
        getConfiguration().setUseVersion102(useVersion102);
    }

    /**
     * A strategy method allowing the URI destination to be translated into the
     * actual JMS destination name (say by looking up in JNDI or something)
     *
     * @param path the destination path taken from the endpoint URI
     * @return the destination name to use; this implementation returns
     *         <code>path</code> unchanged
     */
    protected String convertPathToActualDestination(String path) {
        return path;
    }

    /**
     * Factory method to create the default configuration instance
     *
     * @return a newly created configuration object which can then be further
     *         customized
     */
    protected JmsConfiguration createConfiguration() {
        return new JmsConfiguration();
    }

}