001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.quartz; 018 019 import org.apache.camel.Processor; 020 import org.apache.camel.Producer; 021 import org.apache.camel.impl.DefaultEndpoint; 022 import org.apache.camel.processor.loadbalancer.LoadBalancer; 023 import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer; 024 import org.apache.camel.util.ObjectHelper; 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.quartz.JobDetail; 028 import org.quartz.JobExecutionContext; 029 import org.quartz.JobExecutionException; 030 import org.quartz.Scheduler; 031 import org.quartz.SchedulerException; 032 import org.quartz.Trigger; 033 import org.quartz.SimpleTrigger; 034 035 import java.util.Date; 036 import java.util.Map; 037 import java.util.Set; 038 039 /** 040 * A <a href="http://activemq.apache.org/quartz.html">Quartz Endpoint</a> 041 * 042 * @version $Revision:520964 $ 043 */ 044 public class QuartzEndpoint extends DefaultEndpoint<QuartzExchange> { 045 public static final String ENDPOINT_KEY = "org.apache.camel.quartz"; 046 private static final transient Log log = LogFactory.getLog(QuartzEndpoint.class); 047 private Scheduler scheduler; 048 private LoadBalancer loadBalancer; 049 private Trigger trigger; 050 private JobDetail jobDetail; 051 private boolean started; 052 053 public QuartzEndpoint(String endpointUri, QuartzComponent component, Scheduler scheduler) { 054 super(endpointUri, component); 055 this.scheduler = scheduler; 056 } 057 058 public void addTriggers(Map<Trigger, JobDetail> triggerMap) throws SchedulerException { 059 if (triggerMap != null) { 060 Set<Map.Entry<Trigger, JobDetail>> entries = triggerMap.entrySet(); 061 for (Map.Entry<Trigger, JobDetail> entry : entries) { 062 Trigger key = entry.getKey(); 063 JobDetail value = entry.getValue(); 064 ObjectHelper.notNull(key, "key"); 065 ObjectHelper.notNull(value, "value"); 066 067 addTrigger(key, value); 068 } 069 } 070 } 071 072 public void addTrigger(Trigger trigger, JobDetail detail) throws SchedulerException { 073 // lets default the trigger name to the job name 074 if (trigger.getName() == null) { 075 trigger.setName(detail.getName()); 076 } 077 // lets default the trigger group to the job group 078 if (trigger.getGroup() == null) { 079 trigger.setGroup(detail.getGroup()); 080 } 081 // default start time to now if not specified 082 if (trigger.getStartTime() == null) { 083 trigger.setStartTime(new Date()); 084 } 085 detail.getJobDataMap().put(ENDPOINT_KEY, this); 086 Class jobClass = detail.getJobClass(); 087 if (jobClass == null) { 088 detail.setJobClass(CamelJob.class); 089 } 090 if (detail.getName() == null) { 091 detail.setName(getEndpointUri()); 092 } 093 getScheduler().scheduleJob(detail, trigger); 094 } 095 096 public void removeTrigger(Trigger trigger, JobDetail jobDetail) throws SchedulerException { 097 getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup()); 098 } 099 100 /** 101 * This method is invoked when a Quartz job is fired. 102 * 103 * @param jobExecutionContext the Quartz Job context 104 */ 105 public void onJobExecute(JobExecutionContext jobExecutionContext) throws JobExecutionException { 106 if (log.isDebugEnabled()) { 107 log.debug("Firing Quartz Job with context: " + jobExecutionContext); 108 } 109 QuartzExchange exchange = createExchange(jobExecutionContext); 110 try { 111 getLoadBalancer().process(exchange); 112 } 113 catch (JobExecutionException e) { 114 throw e; 115 } 116 catch (Exception e) { 117 throw new JobExecutionException(e); 118 } 119 } 120 121 public QuartzExchange createExchange() { 122 return new QuartzExchange(getContext(), null); 123 } 124 125 public QuartzExchange createExchange(JobExecutionContext jobExecutionContext) { 126 return new QuartzExchange(getContext(), jobExecutionContext); 127 } 128 129 public Producer<QuartzExchange> createProducer() throws Exception { 130 throw new UnsupportedOperationException("You cannot send messages to this endpoint"); 131 } 132 133 public QuartzConsumer createConsumer(Processor processor) throws Exception { 134 return new QuartzConsumer(this, processor); 135 } 136 137 // Properties 138 //------------------------------------------------------------------------- 139 140 @Override 141 public QuartzComponent getComponent() { 142 return (QuartzComponent) super.getComponent(); 143 } 144 145 public boolean isSingleton() { 146 return true; 147 } 148 149 public Scheduler getScheduler() { 150 return scheduler; 151 } 152 153 public LoadBalancer getLoadBalancer() { 154 if (loadBalancer == null) { 155 loadBalancer = createLoadBalancer(); 156 } 157 return loadBalancer; 158 } 159 160 public void setLoadBalancer(LoadBalancer loadBalancer) { 161 this.loadBalancer = loadBalancer; 162 } 163 164 public JobDetail getJobDetail() { 165 if (jobDetail == null) { 166 jobDetail = createJobDetail(); 167 } 168 return jobDetail; 169 } 170 171 public void setJobDetail(JobDetail jobDetail) { 172 this.jobDetail = jobDetail; 173 } 174 175 public Trigger getTrigger() { 176 if (trigger == null) { 177 trigger = createTrigger(); 178 } 179 return trigger; 180 } 181 182 public void setTrigger(Trigger trigger) { 183 this.trigger = trigger; 184 } 185 186 // Implementation methods 187 //------------------------------------------------------------------------- 188 public synchronized void consumerStarted(QuartzConsumer consumer) throws SchedulerException { 189 getLoadBalancer().addProcessor(consumer.getProcessor()); 190 191 // if we have not yet added our default trigger, then lets do it 192 if (!started) { 193 addTrigger(getTrigger(), getJobDetail()); 194 started = true; 195 } 196 } 197 198 public synchronized void consumerStopped(QuartzConsumer consumer) throws SchedulerException { 199 getLoadBalancer().removeProcessor(consumer.getProcessor()); 200 if (getLoadBalancer().getProcessors().isEmpty() && started) { 201 removeTrigger(getTrigger(), getJobDetail()); 202 started = false; 203 } 204 } 205 206 protected LoadBalancer createLoadBalancer() { 207 return new RoundRobinLoadBalancer(); 208 } 209 210 protected JobDetail createJobDetail() { 211 return new JobDetail(); 212 } 213 214 protected Trigger createTrigger() { 215 return new SimpleTrigger(); 216 } 217 }