Coverage Report - org.apache.camel.component.quartz.QuartzEndpoint
 
Classes in this File Line Coverage Branch Coverage Complexity
QuartzEndpoint
68% 
85% 
0
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.component.quartz;
 18  
 
 19  
 import java.util.Date;
 20  
 import java.util.Map;
 21  
 import java.util.Set;
 22  
 
 23  
 import org.apache.camel.Processor;
 24  
 import org.apache.camel.Producer;
 25  
 import org.apache.camel.impl.DefaultEndpoint;
 26  
 import org.apache.camel.processor.loadbalancer.LoadBalancer;
 27  
 import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
 28  
 import org.apache.camel.util.ObjectHelper;
 29  
 import org.apache.commons.logging.Log;
 30  
 import org.apache.commons.logging.LogFactory;
 31  
 
 32  
 import org.quartz.JobDetail;
 33  
 import org.quartz.JobExecutionContext;
 34  
 import org.quartz.JobExecutionException;
 35  
 import org.quartz.Scheduler;
 36  
 import org.quartz.SchedulerException;
 37  
 import org.quartz.SimpleTrigger;
 38  
 import org.quartz.Trigger;
 39  
 
 40  
 /**
 41  
  * A <a href="http://activemq.apache.org/quartz.html">Quartz Endpoint</a>
 42  
  * 
 43  
  * @version $Revision:520964 $
 44  
  */
 45  1
 public class QuartzEndpoint extends DefaultEndpoint<QuartzExchange> {
 46  
     public static final String ENDPOINT_KEY = "org.apache.camel.quartz";
 47  1
     private static final transient Log LOG = LogFactory.getLog(QuartzEndpoint.class);
 48  
     private Scheduler scheduler;
 49  
     private LoadBalancer loadBalancer;
 50  
     private Trigger trigger;
 51  
     private JobDetail jobDetail;
 52  
     private boolean started;
 53  
 
 54  
     public QuartzEndpoint(String endpointUri, QuartzComponent component, Scheduler scheduler) {
 55  4
         super(endpointUri, component);
 56  4
         this.scheduler = scheduler;
 57  4
     }
 58  
 
 59  
     public void addTriggers(Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
 60  0
         if (triggerMap != null) {
 61  0
             Set<Map.Entry<Trigger, JobDetail>> entries = triggerMap.entrySet();
 62  0
             for (Map.Entry<Trigger, JobDetail> entry : entries) {
 63  0
                 Trigger key = entry.getKey();
 64  0
                 JobDetail value = entry.getValue();
 65  0
                 ObjectHelper.notNull(key, "key");
 66  0
                 ObjectHelper.notNull(value, "value");
 67  
 
 68  0
                 addTrigger(key, value);
 69  0
             }
 70  
         }
 71  0
     }
 72  
 
 73  
     public void addTrigger(Trigger trigger, JobDetail detail) throws SchedulerException {
 74  
         // lets default the trigger name to the job name
 75  1
         if (trigger.getName() == null) {
 76  0
             trigger.setName(detail.getName());
 77  
         }
 78  
         // lets default the trigger group to the job group
 79  1
         if (trigger.getGroup() == null) {
 80  0
             trigger.setGroup(detail.getGroup());
 81  
         }
 82  
         // default start time to now if not specified
 83  1
         if (trigger.getStartTime() == null) {
 84  1
             trigger.setStartTime(new Date());
 85  
         }
 86  1
         detail.getJobDataMap().put(ENDPOINT_KEY, this);
 87  1
         Class jobClass = detail.getJobClass();
 88  1
         if (jobClass == null) {
 89  1
             detail.setJobClass(CamelJob.class);
 90  
         }
 91  1
         if (detail.getName() == null) {
 92  1
             detail.setName(getEndpointUri());
 93  
         }
 94  1
         getScheduler().scheduleJob(detail, trigger);
 95  1
     }
 96  
 
 97  
     public void removeTrigger(Trigger trigger, JobDetail jobDetail) throws SchedulerException {
 98  1
         getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup());
 99  1
     }
 100  
 
 101  
     /**
 102  
      * This method is invoked when a Quartz job is fired.
 103  
      * 
 104  
      * @param jobExecutionContext the Quartz Job context
 105  
      */
 106  
     public void onJobExecute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
 107  2
         if (LOG.isDebugEnabled()) {
 108  0
             LOG.debug("Firing Quartz Job with context: " + jobExecutionContext);
 109  
         }
 110  2
         QuartzExchange exchange = createExchange(jobExecutionContext);
 111  
         try {
 112  2
             getLoadBalancer().process(exchange);
 113  0
         } catch (JobExecutionException e) {
 114  0
             throw e;
 115  0
         } catch (Exception e) {
 116  0
             throw new JobExecutionException(e);
 117  2
         }
 118  2
     }
 119  
 
 120  
     public QuartzExchange createExchange() {
 121  0
         return new QuartzExchange(getContext(), null);
 122  
     }
 123  
 
 124  
     public QuartzExchange createExchange(JobExecutionContext jobExecutionContext) {
 125  2
         return new QuartzExchange(getContext(), jobExecutionContext);
 126  
     }
 127  
 
 128  
     public Producer<QuartzExchange> createProducer() throws Exception {
 129  0
         throw new UnsupportedOperationException("You cannot send messages to this endpoint");
 130  
     }
 131  
 
 132  
     public QuartzConsumer createConsumer(Processor processor) throws Exception {
 133  1
         return new QuartzConsumer(this, processor);
 134  
     }
 135  
 
 136  
     // Properties
 137  
     // -------------------------------------------------------------------------
 138  
 
 139  
     @Override
 140  
     public QuartzComponent getComponent() {
 141  0
         return (QuartzComponent)super.getComponent();
 142  
     }
 143  
 
 144  
     public boolean isSingleton() {
 145  4
         return true;
 146  
     }
 147  
 
 148  
     public Scheduler getScheduler() {
 149  2
         return scheduler;
 150  
     }
 151  
 
 152  
     public LoadBalancer getLoadBalancer() {
 153  5
         if (loadBalancer == null) {
 154  1
             loadBalancer = createLoadBalancer();
 155  
         }
 156  5
         return loadBalancer;
 157  
     }
 158  
 
 159  
     public void setLoadBalancer(LoadBalancer loadBalancer) {
 160  0
         this.loadBalancer = loadBalancer;
 161  0
     }
 162  
 
 163  
     public JobDetail getJobDetail() {
 164  6
         if (jobDetail == null) {
 165  4
             jobDetail = createJobDetail();
 166  
         }
 167  6
         return jobDetail;
 168  
     }
 169  
 
 170  
     public void setJobDetail(JobDetail jobDetail) {
 171  0
         this.jobDetail = jobDetail;
 172  0
     }
 173  
 
 174  
     public Trigger getTrigger() {
 175  8
         if (trigger == null) {
 176  3
             trigger = createTrigger();
 177  
         }
 178  8
         return trigger;
 179  
     }
 180  
 
 181  
     public void setTrigger(Trigger trigger) {
 182  1
         this.trigger = trigger;
 183  1
     }
 184  
 
 185  
     // Implementation methods
 186  
     // -------------------------------------------------------------------------
 187  
     public synchronized void consumerStarted(QuartzConsumer consumer) throws SchedulerException {
 188  1
         getLoadBalancer().addProcessor(consumer.getProcessor());
 189  
 
 190  
         // if we have not yet added our default trigger, then lets do it
 191  1
         if (!started) {
 192  1
             addTrigger(getTrigger(), getJobDetail());
 193  1
             started = true;
 194  
         }
 195  1
     }
 196  
 
 197  
     public synchronized void consumerStopped(QuartzConsumer consumer) throws SchedulerException {
 198  1
         getLoadBalancer().removeProcessor(consumer.getProcessor());
 199  1
         if (getLoadBalancer().getProcessors().isEmpty() && started) {
 200  1
             removeTrigger(getTrigger(), getJobDetail());
 201  1
             started = false;
 202  
         }
 203  1
     }
 204  
 
 205  
     protected LoadBalancer createLoadBalancer() {
 206  1
         return new RoundRobinLoadBalancer();
 207  
     }
 208  
 
 209  
     protected JobDetail createJobDetail() {
 210  4
         return new JobDetail();
 211  
     }
 212  
 
 213  
     protected Trigger createTrigger() {
 214  3
         return new SimpleTrigger();
 215  
     }
 216  
 }