001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.jms;
018    
019    import javax.jms.ConnectionFactory;
020    import javax.jms.ExceptionListener;
021    
022    import org.apache.camel.RuntimeCamelException;
023    import org.apache.camel.util.ObjectHelper;
024    
025    import org.springframework.core.task.TaskExecutor;
026    import org.springframework.jms.core.JmsOperations;
027    import org.springframework.jms.core.JmsTemplate;
028    import org.springframework.jms.core.JmsTemplate102;
029    import org.springframework.jms.listener.AbstractMessageListenerContainer;
030    import org.springframework.jms.listener.DefaultMessageListenerContainer;
031    import org.springframework.jms.listener.DefaultMessageListenerContainer102;
032    import org.springframework.jms.listener.SimpleMessageListenerContainer;
033    import org.springframework.jms.listener.SimpleMessageListenerContainer102;
034    import org.springframework.jms.listener.serversession.ServerSessionFactory;
035    import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer;
036    import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer102;
037    import org.springframework.jms.support.converter.MessageConverter;
038    import org.springframework.transaction.PlatformTransactionManager;
039    
040    /**
041     * @version $Revision: 563665 $
042     */
043    public class JmsConfiguration implements Cloneable {
044        protected static final String TRANSACTED = "TRANSACTED";
045        protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
046        protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
047        protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
048    
049        private ConnectionFactory connectionFactory;
050        private ConnectionFactory templateConnectionFactory;
051        private ConnectionFactory listenerConnectionFactory;
052        private int acknowledgementMode = -1;
053        private String acknowledgementModeName = AUTO_ACKNOWLEDGE;
054        // Used to configure the spring Container
055        private ExceptionListener exceptionListener;
056        private ConsumerType consumerType = ConsumerType.Default;
057        private boolean autoStartup = true;
058        private boolean acceptMessagesWhileStopping;
059        private String clientId;
060        private String durableSubscriptionName;
061        private boolean subscriptionDurable;
062        private boolean exposeListenerSession = true;
063        private TaskExecutor taskExecutor;
064        private boolean pubSubNoLocal;
065        private int concurrentConsumers = 1;
066        private int maxMessagesPerTask = 1;
067        private ServerSessionFactory serverSessionFactory;
068        private int cacheLevel = -1;
069        private String cacheLevelName = "CACHE_CONSUMER";
070        private long recoveryInterval = -1;
071        private long receiveTimeout = -1;
072        private int idleTaskExecutionLimit = 1;
073        private int maxConcurrentConsumers = 1;
074        // JmsTemplate only
075        private boolean useVersion102;
076        private boolean explicitQosEnabled;
077        private boolean deliveryPersistent = true;
078        private long timeToLive = -1;
079        private MessageConverter messageConverter;
080        private boolean messageIdEnabled = true;
081        private boolean messageTimestampEnabled = true;
082        private int priority = -1;
083        // Transaction related configuration
084        private boolean transacted;
085        private PlatformTransactionManager transactionManager;
086        private String transactionName;
087        private int transactionTimeout = -1;
088    
089        public JmsConfiguration() {
090        }
091    
092        public JmsConfiguration(ConnectionFactory connectionFactory) {
093            this.connectionFactory = connectionFactory;
094        }
095    
096        /**
097         * Returns a copy of this configuration
098         */
099        public JmsConfiguration copy() {
100            try {
101                return (JmsConfiguration)clone();
102            } catch (CloneNotSupportedException e) {
103                throw new RuntimeCamelException(e);
104            }
105        }
106    
107        public JmsOperations createJmsOperations(boolean pubSubDomain, String destination) {
108            ConnectionFactory factory = getTemplateConnectionFactory();
109            JmsTemplate template = useVersion102 ? new JmsTemplate102(factory, pubSubDomain) : new JmsTemplate(factory);
110            template.setPubSubDomain(pubSubDomain);
111            template.setDefaultDestinationName(destination);
112    
113            template.setExplicitQosEnabled(explicitQosEnabled);
114            template.setDeliveryPersistent(deliveryPersistent);
115            if (messageConverter != null) {
116                template.setMessageConverter(messageConverter);
117            }
118            template.setMessageIdEnabled(messageIdEnabled);
119            template.setMessageTimestampEnabled(messageTimestampEnabled);
120            if (priority >= 0) {
121                template.setPriority(priority);
122            }
123            template.setPubSubNoLocal(pubSubNoLocal);
124            if (receiveTimeout >= 0) {
125                template.setReceiveTimeout(receiveTimeout);
126            }
127            if (timeToLive >= 0) {
128                template.setTimeToLive(timeToLive);
129            }
130    
131            template.setSessionTransacted(transacted);
132    
133            // This is here for completeness, but the template should not get used
134            // for receiving messages.
135            if (acknowledgementMode >= 0) {
136                template.setSessionAcknowledgeMode(acknowledgementMode);
137            } else if (acknowledgementModeName != null) {
138                template.setSessionAcknowledgeModeName(acknowledgementModeName);
139            }
140            return template;
141        }
142    
143        public AbstractMessageListenerContainer createMessageListenerContainer() {
144            AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation();
145            configureMessageListenerContainer(container);
146            return container;
147        }
148    
149        protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) {
150            container.setConnectionFactory(getListenerConnectionFactory());
151            if (autoStartup) {
152                container.setAutoStartup(true);
153            }
154            if (clientId != null) {
155                container.setClientId(clientId);
156            }
157            container.setSubscriptionDurable(subscriptionDurable);
158            if (durableSubscriptionName != null) {
159                container.setDurableSubscriptionName(durableSubscriptionName);
160            }
161    
162            // lets default to durable subscription if the subscriber name and
163            // client ID are specified (as there's
164            // no reason to specify them if not! :)
165            if (durableSubscriptionName != null && clientId != null) {
166                container.setSubscriptionDurable(true);
167            }
168    
169            if (exceptionListener != null) {
170                container.setExceptionListener(exceptionListener);
171            }
172    
173            container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
174            container.setExposeListenerSession(exposeListenerSession);
175            container.setSessionTransacted(transacted);
176    
177            if (acknowledgementMode >= 0) {
178                container.setSessionAcknowledgeMode(acknowledgementMode);
179            } else if (acknowledgementModeName != null) {
180                container.setSessionAcknowledgeModeName(acknowledgementModeName);
181            }
182    
183            if (container instanceof DefaultMessageListenerContainer) {
184                // this includes DefaultMessageListenerContainer102
185                DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container;
186                if (concurrentConsumers >= 0) {
187                    listenerContainer.setConcurrentConsumers(concurrentConsumers);
188                }
189    
190                if (cacheLevel >= 0) {
191                    listenerContainer.setCacheLevel(cacheLevel);
192                } else if (cacheLevelName != null) {
193                    listenerContainer.setCacheLevelName(cacheLevelName);
194                } else {
195                    // Default to CACHE_CONSUMER unless specified. This works best
196                    // with most JMS providers.
197                    listenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
198                }
199    
200                if (idleTaskExecutionLimit >= 0) {
201                    listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
202                }
203                if (maxConcurrentConsumers >= 0) {
204                    listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers);
205                }
206                if (maxMessagesPerTask >= 0) {
207                    listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
208                }
209                listenerContainer.setPubSubNoLocal(pubSubNoLocal);
210                if (receiveTimeout >= 0) {
211                    listenerContainer.setReceiveTimeout(receiveTimeout);
212                }
213                if (recoveryInterval >= 0) {
214                    listenerContainer.setRecoveryInterval(recoveryInterval);
215                }
216                if (taskExecutor != null) {
217                    listenerContainer.setTaskExecutor(taskExecutor);
218                }
219                if (transactionManager != null) {
220                    listenerContainer.setTransactionManager(transactionManager);
221                }
222                if (transactionName != null) {
223                    listenerContainer.setTransactionName(transactionName);
224                }
225                if (transactionTimeout >= 0) {
226                    listenerContainer.setTransactionTimeout(transactionTimeout);
227                }
228            } else if (container instanceof ServerSessionMessageListenerContainer) {
229                // this includes ServerSessionMessageListenerContainer102
230                ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer)container;
231                if (maxMessagesPerTask >= 0) {
232                    listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
233                }
234                if (serverSessionFactory != null) {
235                    listenerContainer.setServerSessionFactory(serverSessionFactory);
236                }
237            } else if (container instanceof SimpleMessageListenerContainer) {
238                // this includes SimpleMessageListenerContainer102
239                SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)container;
240                if (concurrentConsumers >= 0) {
241                    listenerContainer.setConcurrentConsumers(concurrentConsumers);
242                }
243                listenerContainer.setPubSubNoLocal(pubSubNoLocal);
244                if (taskExecutor != null) {
245                    listenerContainer.setTaskExecutor(taskExecutor);
246                }
247            }
248        }
249    
250        // Properties
251        // -------------------------------------------------------------------------
252        public ConnectionFactory getConnectionFactory() {
253            if (connectionFactory == null) {
254                connectionFactory = createConnectionFactory();
255            }
256            return connectionFactory;
257        }
258    
259        /**
260         * Sets the default connection factory to be used if a connection factory is
261         * not specified for either
262         * {@link #setTemplateConnectionFactory(ConnectionFactory)} or
263         * {@link #setListenerConnectionFactory(ConnectionFactory)}
264         * 
265         * @param connectionFactory the default connection factory to use
266         */
267        public void setConnectionFactory(ConnectionFactory connectionFactory) {
268            this.connectionFactory = connectionFactory;
269        }
270    
271        public ConnectionFactory getListenerConnectionFactory() {
272            if (listenerConnectionFactory == null) {
273                listenerConnectionFactory = createListenerConnectionFactory();
274            }
275            return listenerConnectionFactory;
276        }
277    
278        /**
279         * Sets the connection factory to be used for consuming messages via the
280         * {@link #createMessageListenerContainer()}
281         * 
282         * @param listenerConnectionFactory the connection factory to use for
283         *                consuming messages
284         */
285        public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
286            this.listenerConnectionFactory = listenerConnectionFactory;
287        }
288    
289        public ConnectionFactory getTemplateConnectionFactory() {
290            if (templateConnectionFactory == null) {
291                templateConnectionFactory = createTemplateConnectionFactory();
292            }
293            return templateConnectionFactory;
294        }
295    
296        /**
297         * Sets the connection factory to be used for sending messages via the
298         * {@link JmsTemplate} via {@link #createJmsOperations(boolean, String)}
299         * 
300         * @param templateConnectionFactory the connection factory for sending
301         *                messages
302         */
303        public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) {
304            this.templateConnectionFactory = templateConnectionFactory;
305        }
306    
307        public boolean isUseVersion102() {
308            return useVersion102;
309        }
310    
311        public void setUseVersion102(boolean useVersion102) {
312            this.useVersion102 = useVersion102;
313        }
314    
315        public boolean isAutoStartup() {
316            return autoStartup;
317        }
318    
319        public void setAutoStartup(boolean autoStartup) {
320            this.autoStartup = autoStartup;
321        }
322    
323        public boolean isAcceptMessagesWhileStopping() {
324            return acceptMessagesWhileStopping;
325        }
326    
327        public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
328            this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
329        }
330    
331        public String getClientId() {
332            return clientId;
333        }
334    
335        public void setClientId(String consumerClientId) {
336            this.clientId = consumerClientId;
337        }
338    
339        public String getDurableSubscriptionName() {
340            return durableSubscriptionName;
341        }
342    
343        public void setDurableSubscriptionName(String durableSubscriptionName) {
344            this.durableSubscriptionName = durableSubscriptionName;
345        }
346    
347        public ExceptionListener getExceptionListener() {
348            return exceptionListener;
349        }
350    
351        public void setExceptionListener(ExceptionListener exceptionListener) {
352            this.exceptionListener = exceptionListener;
353        }
354    
355        public boolean isSubscriptionDurable() {
356            return subscriptionDurable;
357        }
358    
359        public void setSubscriptionDurable(boolean subscriptionDurable) {
360            this.subscriptionDurable = subscriptionDurable;
361        }
362    
363        public String getAcknowledgementModeName() {
364            return acknowledgementModeName;
365        }
366    
367        public void setAcknowledgementModeName(String consumerAcknowledgementMode) {
368            this.acknowledgementModeName = consumerAcknowledgementMode;
369            this.acknowledgementMode = -1;
370        }
371    
372        public boolean isExposeListenerSession() {
373            return exposeListenerSession;
374        }
375    
376        public void setExposeListenerSession(boolean exposeListenerSession) {
377            this.exposeListenerSession = exposeListenerSession;
378        }
379    
380        public TaskExecutor getTaskExecutor() {
381            return taskExecutor;
382        }
383    
384        public void setTaskExecutor(TaskExecutor taskExecutor) {
385            this.taskExecutor = taskExecutor;
386        }
387    
388        public boolean isPubSubNoLocal() {
389            return pubSubNoLocal;
390        }
391    
392        public void setPubSubNoLocal(boolean pubSubNoLocal) {
393            this.pubSubNoLocal = pubSubNoLocal;
394        }
395    
396        public int getConcurrentConsumers() {
397            return concurrentConsumers;
398        }
399    
400        public void setConcurrentConsumers(int concurrentConsumers) {
401            this.concurrentConsumers = concurrentConsumers;
402        }
403    
404        public int getMaxMessagesPerTask() {
405            return maxMessagesPerTask;
406        }
407    
408        public void setMaxMessagesPerTask(int maxMessagesPerTask) {
409            this.maxMessagesPerTask = maxMessagesPerTask;
410        }
411    
412        public ServerSessionFactory getServerSessionFactory() {
413            return serverSessionFactory;
414        }
415    
416        public void setServerSessionFactory(ServerSessionFactory serverSessionFactory) {
417            this.serverSessionFactory = serverSessionFactory;
418        }
419    
420        public int getCacheLevel() {
421            return cacheLevel;
422        }
423    
424        public void setCacheLevel(int cacheLevel) {
425            this.cacheLevel = cacheLevel;
426        }
427    
428        public String getCacheLevelName() {
429            return cacheLevelName;
430        }
431    
432        public void setCacheLevelName(String cacheName) {
433            this.cacheLevelName = cacheName;
434        }
435    
436        public long getRecoveryInterval() {
437            return recoveryInterval;
438        }
439    
440        public void setRecoveryInterval(long recoveryInterval) {
441            this.recoveryInterval = recoveryInterval;
442        }
443    
444        public long getReceiveTimeout() {
445            return receiveTimeout;
446        }
447    
448        public void setReceiveTimeout(long receiveTimeout) {
449            this.receiveTimeout = receiveTimeout;
450        }
451    
452        public PlatformTransactionManager getTransactionManager() {
453            return transactionManager;
454        }
455    
456        public void setTransactionManager(PlatformTransactionManager transactionManager) {
457            this.transactionManager = transactionManager;
458        }
459    
460        public String getTransactionName() {
461            return transactionName;
462        }
463    
464        public void setTransactionName(String transactionName) {
465            this.transactionName = transactionName;
466        }
467    
468        public int getTransactionTimeout() {
469            return transactionTimeout;
470        }
471    
472        public void setTransactionTimeout(int transactionTimeout) {
473            this.transactionTimeout = transactionTimeout;
474        }
475    
476        public int getIdleTaskExecutionLimit() {
477            return idleTaskExecutionLimit;
478        }
479    
480        public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
481            this.idleTaskExecutionLimit = idleTaskExecutionLimit;
482        }
483    
484        public int getMaxConcurrentConsumers() {
485            return maxConcurrentConsumers;
486        }
487    
488        public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
489            this.maxConcurrentConsumers = maxConcurrentConsumers;
490        }
491    
492        public boolean isExplicitQosEnabled() {
493            return explicitQosEnabled;
494        }
495    
496        public void setExplicitQosEnabled(boolean explicitQosEnabled) {
497            this.explicitQosEnabled = explicitQosEnabled;
498        }
499    
500        public boolean isDeliveryPersistent() {
501            return deliveryPersistent;
502        }
503    
504        public void setDeliveryPersistent(boolean deliveryPersistent) {
505            this.deliveryPersistent = deliveryPersistent;
506        }
507    
508        public long getTimeToLive() {
509            return timeToLive;
510        }
511    
512        public void setTimeToLive(long timeToLive) {
513            this.timeToLive = timeToLive;
514        }
515    
516        public MessageConverter getMessageConverter() {
517            return messageConverter;
518        }
519    
520        public void setMessageConverter(MessageConverter messageConverter) {
521            this.messageConverter = messageConverter;
522        }
523    
524        public boolean isMessageIdEnabled() {
525            return messageIdEnabled;
526        }
527    
528        public void setMessageIdEnabled(boolean messageIdEnabled) {
529            this.messageIdEnabled = messageIdEnabled;
530        }
531    
532        public boolean isMessageTimestampEnabled() {
533            return messageTimestampEnabled;
534        }
535    
536        public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
537            this.messageTimestampEnabled = messageTimestampEnabled;
538        }
539    
540        public int getPriority() {
541            return priority;
542        }
543    
544        public void setPriority(int priority) {
545            this.priority = priority;
546        }
547    
548        public ConsumerType getConsumerType() {
549            return consumerType;
550        }
551    
552        public void setConsumerType(ConsumerType consumerType) {
553            this.consumerType = consumerType;
554        }
555    
556        public int getAcknowledgementMode() {
557            return acknowledgementMode;
558        }
559    
560        public void setAcknowledgementMode(int consumerAcknowledgementMode) {
561            this.acknowledgementMode = consumerAcknowledgementMode;
562            this.acknowledgementModeName = null;
563        }
564    
565        public boolean isTransacted() {
566            return transacted;
567        }
568    
569        public void setTransacted(boolean consumerTransacted) {
570            this.transacted = consumerTransacted;
571        }
572    
573        // Implementation methods
574        // -------------------------------------------------------------------------
575        protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
576            // TODO we could allow a spring container to auto-inject these objects?
577            switch (consumerType) {
578            case Simple:
579                return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
580            case ServerSessionPool:
581                return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer();
582            case Default:
583                return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer();
584            default:
585                throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
586            }
587        }
588    
589        /**
590         * Factory method which allows derived classes to customize the lazy
591         * creation
592         */
593        protected ConnectionFactory createConnectionFactory() {
594            ObjectHelper.notNull(connectionFactory, "connectionFactory");
595            return null;
596        }
597    
598        /**
599         * Factory method which allows derived classes to customize the lazy
600         * creation
601         */
602        protected ConnectionFactory createListenerConnectionFactory() {
603            return getConnectionFactory();
604        }
605    
606        /**
607         * Factory method which allows derived classes to customize the lazy
608         * creation
609         */
610        protected ConnectionFactory createTemplateConnectionFactory() {
611            return getConnectionFactory();
612        }
613    }