001// Copyright 2011 The Apache Software Foundation 002// 003// Licensed under the Apache License, Version 2.0 (the "License"); 004// you may not use this file except in compliance with the License. 005// You may obtain a copy of the License at 006// 007// http://www.apache.org/licenses/LICENSE-2.0 008// 009// Unless required by applicable law or agreed to in writing, software 010// distributed under the License is distributed on an "AS IS" BASIS, 011// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 012// See the License for the specific language governing permissions and 013// limitations under the License. 014 015package org.apache.tapestry5.ioc.internal.services.cron; 016 017import org.apache.tapestry5.ioc.Invokable; 018import org.apache.tapestry5.ioc.annotations.PostInjection; 019import org.apache.tapestry5.ioc.internal.util.CollectionFactory; 020import org.apache.tapestry5.ioc.services.ParallelExecutor; 021import org.apache.tapestry5.ioc.services.RegistryShutdownHub; 022import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor; 023import org.apache.tapestry5.ioc.services.cron.PeriodicJob; 024import org.apache.tapestry5.ioc.services.cron.Schedule; 025import org.slf4j.Logger; 026 027import java.util.List; 028import java.util.concurrent.atomic.AtomicInteger; 029 030public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable 031{ 032 private final ParallelExecutor parallelExecutor; 033 034 private final Logger logger; 035 036 // Synchronized by this 037 private final List<Job> jobs = CollectionFactory.newList(); 038 039 private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor"); 040 041 // Synchronized by this. Set when the registry is shutdown. 042 private boolean shutdown; 043 044 private static final long FIVE_MINUTES = 5 * 60 * 1000; 045 046 private final AtomicInteger jobIdAllocator = new AtomicInteger(); 047 048 private class Job implements PeriodicJob, Invokable<Void> 049 { 050 final int jobId = jobIdAllocator.incrementAndGet(); 051 052 private final Schedule schedule; 053 054 private final String name; 055 056 private final Runnable runnableJob; 057 058 private boolean executing, canceled; 059 060 private long nextExecution; 061 062 public Job(Schedule schedule, String name, Runnable runnableJob) 063 { 064 this.schedule = schedule; 065 this.name = name; 066 this.runnableJob = runnableJob; 067 068 nextExecution = schedule.firstExecution(); 069 } 070 071 @Override 072 public String getName() 073 { 074 return name; 075 } 076 077 public synchronized long getNextExecution() 078 { 079 return nextExecution; 080 } 081 082 083 @Override 084 public synchronized boolean isExecuting() 085 { 086 return executing; 087 } 088 089 @Override 090 public synchronized boolean isCanceled() 091 { 092 return canceled; 093 } 094 095 @Override 096 public synchronized void cancel() 097 { 098 canceled = true; 099 100 if (!executing) 101 { 102 removeJob(this); 103 } 104 105 // Otherwise, it will be caught when the job finishes execution. 106 } 107 108 @Override 109 public synchronized String toString() 110 { 111 StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId); 112 113 114 builder.append(", (").append(name).append(")"); 115 116 if (executing) 117 { 118 builder.append(", executing"); 119 } 120 121 if (canceled) 122 { 123 builder.append(", canceled"); 124 } else 125 { 126 builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution)); 127 } 128 129 return builder.append("]").toString(); 130 } 131 132 /** 133 * Starts execution of the job; this sets the executing flag, calculates the next execution time, 134 * and uses the ParallelExecutor to run the job. 135 */ 136 synchronized void start() 137 { 138 executing = true; 139 140 // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options 141 // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing 142 // overlapping executions of the Job on a more rigid schedule. Use Quartz. 143 144 nextExecution = schedule.nextExecution(nextExecution); 145 146 parallelExecutor.invoke(this); 147 148 if (logger.isTraceEnabled()) 149 { 150 logger.trace(this + " sent for execution"); 151 } 152 } 153 154 synchronized void cleanupAfterExecution() 155 { 156 if (logger.isTraceEnabled()) 157 { 158 logger.trace(this + " execution complete"); 159 } 160 161 executing = false; 162 163 if (canceled) 164 { 165 removeJob(this); 166 } else 167 { 168 // Again, naive but necessary. 169 thread.interrupt(); 170 } 171 } 172 173 @Override 174 public Void invoke() 175 { 176 if (logger.isDebugEnabled()) 177 { 178 logger.debug(String.format("Executing job #%d (%s)", jobId, name)); 179 } 180 181 try 182 { 183 runnableJob.run(); 184 } finally 185 { 186 cleanupAfterExecution(); 187 } 188 189 return null; 190 } 191 } 192 193 public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger) 194 { 195 this.parallelExecutor = parallelExecutor; 196 this.logger = logger; 197 } 198 199 @PostInjection 200 public void start(RegistryShutdownHub hub) 201 { 202 hub.addRegistryShutdownListener(new Runnable() 203 { 204 @Override 205 public void run() 206 { 207 registryDidShutdown(); 208 } 209 }); 210 211 thread.start(); 212 } 213 214 215 synchronized void removeJob(Job job) 216 { 217 if (logger.isDebugEnabled()) 218 { 219 logger.debug("Removing " + job); 220 } 221 222 jobs.remove(job); 223 } 224 225 226 @Override 227 public synchronized PeriodicJob addJob(Schedule schedule, String name, Runnable job) 228 { 229 assert schedule != null; 230 assert name != null; 231 assert job != null; 232 233 Job periodicJob = new Job(schedule, name, job); 234 235 jobs.add(periodicJob); 236 237 if (logger.isDebugEnabled()) 238 { 239 logger.debug("Added " + periodicJob); 240 } 241 242 // Wake the thread so that it can start the job, if necessary. 243 244 // Technically, this is only necessary if the new job is scheduled earlier 245 // than any job currently in the list of jobs, but this naive implementation 246 // is simpler. 247 thread.interrupt(); 248 249 return periodicJob; 250 } 251 252 @Override 253 public void run() 254 { 255 while (!isShutdown()) 256 { 257 long nextExecution = executeCurrentBatch(); 258 259 try 260 { 261 long delay = nextExecution - System.currentTimeMillis(); 262 263 if (logger.isTraceEnabled()) 264 { 265 logger.trace(String.format("Sleeping for %,d ms", delay)); 266 } 267 268 if (delay > 0) 269 { 270 Thread.sleep(delay); 271 } 272 } catch (InterruptedException 273 ex) 274 { 275 // Ignored; the thread is interrupted() to shut it down, 276 // or to have it execute a new batch. 277 278 logger.trace("Interrupted"); 279 } 280 } 281 } 282 283 private synchronized boolean isShutdown() 284 { 285 return shutdown; 286 } 287 288 private synchronized void registryDidShutdown() 289 { 290 shutdown = true; 291 292 thread.interrupt(); 293 } 294 295 /** 296 * Finds jobs and executes jobs that are ready to be executed. 297 * 298 * @return the next execution time (from the non-executing job that is scheduled earliest for execution). 299 */ 300 private synchronized long executeCurrentBatch() 301 { 302 long now = System.currentTimeMillis(); 303 long nextExecution = now + FIVE_MINUTES; 304 305 for (Job job : jobs) 306 { 307 if (job.isExecuting()) 308 { 309 continue; 310 } 311 312 long jobNextExecution = job.getNextExecution(); 313 314 if (jobNextExecution <= now) 315 { 316 job.start(); 317 } else 318 { 319 nextExecution = Math.min(nextExecution, jobNextExecution); 320 } 321 } 322 323 return nextExecution; 324 } 325 326 327}