001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.jms;
019    
020    import org.apache.camel.RuntimeCamelException;
021    import org.apache.camel.util.ObjectHelper;
022    import org.springframework.core.task.TaskExecutor;
023    import org.springframework.jms.core.JmsOperations;
024    import org.springframework.jms.core.JmsTemplate;
025    import org.springframework.jms.core.JmsTemplate102;
026    import org.springframework.jms.listener.AbstractMessageListenerContainer;
027    import org.springframework.jms.listener.DefaultMessageListenerContainer;
028    import org.springframework.jms.listener.DefaultMessageListenerContainer102;
029    import org.springframework.jms.listener.SimpleMessageListenerContainer;
030    import org.springframework.jms.listener.SimpleMessageListenerContainer102;
031    import org.springframework.jms.listener.serversession.ServerSessionFactory;
032    import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer;
033    import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer102;
034    import org.springframework.jms.support.converter.MessageConverter;
035    import org.springframework.transaction.PlatformTransactionManager;
036    
037    import javax.jms.ConnectionFactory;
038    import javax.jms.ExceptionListener;
039    
040    /**
041     * @version $Revision: 545124 $
042     */
043    public class JmsConfiguration implements Cloneable {
044        protected static final String TRANSACTED = "TRANSACTED";
045        protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
046        protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
047        protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
048    
049        private ConnectionFactory connectionFactory;
050        private ConnectionFactory templateConnectionFactory;
051        private ConnectionFactory listenerConnectionFactory;
052        private int acknowledgementMode = -1;
053        private String acknowledgementModeName = AUTO_ACKNOWLEDGE;
054        // Used to configure the spring Container
055        private ExceptionListener exceptionListener;
056        private ConsumerType consumerType = ConsumerType.Default;
057        private boolean autoStartup = true;
058        private boolean acceptMessagesWhileStopping;
059        private String clientId;
060        private String durableSubscriptionName;
061        private boolean subscriptionDurable;
062        private boolean exposeListenerSession = true;
063        private TaskExecutor taskExecutor;
064        private boolean pubSubNoLocal;
065        private int concurrentConsumers = 1;
066        private int maxMessagesPerTask = 1;
067        private ServerSessionFactory serverSessionFactory;
068        private int cacheLevel = -1;
069        private String cacheLevelName = "CACHE_CONSUMER";
070        private long recoveryInterval = -1;
071        private long receiveTimeout = -1;
072        private int idleTaskExecutionLimit = 1;
073        private int maxConcurrentConsumers = 1;
074        // JmsTemplate only
075        private boolean useVersion102 = false;
076        private boolean explicitQosEnabled = false;
077        private boolean deliveryPersistent = true;
078        private long timeToLive = -1;
079        private MessageConverter messageConverter;
080        private boolean messageIdEnabled = true;
081        private boolean messageTimestampEnabled = true;
082        private int priority = -1;
083        // Transaction related configuration
084        private boolean transacted;
085        private PlatformTransactionManager transactionManager;
086        private String transactionName;
087        private int transactionTimeout = -1;
088    
089        public JmsConfiguration() {
090        }
091    
092        public JmsConfiguration(ConnectionFactory connectionFactory) {
093            this.connectionFactory = connectionFactory;
094        }
095    
096        /**
097         * Returns a copy of this configuration
098         */
099        public JmsConfiguration copy() {
100            try {
101                return (JmsConfiguration) clone();
102            }
103            catch (CloneNotSupportedException e) {
104                throw new RuntimeCamelException(e);
105            }
106        }
107    
108        public JmsOperations createJmsOperations(boolean pubSubDomain, String destination) {
109            ConnectionFactory factory = getTemplateConnectionFactory();
110            JmsTemplate template = useVersion102
111                    ? new JmsTemplate102(factory, pubSubDomain)
112                    : new JmsTemplate(factory);
113            template.setPubSubDomain(pubSubDomain);
114            template.setDefaultDestinationName(destination);
115    
116            template.setExplicitQosEnabled(explicitQosEnabled);
117            template.setDeliveryPersistent(deliveryPersistent);
118            if (messageConverter != null) {
119                template.setMessageConverter(messageConverter);
120            }
121            template.setMessageIdEnabled(messageIdEnabled);
122            template.setMessageTimestampEnabled(messageTimestampEnabled);
123            if (priority >= 0) {
124                template.setPriority(priority);
125            }
126            template.setPubSubNoLocal(pubSubNoLocal);
127            if (receiveTimeout >= 0) {
128                template.setReceiveTimeout(receiveTimeout);
129            }
130            if (timeToLive >= 0) {
131                template.setTimeToLive(timeToLive);
132            }
133    
134            template.setSessionTransacted(transacted);
135    
136            // This is here for completeness, but the template should not get used for receiving messages.
137            if (acknowledgementMode >= 0) {
138                template.setSessionAcknowledgeMode(acknowledgementMode);
139            }
140            else if (acknowledgementModeName != null) {
141                template.setSessionAcknowledgeModeName(acknowledgementModeName);
142            }
143            return template;
144        }
145    
146        public AbstractMessageListenerContainer createMessageListenerContainer() {
147            AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation();
148            configureMessageListenerContainer(container);
149            return container;
150        }
151    
152        protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) {
153            container.setConnectionFactory(getListenerConnectionFactory());
154            if (autoStartup) {
155                container.setAutoStartup(true);
156            }
157            if (clientId != null) {
158                container.setClientId(clientId);
159            }
160            container.setSubscriptionDurable(subscriptionDurable);
161            if (durableSubscriptionName != null) {
162                container.setDurableSubscriptionName(durableSubscriptionName);
163            }
164    
165            // lets default to durable subscription if the subscriber name and client ID are specified (as there's
166            // no reason to specify them if not! :)
167            if (durableSubscriptionName != null && clientId != null) {
168                container.setSubscriptionDurable(true);
169            }
170    
171            if (exceptionListener != null) {
172                container.setExceptionListener(exceptionListener);
173            }
174    
175            container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
176            container.setExposeListenerSession(exposeListenerSession);
177            container.setSessionTransacted(transacted);
178    
179            if (acknowledgementMode >= 0) {
180                container.setSessionAcknowledgeMode(acknowledgementMode);
181            }
182            else if (acknowledgementModeName != null) {
183                container.setSessionAcknowledgeModeName(acknowledgementModeName);
184            }
185    
186            if (container instanceof DefaultMessageListenerContainer) {
187                // this includes DefaultMessageListenerContainer102
188                DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer) container;
189                if (concurrentConsumers >= 0) {
190                    listenerContainer.setConcurrentConsumers(concurrentConsumers);
191                }
192    
193                if (cacheLevel >= 0) {
194                    listenerContainer.setCacheLevel(cacheLevel);
195                }
196                else if (cacheLevelName != null) {
197                    listenerContainer.setCacheLevelName(cacheLevelName);
198                }
199                else {
200                    // Default to CACHE_CONSUMER unless specified.  This works best with most JMS providers.
201                    listenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
202                }
203    
204                if (idleTaskExecutionLimit >= 0) {
205                    listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
206                }
207                if (maxConcurrentConsumers >= 0) {
208                    listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers);
209                }
210                if (maxMessagesPerTask >= 0) {
211                    listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
212                }
213                listenerContainer.setPubSubNoLocal(pubSubNoLocal);
214                if (receiveTimeout >= 0) {
215                    listenerContainer.setReceiveTimeout(receiveTimeout);
216                }
217                if (recoveryInterval >= 0) {
218                    listenerContainer.setRecoveryInterval(recoveryInterval);
219                }
220                if (taskExecutor != null) {
221                    listenerContainer.setTaskExecutor(taskExecutor);
222                }
223                if (transactionManager != null) {
224                    listenerContainer.setTransactionManager(transactionManager);
225                }
226                if (transactionName != null) {
227                    listenerContainer.setTransactionName(transactionName);
228                }
229                if (transactionTimeout >= 0) {
230                    listenerContainer.setTransactionTimeout(transactionTimeout);
231                }
232            }
233            else if (container instanceof ServerSessionMessageListenerContainer) {
234                // this includes ServerSessionMessageListenerContainer102
235                ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer) container;
236                if (maxMessagesPerTask >= 0) {
237                    listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
238                }
239                if (serverSessionFactory != null) {
240                    listenerContainer.setServerSessionFactory(serverSessionFactory);
241                }
242            }
243            else if (container instanceof SimpleMessageListenerContainer) {
244                // this includes SimpleMessageListenerContainer102
245                SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container;
246                if (concurrentConsumers >= 0) {
247                    listenerContainer.setConcurrentConsumers(concurrentConsumers);
248                }
249                listenerContainer.setPubSubNoLocal(pubSubNoLocal);
250                if (taskExecutor != null) {
251                    listenerContainer.setTaskExecutor(taskExecutor);
252                }
253            }
254        }
255    
256        // Properties
257        //-------------------------------------------------------------------------
258        public ConnectionFactory getConnectionFactory() {
259            if (connectionFactory == null) {
260                connectionFactory = createConnectionFactory();
261            }
262            return connectionFactory;
263        }
264    
265        /**
266         * Sets the default connection factory to be used if a connection factory is not specified
267         * for either {@link #setTemplateConnectionFactory(ConnectionFactory)} or
268         * {@link #setListenerConnectionFactory(ConnectionFactory)}
269         *
270         * @param connectionFactory the default connection factory to use
271         */
272        public void setConnectionFactory(ConnectionFactory connectionFactory) {
273            this.connectionFactory = connectionFactory;
274        }
275    
276        public ConnectionFactory getListenerConnectionFactory() {
277            if (listenerConnectionFactory == null) {
278                listenerConnectionFactory = createListenerConnectionFactory();
279            }
280            return listenerConnectionFactory;
281        }
282    
283        /**
284         * Sets the connection factory to be used for consuming messages via the {@link #createMessageListenerContainer()}
285         *
286         * @param listenerConnectionFactory the connection factory to use for consuming messages
287         */
288        public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
289            this.listenerConnectionFactory = listenerConnectionFactory;
290        }
291    
292        public ConnectionFactory getTemplateConnectionFactory() {
293            if (templateConnectionFactory == null) {
294                templateConnectionFactory = createTemplateConnectionFactory();
295            }
296            return templateConnectionFactory;
297        }
298    
299        /**
300         * Sets the connection factory to be used for sending messages via the {@link JmsTemplate} via
301         * {@link #createJmsOperations(boolean, String)}
302         *
303         * @param templateConnectionFactory the connection factory for sending messages
304         */
305        public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) {
306            this.templateConnectionFactory = templateConnectionFactory;
307        }
308    
309        public boolean isUseVersion102() {
310            return useVersion102;
311        }
312    
313        public void setUseVersion102(boolean useVersion102) {
314            this.useVersion102 = useVersion102;
315        }
316    
317        public boolean isAutoStartup() {
318            return autoStartup;
319        }
320    
321        public void setAutoStartup(boolean autoStartup) {
322            this.autoStartup = autoStartup;
323        }
324    
325        public boolean isAcceptMessagesWhileStopping() {
326            return acceptMessagesWhileStopping;
327        }
328    
329        public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
330            this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
331        }
332    
333        public String getClientId() {
334            return clientId;
335        }
336    
337        public void setClientId(String consumerClientId) {
338            this.clientId = consumerClientId;
339        }
340    
341        public String getDurableSubscriptionName() {
342            return durableSubscriptionName;
343        }
344    
345        public void setDurableSubscriptionName(String durableSubscriptionName) {
346            this.durableSubscriptionName = durableSubscriptionName;
347        }
348    
349        public ExceptionListener getExceptionListener() {
350            return exceptionListener;
351        }
352    
353        public void setExceptionListener(ExceptionListener exceptionListener) {
354            this.exceptionListener = exceptionListener;
355        }
356    
357        public boolean isSubscriptionDurable() {
358            return subscriptionDurable;
359        }
360    
361        public void setSubscriptionDurable(boolean subscriptionDurable) {
362            this.subscriptionDurable = subscriptionDurable;
363        }
364    
365        public String getAcknowledgementModeName() {
366            return acknowledgementModeName;
367        }
368    
369        public void setAcknowledgementModeName(String consumerAcknowledgementMode) {
370            this.acknowledgementModeName = consumerAcknowledgementMode;
371            this.acknowledgementMode = -1;
372        }
373    
374        public boolean isExposeListenerSession() {
375            return exposeListenerSession;
376        }
377    
378        public void setExposeListenerSession(boolean exposeListenerSession) {
379            this.exposeListenerSession = exposeListenerSession;
380        }
381    
382        public TaskExecutor getTaskExecutor() {
383            return taskExecutor;
384        }
385    
386        public void setTaskExecutor(TaskExecutor taskExecutor) {
387            this.taskExecutor = taskExecutor;
388        }
389    
390        public boolean isPubSubNoLocal() {
391            return pubSubNoLocal;
392        }
393    
394        public void setPubSubNoLocal(boolean pubSubNoLocal) {
395            this.pubSubNoLocal = pubSubNoLocal;
396        }
397    
398        public int getConcurrentConsumers() {
399            return concurrentConsumers;
400        }
401    
402        public void setConcurrentConsumers(int concurrentConsumers) {
403            this.concurrentConsumers = concurrentConsumers;
404        }
405    
406        public int getMaxMessagesPerTask() {
407            return maxMessagesPerTask;
408        }
409    
410        public void setMaxMessagesPerTask(int maxMessagesPerTask) {
411            this.maxMessagesPerTask = maxMessagesPerTask;
412        }
413    
414        public ServerSessionFactory getServerSessionFactory() {
415            return serverSessionFactory;
416        }
417    
418        public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) {
419            this.serverSessionFactory = serverSessionFactory;
420        }
421    
422        public int getCacheLevel() {
423            return cacheLevel;
424        }
425    
426        public void setCacheLevel(int cacheLevel) {
427            this.cacheLevel = cacheLevel;
428        }
429    
430        public String getCacheLevelName() {
431            return cacheLevelName;
432        }
433    
434        public void setCacheLevelName(String cacheName) {
435            this.cacheLevelName = cacheName;
436        }
437    
438        public long getRecoveryInterval() {
439            return recoveryInterval;
440        }
441    
442        public void setRecoveryInterval(long recoveryInterval) {
443            this.recoveryInterval = recoveryInterval;
444        }
445    
446        public long getReceiveTimeout() {
447            return receiveTimeout;
448        }
449    
450        public void setReceiveTimeout(long receiveTimeout) {
451            this.receiveTimeout = receiveTimeout;
452        }
453    
454        public PlatformTransactionManager getTransactionManager() {
455            return transactionManager;
456        }
457    
458        public void setTransactionManager(PlatformTransactionManager transactionManager) {
459            this.transactionManager = transactionManager;
460        }
461    
462        public String getTransactionName() {
463            return transactionName;
464        }
465    
466        public void setTransactionName(String transactionName) {
467            this.transactionName = transactionName;
468        }
469    
470        public int getTransactionTimeout() {
471            return transactionTimeout;
472        }
473    
474        public void setTransactionTimeout(int transactionTimeout) {
475            this.transactionTimeout = transactionTimeout;
476        }
477    
478        public int getIdleTaskExecutionLimit() {
479            return idleTaskExecutionLimit;
480        }
481    
482        public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
483            this.idleTaskExecutionLimit = idleTaskExecutionLimit;
484        }
485    
486        public int getMaxConcurrentConsumers() {
487            return maxConcurrentConsumers;
488        }
489    
490        public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
491            this.maxConcurrentConsumers = maxConcurrentConsumers;
492        }
493    
494        public boolean isExplicitQosEnabled() {
495            return explicitQosEnabled;
496        }
497    
498        public void setExplicitQosEnabled(boolean explicitQosEnabled) {
499            this.explicitQosEnabled = explicitQosEnabled;
500        }
501    
502        public boolean isDeliveryPersistent() {
503            return deliveryPersistent;
504        }
505    
506        public void setDeliveryPersistent(boolean deliveryPersistent) {
507            this.deliveryPersistent = deliveryPersistent;
508        }
509    
510        public long getTimeToLive() {
511            return timeToLive;
512        }
513    
514        public void setTimeToLive(long timeToLive) {
515            this.timeToLive = timeToLive;
516        }
517    
518        public MessageConverter getMessageConverter() {
519            return messageConverter;
520        }
521    
522        public void setMessageConverter(MessageConverter messageConverter) {
523            this.messageConverter = messageConverter;
524        }
525    
526        public boolean isMessageIdEnabled() {
527            return messageIdEnabled;
528        }
529    
530        public void setMessageIdEnabled(boolean messageIdEnabled) {
531            this.messageIdEnabled = messageIdEnabled;
532        }
533    
534        public boolean isMessageTimestampEnabled() {
535            return messageTimestampEnabled;
536        }
537    
538        public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
539            this.messageTimestampEnabled = messageTimestampEnabled;
540        }
541    
542        public int getPriority() {
543            return priority;
544        }
545    
546        public void setPriority(int priority) {
547            this.priority = priority;
548        }
549    
550        public ConsumerType getConsumerType() {
551            return consumerType;
552        }
553    
554        public void setConsumerType(ConsumerType consumerType) {
555            this.consumerType = consumerType;
556        }
557    
558        public int getAcknowledgementMode() {
559            return acknowledgementMode;
560        }
561    
562        public void setAcknowledgementMode(int consumerAcknowledgementMode) {
563            this.acknowledgementMode = consumerAcknowledgementMode;
564            this.acknowledgementModeName = null;
565        }
566    
567        public boolean isTransacted() {
568            return transacted;
569        }
570    
571        public void setTransacted(boolean consumerTransacted) {
572            this.transacted = consumerTransacted;
573        }
574    
575        // Implementation methods
576        //-------------------------------------------------------------------------
577        protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
578            // TODO we could allow a spring container to auto-inject these objects?
579            switch (consumerType) {
580                case Simple:
581                    return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
582                case ServerSessionPool:
583                    return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer();
584                case Default:
585                    return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer();
586                default:
587                    throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
588            }
589        }
590    
591        /**
592         * Factory method which allows derived classes to customize the lazy creation
593         */
594        protected ConnectionFactory createConnectionFactory() {
595            ObjectHelper.notNull(connectionFactory, "connectionFactory");
596            return null;
597        }
598    
599        /**
600         * Factory method which allows derived classes to customize the lazy creation
601         */
602        protected ConnectionFactory createListenerConnectionFactory() {
603            return getConnectionFactory();
604        }
605    
606        /**
607         * Factory method which allows derived classes to customize the lazy creation
608         */
609        protected ConnectionFactory createTemplateConnectionFactory() {
610            return getConnectionFactory();
611        }
612    }