001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.component.jms; 019 020 import org.apache.camel.CamelContext; 021 import org.apache.camel.Endpoint; 022 import org.apache.camel.impl.DefaultComponent; 023 import org.apache.camel.util.IntrospectionSupport; 024 import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; 025 import org.springframework.core.task.TaskExecutor; 026 import org.springframework.jms.listener.serversession.ServerSessionFactory; 027 import org.springframework.jms.support.converter.MessageConverter; 028 import org.springframework.transaction.PlatformTransactionManager; 029 030 import javax.jms.ConnectionFactory; 031 import javax.jms.ExceptionListener; 032 import javax.jms.Session; 033 import java.util.Map; 034 035 /** 036 * A <a href="http://activemq.apache.org/jms.html">JMS Component</a> 037 * 038 * @version $Revision:520964 $ 039 */ 040 public class JmsComponent extends DefaultComponent<JmsExchange> { 041 public static final String QUEUE_PREFIX = "queue:"; 042 public static final String TOPIC_PREFIX = "topic:"; 043 044 private JmsConfiguration configuration; 045 046 /** 047 * Static builder method 048 */ 049 public static JmsComponent jmsComponent() { 050 return new JmsComponent(); 051 } 052 053 /** 054 * Static builder method 055 */ 056 public static JmsComponent jmsComponent(JmsConfiguration configuration) { 057 return new JmsComponent(configuration); 058 } 059 060 /** 061 * Static builder method 062 */ 063 public static JmsComponent jmsComponent(ConnectionFactory connectionFactory) { 064 return jmsComponent(new JmsConfiguration(connectionFactory)); 065 } 066 067 /** 068 * Static builder method 069 */ 070 public static JmsComponent jmsComponentClientAcknowledge(ConnectionFactory connectionFactory) { 071 JmsConfiguration template = new JmsConfiguration(connectionFactory); 072 template.setAcknowledgementMode(Session.CLIENT_ACKNOWLEDGE); 073 return jmsComponent(template); 074 } 075 076 /** 077 * Static builder method 078 */ 079 public static JmsComponent jmsComponentAutoAcknowledge(ConnectionFactory connectionFactory) { 080 JmsConfiguration template = new JmsConfiguration(connectionFactory); 081 template.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); 082 return jmsComponent(template); 083 } 084 085 public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory) { 086 JmsConfiguration template = new JmsConfiguration(connectionFactory); 087 template.setTransacted(true); 088 return jmsComponent(template); 089 } 090 091 public static JmsComponent jmsComponentTransacted(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) { 092 JmsConfiguration template = new JmsConfiguration(connectionFactory); 093 template.setTransactionManager(transactionManager); 094 template.setTransacted(true); 095 return jmsComponent(template); 096 } 097 098 public JmsComponent() { 099 } 100 101 public JmsComponent(JmsConfiguration configuration) { 102 this.configuration = configuration; 103 } 104 105 public JmsComponent(CamelContext context) { 106 super(context); 107 } 108 109 @Override 110 protected Endpoint<JmsExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception { 111 112 boolean pubSubDomain = false; 113 if (remaining.startsWith(QUEUE_PREFIX)) { 114 pubSubDomain = false; 115 remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/'); 116 } 117 else if (remaining.startsWith(TOPIC_PREFIX)) { 118 pubSubDomain = true; 119 remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/'); 120 } 121 122 final String subject = convertPathToActualDestination(remaining); 123 124 // lets make sure we copy the configuration as each endpoint can customize its own version 125 JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy()); 126 127 String selector = (String) parameters.remove("selector"); 128 if (selector != null) { 129 endpoint.setSelector(selector); 130 } 131 IntrospectionSupport.setProperties(endpoint.getConfiguration(), parameters); 132 return endpoint; 133 } 134 135 public JmsConfiguration getConfiguration() { 136 if (configuration == null) { 137 configuration = createConfiguration(); 138 } 139 return configuration; 140 } 141 142 /** 143 * Sets the JMS configuration 144 * 145 * @param configuration the configuration to use by default for endpoints 146 */ 147 public void setConfiguration(JmsConfiguration configuration) { 148 this.configuration = configuration; 149 } 150 151 152 public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { 153 getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); 154 } 155 156 public void setAcknowledgementMode(int consumerAcknowledgementMode) { 157 getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode); 158 } 159 160 public void setAcknowledgementModeName(String consumerAcknowledgementMode) { 161 getConfiguration().setAcknowledgementModeName(consumerAcknowledgementMode); 162 } 163 164 public void setAutoStartup(boolean autoStartup) { 165 getConfiguration().setAutoStartup(autoStartup); 166 } 167 168 public void setCacheLevel(int cacheLevel) { 169 getConfiguration().setCacheLevel(cacheLevel); 170 } 171 172 public void setCacheLevelName(String cacheName) { 173 getConfiguration().setCacheLevelName(cacheName); 174 } 175 176 public void setClientId(String consumerClientId) { 177 getConfiguration().setClientId(consumerClientId); 178 } 179 180 public void setConcurrentConsumers(int concurrentConsumers) { 181 getConfiguration().setConcurrentConsumers(concurrentConsumers); 182 } 183 184 public void setConnectionFactory(ConnectionFactory connectionFactory) { 185 getConfiguration().setConnectionFactory(connectionFactory); 186 } 187 188 public void setConsumerType(ConsumerType consumerType) { 189 getConfiguration().setConsumerType(consumerType); 190 } 191 192 public void setDeliveryPersistent(boolean deliveryPersistent) { 193 getConfiguration().setDeliveryPersistent(deliveryPersistent); 194 } 195 196 public void setDurableSubscriptionName(String durableSubscriptionName) { 197 getConfiguration().setDurableSubscriptionName(durableSubscriptionName); 198 } 199 200 public void setExceptionListener(ExceptionListener exceptionListener) { 201 getConfiguration().setExceptionListener(exceptionListener); 202 } 203 204 public void setExplicitQosEnabled(boolean explicitQosEnabled) { 205 getConfiguration().setExplicitQosEnabled(explicitQosEnabled); 206 } 207 208 public void setExposeListenerSession(boolean exposeListenerSession) { 209 getConfiguration().setExposeListenerSession(exposeListenerSession); 210 } 211 212 public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { 213 getConfiguration().setIdleTaskExecutionLimit(idleTaskExecutionLimit); 214 } 215 216 public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { 217 getConfiguration().setMaxConcurrentConsumers(maxConcurrentConsumers); 218 } 219 220 public void setMaxMessagesPerTask(int maxMessagesPerTask) { 221 getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask); 222 } 223 224 public void setMessageConverter(MessageConverter messageConverter) { 225 getConfiguration().setMessageConverter(messageConverter); 226 } 227 228 public void setMessageIdEnabled(boolean messageIdEnabled) { 229 getConfiguration().setMessageIdEnabled(messageIdEnabled); 230 } 231 232 public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { 233 getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled); 234 } 235 236 public void setPriority(int priority) { 237 getConfiguration().setPriority(priority); 238 } 239 240 public void setPubSubNoLocal(boolean pubSubNoLocal) { 241 getConfiguration().setPubSubNoLocal(pubSubNoLocal); 242 } 243 244 public void setReceiveTimeout(long receiveTimeout) { 245 getConfiguration().setReceiveTimeout(receiveTimeout); 246 } 247 248 public void setRecoveryInterval(long recoveryInterval) { 249 getConfiguration().setRecoveryInterval(recoveryInterval); 250 } 251 252 public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) { 253 getConfiguration().setServerSessionFactory(serverSessionFactory); 254 } 255 256 public void setSubscriptionDurable(boolean subscriptionDurable) { 257 getConfiguration().setSubscriptionDurable(subscriptionDurable); 258 } 259 260 public void setTaskExecutor(TaskExecutor taskExecutor) { 261 getConfiguration().setTaskExecutor(taskExecutor); 262 } 263 264 public void setTimeToLive(long timeToLive) { 265 getConfiguration().setTimeToLive(timeToLive); 266 } 267 268 public void setTransacted(boolean consumerTransacted) { 269 getConfiguration().setTransacted(consumerTransacted); 270 } 271 272 public void setTransactionManager(PlatformTransactionManager transactionManager) { 273 getConfiguration().setTransactionManager(transactionManager); 274 } 275 276 public void setTransactionName(String transactionName) { 277 getConfiguration().setTransactionName(transactionName); 278 } 279 280 public void setTransactionTimeout(int transactionTimeout) { 281 getConfiguration().setTransactionTimeout(transactionTimeout); 282 } 283 284 public void setUseVersion102(boolean useVersion102) { 285 getConfiguration().setUseVersion102(useVersion102); 286 } 287 288 /** 289 * A strategy method allowing the URI destination to be translated into the actual JMS destination name 290 * (say by looking up in JNDI or something) 291 */ 292 protected String convertPathToActualDestination(String path) { 293 return path; 294 } 295 296 /** 297 * Factory method to create the default configuration instance 298 * 299 * @return a newly created configuration object which can then be further customized 300 */ 301 protected JmsConfiguration createConfiguration() { 302 return new JmsConfiguration(); 303 } 304 305 }