001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.quartz;
018    
019    import java.util.Date;
020    import java.util.Map;
021    import java.util.Set;
022    
023    import org.apache.camel.Processor;
024    import org.apache.camel.Producer;
025    import org.apache.camel.impl.DefaultEndpoint;
026    import org.apache.camel.processor.loadbalancer.LoadBalancer;
027    import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
028    import org.apache.camel.util.ObjectHelper;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    import org.quartz.JobDetail;
033    import org.quartz.JobExecutionContext;
034    import org.quartz.JobExecutionException;
035    import org.quartz.Scheduler;
036    import org.quartz.SchedulerException;
037    import org.quartz.SimpleTrigger;
038    import org.quartz.Trigger;
039    
040    /**
041     * A <a href="http://activemq.apache.org/quartz.html">Quartz Endpoint</a>
042     * 
043     * @version $Revision:520964 $
044     */
045    public class QuartzEndpoint extends DefaultEndpoint<QuartzExchange> {
046        public static final String ENDPOINT_KEY = "org.apache.camel.quartz";
047        private static final transient Log LOG = LogFactory.getLog(QuartzEndpoint.class);
048        private Scheduler scheduler;
049        private LoadBalancer loadBalancer;
050        private Trigger trigger;
051        private JobDetail jobDetail;
052        private boolean started;
053    
054        public QuartzEndpoint(String endpointUri, QuartzComponent component, Scheduler scheduler) {
055            super(endpointUri, component);
056            this.scheduler = scheduler;
057        }
058    
059        public void addTriggers(Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
060            if (triggerMap != null) {
061                Set<Map.Entry<Trigger, JobDetail>> entries = triggerMap.entrySet();
062                for (Map.Entry<Trigger, JobDetail> entry : entries) {
063                    Trigger key = entry.getKey();
064                    JobDetail value = entry.getValue();
065                    ObjectHelper.notNull(key, "key");
066                    ObjectHelper.notNull(value, "value");
067    
068                    addTrigger(key, value);
069                }
070            }
071        }
072    
073        public void addTrigger(Trigger trigger, JobDetail detail) throws SchedulerException {
074            // lets default the trigger name to the job name
075            if (trigger.getName() == null) {
076                trigger.setName(detail.getName());
077            }
078            // lets default the trigger group to the job group
079            if (trigger.getGroup() == null) {
080                trigger.setGroup(detail.getGroup());
081            }
082            // default start time to now if not specified
083            if (trigger.getStartTime() == null) {
084                trigger.setStartTime(new Date());
085            }
086            detail.getJobDataMap().put(ENDPOINT_KEY, this);
087            Class jobClass = detail.getJobClass();
088            if (jobClass == null) {
089                detail.setJobClass(CamelJob.class);
090            }
091            if (detail.getName() == null) {
092                detail.setName(getEndpointUri());
093            }
094            getScheduler().scheduleJob(detail, trigger);
095        }
096    
097        public void removeTrigger(Trigger trigger, JobDetail jobDetail) throws SchedulerException {
098            getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup());
099        }
100    
101        /**
102         * This method is invoked when a Quartz job is fired.
103         * 
104         * @param jobExecutionContext the Quartz Job context
105         */
106        public void onJobExecute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
107            if (LOG.isDebugEnabled()) {
108                LOG.debug("Firing Quartz Job with context: " + jobExecutionContext);
109            }
110            QuartzExchange exchange = createExchange(jobExecutionContext);
111            try {
112                getLoadBalancer().process(exchange);
113            } catch (JobExecutionException e) {
114                throw e;
115            } catch (Exception e) {
116                throw new JobExecutionException(e);
117            }
118        }
119    
120        public QuartzExchange createExchange() {
121            return new QuartzExchange(getContext(), null);
122        }
123    
124        public QuartzExchange createExchange(JobExecutionContext jobExecutionContext) {
125            return new QuartzExchange(getContext(), jobExecutionContext);
126        }
127    
128        public Producer<QuartzExchange> createProducer() throws Exception {
129            throw new UnsupportedOperationException("You cannot send messages to this endpoint");
130        }
131    
132        public QuartzConsumer createConsumer(Processor processor) throws Exception {
133            return new QuartzConsumer(this, processor);
134        }
135    
136        // Properties
137        // -------------------------------------------------------------------------
138    
139        @Override
140        public QuartzComponent getComponent() {
141            return (QuartzComponent)super.getComponent();
142        }
143    
144        public boolean isSingleton() {
145            return true;
146        }
147    
148        public Scheduler getScheduler() {
149            return scheduler;
150        }
151    
152        public LoadBalancer getLoadBalancer() {
153            if (loadBalancer == null) {
154                loadBalancer = createLoadBalancer();
155            }
156            return loadBalancer;
157        }
158    
159        public void setLoadBalancer(LoadBalancer loadBalancer) {
160            this.loadBalancer = loadBalancer;
161        }
162    
163        public JobDetail getJobDetail() {
164            if (jobDetail == null) {
165                jobDetail = createJobDetail();
166            }
167            return jobDetail;
168        }
169    
170        public void setJobDetail(JobDetail jobDetail) {
171            this.jobDetail = jobDetail;
172        }
173    
174        public Trigger getTrigger() {
175            if (trigger == null) {
176                trigger = createTrigger();
177            }
178            return trigger;
179        }
180    
181        public void setTrigger(Trigger trigger) {
182            this.trigger = trigger;
183        }
184    
185        // Implementation methods
186        // -------------------------------------------------------------------------
187        public synchronized void consumerStarted(QuartzConsumer consumer) throws SchedulerException {
188            getLoadBalancer().addProcessor(consumer.getProcessor());
189    
190            // if we have not yet added our default trigger, then lets do it
191            if (!started) {
192                addTrigger(getTrigger(), getJobDetail());
193                started = true;
194            }
195        }
196    
197        public synchronized void consumerStopped(QuartzConsumer consumer) throws SchedulerException {
198            getLoadBalancer().removeProcessor(consumer.getProcessor());
199            if (getLoadBalancer().getProcessors().isEmpty() && started) {
200                removeTrigger(getTrigger(), getJobDetail());
201                started = false;
202            }
203        }
204    
205        protected LoadBalancer createLoadBalancer() {
206            return new RoundRobinLoadBalancer();
207        }
208    
209        protected JobDetail createJobDetail() {
210            return new JobDetail();
211        }
212    
213        protected Trigger createTrigger() {
214            return new SimpleTrigger();
215        }
216    }