001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.quartz; 018 019 import java.util.Date; 020 import java.util.Map; 021 import java.util.Set; 022 023 import org.apache.camel.Processor; 024 import org.apache.camel.Producer; 025 import org.apache.camel.impl.DefaultEndpoint; 026 import org.apache.camel.processor.loadbalancer.LoadBalancer; 027 import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer; 028 import org.apache.camel.util.ObjectHelper; 029 import org.apache.commons.logging.Log; 030 import org.apache.commons.logging.LogFactory; 031 032 import org.quartz.JobDetail; 033 import org.quartz.JobExecutionContext; 034 import org.quartz.JobExecutionException; 035 import org.quartz.Scheduler; 036 import org.quartz.SchedulerException; 037 import org.quartz.SimpleTrigger; 038 import org.quartz.Trigger; 039 040 /** 041 * A <a href="http://activemq.apache.org/quartz.html">Quartz Endpoint</a> 042 * 043 * @version $Revision:520964 $ 044 */ 045 public class QuartzEndpoint extends DefaultEndpoint<QuartzExchange> { 046 public static final String ENDPOINT_KEY = "org.apache.camel.quartz"; 047 private static final transient Log LOG = LogFactory.getLog(QuartzEndpoint.class); 048 private Scheduler scheduler; 049 private LoadBalancer loadBalancer; 050 private Trigger trigger; 051 private JobDetail jobDetail; 052 private boolean started; 053 054 public QuartzEndpoint(String endpointUri, QuartzComponent component, Scheduler scheduler) { 055 super(endpointUri, component); 056 this.scheduler = scheduler; 057 } 058 059 public void addTriggers(Map<Trigger, JobDetail> triggerMap) throws SchedulerException { 060 if (triggerMap != null) { 061 Set<Map.Entry<Trigger, JobDetail>> entries = triggerMap.entrySet(); 062 for (Map.Entry<Trigger, JobDetail> entry : entries) { 063 Trigger key = entry.getKey(); 064 JobDetail value = entry.getValue(); 065 ObjectHelper.notNull(key, "key"); 066 ObjectHelper.notNull(value, "value"); 067 068 addTrigger(key, value); 069 } 070 } 071 } 072 073 public void addTrigger(Trigger trigger, JobDetail detail) throws SchedulerException { 074 // lets default the trigger name to the job name 075 if (trigger.getName() == null) { 076 trigger.setName(detail.getName()); 077 } 078 // lets default the trigger group to the job group 079 if (trigger.getGroup() == null) { 080 trigger.setGroup(detail.getGroup()); 081 } 082 // default start time to now if not specified 083 if (trigger.getStartTime() == null) { 084 trigger.setStartTime(new Date()); 085 } 086 detail.getJobDataMap().put(ENDPOINT_KEY, this); 087 Class jobClass = detail.getJobClass(); 088 if (jobClass == null) { 089 detail.setJobClass(CamelJob.class); 090 } 091 if (detail.getName() == null) { 092 detail.setName(getEndpointUri()); 093 } 094 getScheduler().scheduleJob(detail, trigger); 095 } 096 097 public void removeTrigger(Trigger trigger, JobDetail jobDetail) throws SchedulerException { 098 getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup()); 099 } 100 101 /** 102 * This method is invoked when a Quartz job is fired. 103 * 104 * @param jobExecutionContext the Quartz Job context 105 */ 106 public void onJobExecute(JobExecutionContext jobExecutionContext) throws JobExecutionException { 107 if (LOG.isDebugEnabled()) { 108 LOG.debug("Firing Quartz Job with context: " + jobExecutionContext); 109 } 110 QuartzExchange exchange = createExchange(jobExecutionContext); 111 try { 112 getLoadBalancer().process(exchange); 113 } catch (JobExecutionException e) { 114 throw e; 115 } catch (Exception e) { 116 throw new JobExecutionException(e); 117 } 118 } 119 120 public QuartzExchange createExchange() { 121 return new QuartzExchange(getContext(), null); 122 } 123 124 public QuartzExchange createExchange(JobExecutionContext jobExecutionContext) { 125 return new QuartzExchange(getContext(), jobExecutionContext); 126 } 127 128 public Producer<QuartzExchange> createProducer() throws Exception { 129 throw new UnsupportedOperationException("You cannot send messages to this endpoint"); 130 } 131 132 public QuartzConsumer createConsumer(Processor processor) throws Exception { 133 return new QuartzConsumer(this, processor); 134 } 135 136 // Properties 137 // ------------------------------------------------------------------------- 138 139 @Override 140 public QuartzComponent getComponent() { 141 return (QuartzComponent)super.getComponent(); 142 } 143 144 public boolean isSingleton() { 145 return true; 146 } 147 148 public Scheduler getScheduler() { 149 return scheduler; 150 } 151 152 public LoadBalancer getLoadBalancer() { 153 if (loadBalancer == null) { 154 loadBalancer = createLoadBalancer(); 155 } 156 return loadBalancer; 157 } 158 159 public void setLoadBalancer(LoadBalancer loadBalancer) { 160 this.loadBalancer = loadBalancer; 161 } 162 163 public JobDetail getJobDetail() { 164 if (jobDetail == null) { 165 jobDetail = createJobDetail(); 166 } 167 return jobDetail; 168 } 169 170 public void setJobDetail(JobDetail jobDetail) { 171 this.jobDetail = jobDetail; 172 } 173 174 public Trigger getTrigger() { 175 if (trigger == null) { 176 trigger = createTrigger(); 177 } 178 return trigger; 179 } 180 181 public void setTrigger(Trigger trigger) { 182 this.trigger = trigger; 183 } 184 185 // Implementation methods 186 // ------------------------------------------------------------------------- 187 public synchronized void consumerStarted(QuartzConsumer consumer) throws SchedulerException { 188 getLoadBalancer().addProcessor(consumer.getProcessor()); 189 190 // if we have not yet added our default trigger, then lets do it 191 if (!started) { 192 addTrigger(getTrigger(), getJobDetail()); 193 started = true; 194 } 195 } 196 197 public synchronized void consumerStopped(QuartzConsumer consumer) throws SchedulerException { 198 getLoadBalancer().removeProcessor(consumer.getProcessor()); 199 if (getLoadBalancer().getProcessors().isEmpty() && started) { 200 removeTrigger(getTrigger(), getJobDetail()); 201 started = false; 202 } 203 } 204 205 protected LoadBalancer createLoadBalancer() { 206 return new RoundRobinLoadBalancer(); 207 } 208 209 protected JobDetail createJobDetail() { 210 return new JobDetail(); 211 } 212 213 protected Trigger createTrigger() { 214 return new SimpleTrigger(); 215 } 216 }