001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.component.jms; 019 020 import org.apache.camel.RuntimeCamelException; 021 import org.apache.camel.util.ObjectHelper; 022 import org.springframework.core.task.TaskExecutor; 023 import org.springframework.jms.core.JmsOperations; 024 import org.springframework.jms.core.JmsTemplate; 025 import org.springframework.jms.core.JmsTemplate102; 026 import org.springframework.jms.listener.AbstractMessageListenerContainer; 027 import org.springframework.jms.listener.DefaultMessageListenerContainer; 028 import org.springframework.jms.listener.DefaultMessageListenerContainer102; 029 import org.springframework.jms.listener.SimpleMessageListenerContainer; 030 import org.springframework.jms.listener.SimpleMessageListenerContainer102; 031 import org.springframework.jms.listener.serversession.ServerSessionFactory; 032 import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer; 033 import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer102; 034 import org.springframework.jms.support.converter.MessageConverter; 035 import org.springframework.transaction.PlatformTransactionManager; 036 037 import javax.jms.ConnectionFactory; 038 import javax.jms.ExceptionListener; 039 040 /** 041 * @version $Revision: 545124 $ 042 */ 043 public class JmsConfiguration implements Cloneable { 044 protected static final String TRANSACTED = "TRANSACTED"; 045 protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE"; 046 protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE"; 047 protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE"; 048 049 private ConnectionFactory connectionFactory; 050 private ConnectionFactory templateConnectionFactory; 051 private ConnectionFactory listenerConnectionFactory; 052 private int acknowledgementMode = -1; 053 private String acknowledgementModeName = AUTO_ACKNOWLEDGE; 054 // Used to configure the spring Container 055 private ExceptionListener exceptionListener; 056 private ConsumerType consumerType = ConsumerType.Default; 057 private boolean autoStartup = true; 058 private boolean acceptMessagesWhileStopping; 059 private String clientId; 060 private String durableSubscriptionName; 061 private boolean subscriptionDurable; 062 private boolean exposeListenerSession = true; 063 private TaskExecutor taskExecutor; 064 private boolean pubSubNoLocal; 065 private int concurrentConsumers = 1; 066 private int maxMessagesPerTask = 1; 067 private ServerSessionFactory serverSessionFactory; 068 private int cacheLevel = -1; 069 private String cacheLevelName = "CACHE_CONSUMER"; 070 private long recoveryInterval = -1; 071 private long receiveTimeout = -1; 072 private int idleTaskExecutionLimit = 1; 073 private int maxConcurrentConsumers = 1; 074 // JmsTemplate only 075 private boolean useVersion102 = false; 076 private boolean explicitQosEnabled = false; 077 private boolean deliveryPersistent = true; 078 private long timeToLive = -1; 079 private MessageConverter messageConverter; 080 private boolean messageIdEnabled = true; 081 private boolean messageTimestampEnabled = true; 082 private int priority = -1; 083 // Transaction related configuration 084 private boolean transacted; 085 private PlatformTransactionManager transactionManager; 086 private String transactionName; 087 private int transactionTimeout = -1; 088 089 public JmsConfiguration() { 090 } 091 092 public JmsConfiguration(ConnectionFactory connectionFactory) { 093 this.connectionFactory = connectionFactory; 094 } 095 096 /** 097 * Returns a copy of this configuration 098 */ 099 public JmsConfiguration copy() { 100 try { 101 return (JmsConfiguration) clone(); 102 } 103 catch (CloneNotSupportedException e) { 104 throw new RuntimeCamelException(e); 105 } 106 } 107 108 public JmsOperations createJmsOperations(boolean pubSubDomain, String destination) { 109 ConnectionFactory factory = getTemplateConnectionFactory(); 110 JmsTemplate template = useVersion102 111 ? new JmsTemplate102(factory, pubSubDomain) 112 : new JmsTemplate(factory); 113 template.setPubSubDomain(pubSubDomain); 114 template.setDefaultDestinationName(destination); 115 116 template.setExplicitQosEnabled(explicitQosEnabled); 117 template.setDeliveryPersistent(deliveryPersistent); 118 if (messageConverter != null) { 119 template.setMessageConverter(messageConverter); 120 } 121 template.setMessageIdEnabled(messageIdEnabled); 122 template.setMessageTimestampEnabled(messageTimestampEnabled); 123 if (priority >= 0) { 124 template.setPriority(priority); 125 } 126 template.setPubSubNoLocal(pubSubNoLocal); 127 if (receiveTimeout >= 0) { 128 template.setReceiveTimeout(receiveTimeout); 129 } 130 if (timeToLive >= 0) { 131 template.setTimeToLive(timeToLive); 132 } 133 134 template.setSessionTransacted(transacted); 135 136 // This is here for completeness, but the template should not get used for receiving messages. 137 if (acknowledgementMode >= 0) { 138 template.setSessionAcknowledgeMode(acknowledgementMode); 139 } 140 else if (acknowledgementModeName != null) { 141 template.setSessionAcknowledgeModeName(acknowledgementModeName); 142 } 143 return template; 144 } 145 146 public AbstractMessageListenerContainer createMessageListenerContainer() { 147 AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation(); 148 configureMessageListenerContainer(container); 149 return container; 150 } 151 152 protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) { 153 container.setConnectionFactory(getListenerConnectionFactory()); 154 if (autoStartup) { 155 container.setAutoStartup(true); 156 } 157 if (clientId != null) { 158 container.setClientId(clientId); 159 } 160 container.setSubscriptionDurable(subscriptionDurable); 161 if (durableSubscriptionName != null) { 162 container.setDurableSubscriptionName(durableSubscriptionName); 163 } 164 165 // lets default to durable subscription if the subscriber name and client ID are specified (as there's 166 // no reason to specify them if not! :) 167 if (durableSubscriptionName != null && clientId != null) { 168 container.setSubscriptionDurable(true); 169 } 170 171 if (exceptionListener != null) { 172 container.setExceptionListener(exceptionListener); 173 } 174 175 container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); 176 container.setExposeListenerSession(exposeListenerSession); 177 container.setSessionTransacted(transacted); 178 179 if (acknowledgementMode >= 0) { 180 container.setSessionAcknowledgeMode(acknowledgementMode); 181 } 182 else if (acknowledgementModeName != null) { 183 container.setSessionAcknowledgeModeName(acknowledgementModeName); 184 } 185 186 if (container instanceof DefaultMessageListenerContainer) { 187 // this includes DefaultMessageListenerContainer102 188 DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer) container; 189 if (concurrentConsumers >= 0) { 190 listenerContainer.setConcurrentConsumers(concurrentConsumers); 191 } 192 193 if (cacheLevel >= 0) { 194 listenerContainer.setCacheLevel(cacheLevel); 195 } 196 else if (cacheLevelName != null) { 197 listenerContainer.setCacheLevelName(cacheLevelName); 198 } 199 else { 200 // Default to CACHE_CONSUMER unless specified. This works best with most JMS providers. 201 listenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); 202 } 203 204 if (idleTaskExecutionLimit >= 0) { 205 listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit); 206 } 207 if (maxConcurrentConsumers >= 0) { 208 listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers); 209 } 210 if (maxMessagesPerTask >= 0) { 211 listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask); 212 } 213 listenerContainer.setPubSubNoLocal(pubSubNoLocal); 214 if (receiveTimeout >= 0) { 215 listenerContainer.setReceiveTimeout(receiveTimeout); 216 } 217 if (recoveryInterval >= 0) { 218 listenerContainer.setRecoveryInterval(recoveryInterval); 219 } 220 if (taskExecutor != null) { 221 listenerContainer.setTaskExecutor(taskExecutor); 222 } 223 if (transactionManager != null) { 224 listenerContainer.setTransactionManager(transactionManager); 225 } 226 if (transactionName != null) { 227 listenerContainer.setTransactionName(transactionName); 228 } 229 if (transactionTimeout >= 0) { 230 listenerContainer.setTransactionTimeout(transactionTimeout); 231 } 232 } 233 else if (container instanceof ServerSessionMessageListenerContainer) { 234 // this includes ServerSessionMessageListenerContainer102 235 ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer) container; 236 if (maxMessagesPerTask >= 0) { 237 listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask); 238 } 239 if (serverSessionFactory != null) { 240 listenerContainer.setServerSessionFactory(serverSessionFactory); 241 } 242 } 243 else if (container instanceof SimpleMessageListenerContainer) { 244 // this includes SimpleMessageListenerContainer102 245 SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container; 246 if (concurrentConsumers >= 0) { 247 listenerContainer.setConcurrentConsumers(concurrentConsumers); 248 } 249 listenerContainer.setPubSubNoLocal(pubSubNoLocal); 250 if (taskExecutor != null) { 251 listenerContainer.setTaskExecutor(taskExecutor); 252 } 253 } 254 } 255 256 // Properties 257 //------------------------------------------------------------------------- 258 public ConnectionFactory getConnectionFactory() { 259 if (connectionFactory == null) { 260 connectionFactory = createConnectionFactory(); 261 } 262 return connectionFactory; 263 } 264 265 /** 266 * Sets the default connection factory to be used if a connection factory is not specified 267 * for either {@link #setTemplateConnectionFactory(ConnectionFactory)} or 268 * {@link #setListenerConnectionFactory(ConnectionFactory)} 269 * 270 * @param connectionFactory the default connection factory to use 271 */ 272 public void setConnectionFactory(ConnectionFactory connectionFactory) { 273 this.connectionFactory = connectionFactory; 274 } 275 276 public ConnectionFactory getListenerConnectionFactory() { 277 if (listenerConnectionFactory == null) { 278 listenerConnectionFactory = createListenerConnectionFactory(); 279 } 280 return listenerConnectionFactory; 281 } 282 283 /** 284 * Sets the connection factory to be used for consuming messages via the {@link #createMessageListenerContainer()} 285 * 286 * @param listenerConnectionFactory the connection factory to use for consuming messages 287 */ 288 public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) { 289 this.listenerConnectionFactory = listenerConnectionFactory; 290 } 291 292 public ConnectionFactory getTemplateConnectionFactory() { 293 if (templateConnectionFactory == null) { 294 templateConnectionFactory = createTemplateConnectionFactory(); 295 } 296 return templateConnectionFactory; 297 } 298 299 /** 300 * Sets the connection factory to be used for sending messages via the {@link JmsTemplate} via 301 * {@link #createJmsOperations(boolean, String)} 302 * 303 * @param templateConnectionFactory the connection factory for sending messages 304 */ 305 public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) { 306 this.templateConnectionFactory = templateConnectionFactory; 307 } 308 309 public boolean isUseVersion102() { 310 return useVersion102; 311 } 312 313 public void setUseVersion102(boolean useVersion102) { 314 this.useVersion102 = useVersion102; 315 } 316 317 public boolean isAutoStartup() { 318 return autoStartup; 319 } 320 321 public void setAutoStartup(boolean autoStartup) { 322 this.autoStartup = autoStartup; 323 } 324 325 public boolean isAcceptMessagesWhileStopping() { 326 return acceptMessagesWhileStopping; 327 } 328 329 public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { 330 this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; 331 } 332 333 public String getClientId() { 334 return clientId; 335 } 336 337 public void setClientId(String consumerClientId) { 338 this.clientId = consumerClientId; 339 } 340 341 public String getDurableSubscriptionName() { 342 return durableSubscriptionName; 343 } 344 345 public void setDurableSubscriptionName(String durableSubscriptionName) { 346 this.durableSubscriptionName = durableSubscriptionName; 347 } 348 349 public ExceptionListener getExceptionListener() { 350 return exceptionListener; 351 } 352 353 public void setExceptionListener(ExceptionListener exceptionListener) { 354 this.exceptionListener = exceptionListener; 355 } 356 357 public boolean isSubscriptionDurable() { 358 return subscriptionDurable; 359 } 360 361 public void setSubscriptionDurable(boolean subscriptionDurable) { 362 this.subscriptionDurable = subscriptionDurable; 363 } 364 365 public String getAcknowledgementModeName() { 366 return acknowledgementModeName; 367 } 368 369 public void setAcknowledgementModeName(String consumerAcknowledgementMode) { 370 this.acknowledgementModeName = consumerAcknowledgementMode; 371 this.acknowledgementMode = -1; 372 } 373 374 public boolean isExposeListenerSession() { 375 return exposeListenerSession; 376 } 377 378 public void setExposeListenerSession(boolean exposeListenerSession) { 379 this.exposeListenerSession = exposeListenerSession; 380 } 381 382 public TaskExecutor getTaskExecutor() { 383 return taskExecutor; 384 } 385 386 public void setTaskExecutor(TaskExecutor taskExecutor) { 387 this.taskExecutor = taskExecutor; 388 } 389 390 public boolean isPubSubNoLocal() { 391 return pubSubNoLocal; 392 } 393 394 public void setPubSubNoLocal(boolean pubSubNoLocal) { 395 this.pubSubNoLocal = pubSubNoLocal; 396 } 397 398 public int getConcurrentConsumers() { 399 return concurrentConsumers; 400 } 401 402 public void setConcurrentConsumers(int concurrentConsumers) { 403 this.concurrentConsumers = concurrentConsumers; 404 } 405 406 public int getMaxMessagesPerTask() { 407 return maxMessagesPerTask; 408 } 409 410 public void setMaxMessagesPerTask(int maxMessagesPerTask) { 411 this.maxMessagesPerTask = maxMessagesPerTask; 412 } 413 414 public ServerSessionFactory getServerSessionFactory() { 415 return serverSessionFactory; 416 } 417 418 public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) { 419 this.serverSessionFactory = serverSessionFactory; 420 } 421 422 public int getCacheLevel() { 423 return cacheLevel; 424 } 425 426 public void setCacheLevel(int cacheLevel) { 427 this.cacheLevel = cacheLevel; 428 } 429 430 public String getCacheLevelName() { 431 return cacheLevelName; 432 } 433 434 public void setCacheLevelName(String cacheName) { 435 this.cacheLevelName = cacheName; 436 } 437 438 public long getRecoveryInterval() { 439 return recoveryInterval; 440 } 441 442 public void setRecoveryInterval(long recoveryInterval) { 443 this.recoveryInterval = recoveryInterval; 444 } 445 446 public long getReceiveTimeout() { 447 return receiveTimeout; 448 } 449 450 public void setReceiveTimeout(long receiveTimeout) { 451 this.receiveTimeout = receiveTimeout; 452 } 453 454 public PlatformTransactionManager getTransactionManager() { 455 return transactionManager; 456 } 457 458 public void setTransactionManager(PlatformTransactionManager transactionManager) { 459 this.transactionManager = transactionManager; 460 } 461 462 public String getTransactionName() { 463 return transactionName; 464 } 465 466 public void setTransactionName(String transactionName) { 467 this.transactionName = transactionName; 468 } 469 470 public int getTransactionTimeout() { 471 return transactionTimeout; 472 } 473 474 public void setTransactionTimeout(int transactionTimeout) { 475 this.transactionTimeout = transactionTimeout; 476 } 477 478 public int getIdleTaskExecutionLimit() { 479 return idleTaskExecutionLimit; 480 } 481 482 public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { 483 this.idleTaskExecutionLimit = idleTaskExecutionLimit; 484 } 485 486 public int getMaxConcurrentConsumers() { 487 return maxConcurrentConsumers; 488 } 489 490 public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { 491 this.maxConcurrentConsumers = maxConcurrentConsumers; 492 } 493 494 public boolean isExplicitQosEnabled() { 495 return explicitQosEnabled; 496 } 497 498 public void setExplicitQosEnabled(boolean explicitQosEnabled) { 499 this.explicitQosEnabled = explicitQosEnabled; 500 } 501 502 public boolean isDeliveryPersistent() { 503 return deliveryPersistent; 504 } 505 506 public void setDeliveryPersistent(boolean deliveryPersistent) { 507 this.deliveryPersistent = deliveryPersistent; 508 } 509 510 public long getTimeToLive() { 511 return timeToLive; 512 } 513 514 public void setTimeToLive(long timeToLive) { 515 this.timeToLive = timeToLive; 516 } 517 518 public MessageConverter getMessageConverter() { 519 return messageConverter; 520 } 521 522 public void setMessageConverter(MessageConverter messageConverter) { 523 this.messageConverter = messageConverter; 524 } 525 526 public boolean isMessageIdEnabled() { 527 return messageIdEnabled; 528 } 529 530 public void setMessageIdEnabled(boolean messageIdEnabled) { 531 this.messageIdEnabled = messageIdEnabled; 532 } 533 534 public boolean isMessageTimestampEnabled() { 535 return messageTimestampEnabled; 536 } 537 538 public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { 539 this.messageTimestampEnabled = messageTimestampEnabled; 540 } 541 542 public int getPriority() { 543 return priority; 544 } 545 546 public void setPriority(int priority) { 547 this.priority = priority; 548 } 549 550 public ConsumerType getConsumerType() { 551 return consumerType; 552 } 553 554 public void setConsumerType(ConsumerType consumerType) { 555 this.consumerType = consumerType; 556 } 557 558 public int getAcknowledgementMode() { 559 return acknowledgementMode; 560 } 561 562 public void setAcknowledgementMode(int consumerAcknowledgementMode) { 563 this.acknowledgementMode = consumerAcknowledgementMode; 564 this.acknowledgementModeName = null; 565 } 566 567 public boolean isTransacted() { 568 return transacted; 569 } 570 571 public void setTransacted(boolean consumerTransacted) { 572 this.transacted = consumerTransacted; 573 } 574 575 // Implementation methods 576 //------------------------------------------------------------------------- 577 protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() { 578 // TODO we could allow a spring container to auto-inject these objects? 579 switch (consumerType) { 580 case Simple: 581 return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer(); 582 case ServerSessionPool: 583 return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer(); 584 case Default: 585 return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer(); 586 default: 587 throw new IllegalArgumentException("Unknown consumer type: " + consumerType); 588 } 589 } 590 591 /** 592 * Factory method which allows derived classes to customize the lazy creation 593 */ 594 protected ConnectionFactory createConnectionFactory() { 595 ObjectHelper.notNull(connectionFactory, "connectionFactory"); 596 return null; 597 } 598 599 /** 600 * Factory method which allows derived classes to customize the lazy creation 601 */ 602 protected ConnectionFactory createListenerConnectionFactory() { 603 return getConnectionFactory(); 604 } 605 606 /** 607 * Factory method which allows derived classes to customize the lazy creation 608 */ 609 protected ConnectionFactory createTemplateConnectionFactory() { 610 return getConnectionFactory(); 611 } 612 }