001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.jms; 018 019 import javax.jms.ConnectionFactory; 020 import javax.jms.ExceptionListener; 021 022 import org.apache.camel.RuntimeCamelException; 023 import org.apache.camel.util.ObjectHelper; 024 025 import org.springframework.core.task.TaskExecutor; 026 import org.springframework.jms.core.JmsOperations; 027 import org.springframework.jms.core.JmsTemplate; 028 import org.springframework.jms.core.JmsTemplate102; 029 import org.springframework.jms.listener.AbstractMessageListenerContainer; 030 import org.springframework.jms.listener.DefaultMessageListenerContainer; 031 import org.springframework.jms.listener.DefaultMessageListenerContainer102; 032 import org.springframework.jms.listener.SimpleMessageListenerContainer; 033 import org.springframework.jms.listener.SimpleMessageListenerContainer102; 034 import org.springframework.jms.listener.serversession.ServerSessionFactory; 035 import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer; 036 import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer102; 037 import org.springframework.jms.support.converter.MessageConverter; 038 import org.springframework.transaction.PlatformTransactionManager; 039 040 /** 041 * @version $Revision: 563665 $ 042 */ 043 public class JmsConfiguration implements Cloneable { 044 protected static final String TRANSACTED = "TRANSACTED"; 045 protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE"; 046 protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE"; 047 protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE"; 048 049 private ConnectionFactory connectionFactory; 050 private ConnectionFactory templateConnectionFactory; 051 private ConnectionFactory listenerConnectionFactory; 052 private int acknowledgementMode = -1; 053 private String acknowledgementModeName = AUTO_ACKNOWLEDGE; 054 // Used to configure the spring Container 055 private ExceptionListener exceptionListener; 056 private ConsumerType consumerType = ConsumerType.Default; 057 private boolean autoStartup = true; 058 private boolean acceptMessagesWhileStopping; 059 private String clientId; 060 private String durableSubscriptionName; 061 private boolean subscriptionDurable; 062 private boolean exposeListenerSession = true; 063 private TaskExecutor taskExecutor; 064 private boolean pubSubNoLocal; 065 private int concurrentConsumers = 1; 066 private int maxMessagesPerTask = 1; 067 private ServerSessionFactory serverSessionFactory; 068 private int cacheLevel = -1; 069 private String cacheLevelName = "CACHE_CONSUMER"; 070 private long recoveryInterval = -1; 071 private long receiveTimeout = -1; 072 private int idleTaskExecutionLimit = 1; 073 private int maxConcurrentConsumers = 1; 074 // JmsTemplate only 075 private boolean useVersion102; 076 private boolean explicitQosEnabled; 077 private boolean deliveryPersistent = true; 078 private long timeToLive = -1; 079 private MessageConverter messageConverter; 080 private boolean messageIdEnabled = true; 081 private boolean messageTimestampEnabled = true; 082 private int priority = -1; 083 // Transaction related configuration 084 private boolean transacted; 085 private PlatformTransactionManager transactionManager; 086 private String transactionName; 087 private int transactionTimeout = -1; 088 089 public JmsConfiguration() { 090 } 091 092 public JmsConfiguration(ConnectionFactory connectionFactory) { 093 this.connectionFactory = connectionFactory; 094 } 095 096 /** 097 * Returns a copy of this configuration 098 */ 099 public JmsConfiguration copy() { 100 try { 101 return (JmsConfiguration)clone(); 102 } catch (CloneNotSupportedException e) { 103 throw new RuntimeCamelException(e); 104 } 105 } 106 107 public JmsOperations createJmsOperations(boolean pubSubDomain, String destination) { 108 ConnectionFactory factory = getTemplateConnectionFactory(); 109 JmsTemplate template = useVersion102 ? new JmsTemplate102(factory, pubSubDomain) : new JmsTemplate(factory); 110 template.setPubSubDomain(pubSubDomain); 111 template.setDefaultDestinationName(destination); 112 113 template.setExplicitQosEnabled(explicitQosEnabled); 114 template.setDeliveryPersistent(deliveryPersistent); 115 if (messageConverter != null) { 116 template.setMessageConverter(messageConverter); 117 } 118 template.setMessageIdEnabled(messageIdEnabled); 119 template.setMessageTimestampEnabled(messageTimestampEnabled); 120 if (priority >= 0) { 121 template.setPriority(priority); 122 } 123 template.setPubSubNoLocal(pubSubNoLocal); 124 if (receiveTimeout >= 0) { 125 template.setReceiveTimeout(receiveTimeout); 126 } 127 if (timeToLive >= 0) { 128 template.setTimeToLive(timeToLive); 129 } 130 131 template.setSessionTransacted(transacted); 132 133 // This is here for completeness, but the template should not get used 134 // for receiving messages. 135 if (acknowledgementMode >= 0) { 136 template.setSessionAcknowledgeMode(acknowledgementMode); 137 } else if (acknowledgementModeName != null) { 138 template.setSessionAcknowledgeModeName(acknowledgementModeName); 139 } 140 return template; 141 } 142 143 public AbstractMessageListenerContainer createMessageListenerContainer() { 144 AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation(); 145 configureMessageListenerContainer(container); 146 return container; 147 } 148 149 protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) { 150 container.setConnectionFactory(getListenerConnectionFactory()); 151 if (autoStartup) { 152 container.setAutoStartup(true); 153 } 154 if (clientId != null) { 155 container.setClientId(clientId); 156 } 157 container.setSubscriptionDurable(subscriptionDurable); 158 if (durableSubscriptionName != null) { 159 container.setDurableSubscriptionName(durableSubscriptionName); 160 } 161 162 // lets default to durable subscription if the subscriber name and 163 // client ID are specified (as there's 164 // no reason to specify them if not! :) 165 if (durableSubscriptionName != null && clientId != null) { 166 container.setSubscriptionDurable(true); 167 } 168 169 if (exceptionListener != null) { 170 container.setExceptionListener(exceptionListener); 171 } 172 173 container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); 174 container.setExposeListenerSession(exposeListenerSession); 175 container.setSessionTransacted(transacted); 176 177 if (acknowledgementMode >= 0) { 178 container.setSessionAcknowledgeMode(acknowledgementMode); 179 } else if (acknowledgementModeName != null) { 180 container.setSessionAcknowledgeModeName(acknowledgementModeName); 181 } 182 183 if (container instanceof DefaultMessageListenerContainer) { 184 // this includes DefaultMessageListenerContainer102 185 DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container; 186 if (concurrentConsumers >= 0) { 187 listenerContainer.setConcurrentConsumers(concurrentConsumers); 188 } 189 190 if (cacheLevel >= 0) { 191 listenerContainer.setCacheLevel(cacheLevel); 192 } else if (cacheLevelName != null) { 193 listenerContainer.setCacheLevelName(cacheLevelName); 194 } else { 195 // Default to CACHE_CONSUMER unless specified. This works best 196 // with most JMS providers. 197 listenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); 198 } 199 200 if (idleTaskExecutionLimit >= 0) { 201 listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit); 202 } 203 if (maxConcurrentConsumers >= 0) { 204 listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers); 205 } 206 if (maxMessagesPerTask >= 0) { 207 listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask); 208 } 209 listenerContainer.setPubSubNoLocal(pubSubNoLocal); 210 if (receiveTimeout >= 0) { 211 listenerContainer.setReceiveTimeout(receiveTimeout); 212 } 213 if (recoveryInterval >= 0) { 214 listenerContainer.setRecoveryInterval(recoveryInterval); 215 } 216 if (taskExecutor != null) { 217 listenerContainer.setTaskExecutor(taskExecutor); 218 } 219 if (transactionManager != null) { 220 listenerContainer.setTransactionManager(transactionManager); 221 } 222 if (transactionName != null) { 223 listenerContainer.setTransactionName(transactionName); 224 } 225 if (transactionTimeout >= 0) { 226 listenerContainer.setTransactionTimeout(transactionTimeout); 227 } 228 } else if (container instanceof ServerSessionMessageListenerContainer) { 229 // this includes ServerSessionMessageListenerContainer102 230 ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer)container; 231 if (maxMessagesPerTask >= 0) { 232 listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask); 233 } 234 if (serverSessionFactory != null) { 235 listenerContainer.setServerSessionFactory(serverSessionFactory); 236 } 237 } else if (container instanceof SimpleMessageListenerContainer) { 238 // this includes SimpleMessageListenerContainer102 239 SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)container; 240 if (concurrentConsumers >= 0) { 241 listenerContainer.setConcurrentConsumers(concurrentConsumers); 242 } 243 listenerContainer.setPubSubNoLocal(pubSubNoLocal); 244 if (taskExecutor != null) { 245 listenerContainer.setTaskExecutor(taskExecutor); 246 } 247 } 248 } 249 250 // Properties 251 // ------------------------------------------------------------------------- 252 public ConnectionFactory getConnectionFactory() { 253 if (connectionFactory == null) { 254 connectionFactory = createConnectionFactory(); 255 } 256 return connectionFactory; 257 } 258 259 /** 260 * Sets the default connection factory to be used if a connection factory is 261 * not specified for either 262 * {@link #setTemplateConnectionFactory(ConnectionFactory)} or 263 * {@link #setListenerConnectionFactory(ConnectionFactory)} 264 * 265 * @param connectionFactory the default connection factory to use 266 */ 267 public void setConnectionFactory(ConnectionFactory connectionFactory) { 268 this.connectionFactory = connectionFactory; 269 } 270 271 public ConnectionFactory getListenerConnectionFactory() { 272 if (listenerConnectionFactory == null) { 273 listenerConnectionFactory = createListenerConnectionFactory(); 274 } 275 return listenerConnectionFactory; 276 } 277 278 /** 279 * Sets the connection factory to be used for consuming messages via the 280 * {@link #createMessageListenerContainer()} 281 * 282 * @param listenerConnectionFactory the connection factory to use for 283 * consuming messages 284 */ 285 public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) { 286 this.listenerConnectionFactory = listenerConnectionFactory; 287 } 288 289 public ConnectionFactory getTemplateConnectionFactory() { 290 if (templateConnectionFactory == null) { 291 templateConnectionFactory = createTemplateConnectionFactory(); 292 } 293 return templateConnectionFactory; 294 } 295 296 /** 297 * Sets the connection factory to be used for sending messages via the 298 * {@link JmsTemplate} via {@link #createJmsOperations(boolean, String)} 299 * 300 * @param templateConnectionFactory the connection factory for sending 301 * messages 302 */ 303 public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) { 304 this.templateConnectionFactory = templateConnectionFactory; 305 } 306 307 public boolean isUseVersion102() { 308 return useVersion102; 309 } 310 311 public void setUseVersion102(boolean useVersion102) { 312 this.useVersion102 = useVersion102; 313 } 314 315 public boolean isAutoStartup() { 316 return autoStartup; 317 } 318 319 public void setAutoStartup(boolean autoStartup) { 320 this.autoStartup = autoStartup; 321 } 322 323 public boolean isAcceptMessagesWhileStopping() { 324 return acceptMessagesWhileStopping; 325 } 326 327 public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { 328 this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; 329 } 330 331 public String getClientId() { 332 return clientId; 333 } 334 335 public void setClientId(String consumerClientId) { 336 this.clientId = consumerClientId; 337 } 338 339 public String getDurableSubscriptionName() { 340 return durableSubscriptionName; 341 } 342 343 public void setDurableSubscriptionName(String durableSubscriptionName) { 344 this.durableSubscriptionName = durableSubscriptionName; 345 } 346 347 public ExceptionListener getExceptionListener() { 348 return exceptionListener; 349 } 350 351 public void setExceptionListener(ExceptionListener exceptionListener) { 352 this.exceptionListener = exceptionListener; 353 } 354 355 public boolean isSubscriptionDurable() { 356 return subscriptionDurable; 357 } 358 359 public void setSubscriptionDurable(boolean subscriptionDurable) { 360 this.subscriptionDurable = subscriptionDurable; 361 } 362 363 public String getAcknowledgementModeName() { 364 return acknowledgementModeName; 365 } 366 367 public void setAcknowledgementModeName(String consumerAcknowledgementMode) { 368 this.acknowledgementModeName = consumerAcknowledgementMode; 369 this.acknowledgementMode = -1; 370 } 371 372 public boolean isExposeListenerSession() { 373 return exposeListenerSession; 374 } 375 376 public void setExposeListenerSession(boolean exposeListenerSession) { 377 this.exposeListenerSession = exposeListenerSession; 378 } 379 380 public TaskExecutor getTaskExecutor() { 381 return taskExecutor; 382 } 383 384 public void setTaskExecutor(TaskExecutor taskExecutor) { 385 this.taskExecutor = taskExecutor; 386 } 387 388 public boolean isPubSubNoLocal() { 389 return pubSubNoLocal; 390 } 391 392 public void setPubSubNoLocal(boolean pubSubNoLocal) { 393 this.pubSubNoLocal = pubSubNoLocal; 394 } 395 396 public int getConcurrentConsumers() { 397 return concurrentConsumers; 398 } 399 400 public void setConcurrentConsumers(int concurrentConsumers) { 401 this.concurrentConsumers = concurrentConsumers; 402 } 403 404 public int getMaxMessagesPerTask() { 405 return maxMessagesPerTask; 406 } 407 408 public void setMaxMessagesPerTask(int maxMessagesPerTask) { 409 this.maxMessagesPerTask = maxMessagesPerTask; 410 } 411 412 public ServerSessionFactory getServerSessionFactory() { 413 return serverSessionFactory; 414 } 415 416 public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) { 417 this.serverSessionFactory = serverSessionFactory; 418 } 419 420 public int getCacheLevel() { 421 return cacheLevel; 422 } 423 424 public void setCacheLevel(int cacheLevel) { 425 this.cacheLevel = cacheLevel; 426 } 427 428 public String getCacheLevelName() { 429 return cacheLevelName; 430 } 431 432 public void setCacheLevelName(String cacheName) { 433 this.cacheLevelName = cacheName; 434 } 435 436 public long getRecoveryInterval() { 437 return recoveryInterval; 438 } 439 440 public void setRecoveryInterval(long recoveryInterval) { 441 this.recoveryInterval = recoveryInterval; 442 } 443 444 public long getReceiveTimeout() { 445 return receiveTimeout; 446 } 447 448 public void setReceiveTimeout(long receiveTimeout) { 449 this.receiveTimeout = receiveTimeout; 450 } 451 452 public PlatformTransactionManager getTransactionManager() { 453 return transactionManager; 454 } 455 456 public void setTransactionManager(PlatformTransactionManager transactionManager) { 457 this.transactionManager = transactionManager; 458 } 459 460 public String getTransactionName() { 461 return transactionName; 462 } 463 464 public void setTransactionName(String transactionName) { 465 this.transactionName = transactionName; 466 } 467 468 public int getTransactionTimeout() { 469 return transactionTimeout; 470 } 471 472 public void setTransactionTimeout(int transactionTimeout) { 473 this.transactionTimeout = transactionTimeout; 474 } 475 476 public int getIdleTaskExecutionLimit() { 477 return idleTaskExecutionLimit; 478 } 479 480 public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { 481 this.idleTaskExecutionLimit = idleTaskExecutionLimit; 482 } 483 484 public int getMaxConcurrentConsumers() { 485 return maxConcurrentConsumers; 486 } 487 488 public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { 489 this.maxConcurrentConsumers = maxConcurrentConsumers; 490 } 491 492 public boolean isExplicitQosEnabled() { 493 return explicitQosEnabled; 494 } 495 496 public void setExplicitQosEnabled(boolean explicitQosEnabled) { 497 this.explicitQosEnabled = explicitQosEnabled; 498 } 499 500 public boolean isDeliveryPersistent() { 501 return deliveryPersistent; 502 } 503 504 public void setDeliveryPersistent(boolean deliveryPersistent) { 505 this.deliveryPersistent = deliveryPersistent; 506 } 507 508 public long getTimeToLive() { 509 return timeToLive; 510 } 511 512 public void setTimeToLive(long timeToLive) { 513 this.timeToLive = timeToLive; 514 } 515 516 public MessageConverter getMessageConverter() { 517 return messageConverter; 518 } 519 520 public void setMessageConverter(MessageConverter messageConverter) { 521 this.messageConverter = messageConverter; 522 } 523 524 public boolean isMessageIdEnabled() { 525 return messageIdEnabled; 526 } 527 528 public void setMessageIdEnabled(boolean messageIdEnabled) { 529 this.messageIdEnabled = messageIdEnabled; 530 } 531 532 public boolean isMessageTimestampEnabled() { 533 return messageTimestampEnabled; 534 } 535 536 public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { 537 this.messageTimestampEnabled = messageTimestampEnabled; 538 } 539 540 public int getPriority() { 541 return priority; 542 } 543 544 public void setPriority(int priority) { 545 this.priority = priority; 546 } 547 548 public ConsumerType getConsumerType() { 549 return consumerType; 550 } 551 552 public void setConsumerType(ConsumerType consumerType) { 553 this.consumerType = consumerType; 554 } 555 556 public int getAcknowledgementMode() { 557 return acknowledgementMode; 558 } 559 560 public void setAcknowledgementMode(int consumerAcknowledgementMode) { 561 this.acknowledgementMode = consumerAcknowledgementMode; 562 this.acknowledgementModeName = null; 563 } 564 565 public boolean isTransacted() { 566 return transacted; 567 } 568 569 public void setTransacted(boolean consumerTransacted) { 570 this.transacted = consumerTransacted; 571 } 572 573 // Implementation methods 574 // ------------------------------------------------------------------------- 575 protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() { 576 // TODO we could allow a spring container to auto-inject these objects? 577 switch (consumerType) { 578 case Simple: 579 return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer(); 580 case ServerSessionPool: 581 return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer(); 582 case Default: 583 return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer(); 584 default: 585 throw new IllegalArgumentException("Unknown consumer type: " + consumerType); 586 } 587 } 588 589 /** 590 * Factory method which allows derived classes to customize the lazy 591 * creation 592 */ 593 protected ConnectionFactory createConnectionFactory() { 594 ObjectHelper.notNull(connectionFactory, "connectionFactory"); 595 return null; 596 } 597 598 /** 599 * Factory method which allows derived classes to customize the lazy 600 * creation 601 */ 602 protected ConnectionFactory createListenerConnectionFactory() { 603 return getConnectionFactory(); 604 } 605 606 /** 607 * Factory method which allows derived classes to customize the lazy 608 * creation 609 */ 610 protected ConnectionFactory createTemplateConnectionFactory() { 611 return getConnectionFactory(); 612 } 613 }