001// Copyright 2011 The Apache Software Foundation
002//
003// Licensed under the Apache License, Version 2.0 (the "License");
004// you may not use this file except in compliance with the License.
005// You may obtain a copy of the License at
006//
007// http://www.apache.org/licenses/LICENSE-2.0
008//
009// Unless required by applicable law or agreed to in writing, software
010// distributed under the License is distributed on an "AS IS" BASIS,
011// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
012// See the License for the specific language governing permissions and
013// limitations under the License.
014
015package org.apache.tapestry5.ioc.internal.services.cron;
016
017import org.apache.tapestry5.ioc.Invokable;
018import org.apache.tapestry5.ioc.annotations.PostInjection;
019import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
020import org.apache.tapestry5.ioc.services.ParallelExecutor;
021import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
022import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
023import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
024import org.apache.tapestry5.ioc.services.cron.Schedule;
025import org.slf4j.Logger;
026
027import java.util.List;
028import java.util.concurrent.atomic.AtomicInteger;
029
030public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
031{
032    private final ParallelExecutor parallelExecutor;
033
034    private final Logger logger;
035
036    // Synchronized by this
037    private final List<Job> jobs = CollectionFactory.newList();
038
039    private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
040
041    // Synchronized by this. Set when the registry is shutdown.
042    private boolean shutdown;
043
044    private static final long FIVE_MINUTES = 5 * 60 * 1000;
045
046    private final AtomicInteger jobIdAllocator = new AtomicInteger();
047
048    private class Job implements PeriodicJob, Invokable<Void>
049    {
050        final int jobId = jobIdAllocator.incrementAndGet();
051
052        private final Schedule schedule;
053
054        private final String name;
055
056        private final Runnable runnableJob;
057
058        private boolean executing, canceled;
059
060        private long nextExecution;
061
062        public Job(Schedule schedule, String name, Runnable runnableJob)
063        {
064            this.schedule = schedule;
065            this.name = name;
066            this.runnableJob = runnableJob;
067
068            nextExecution = schedule.firstExecution();
069        }
070
071        public String getName()
072        {
073            return name;
074        }
075
076        public synchronized long getNextExecution()
077        {
078            return nextExecution;
079        }
080
081
082        public synchronized boolean isExecuting()
083        {
084            return executing;
085        }
086
087        public synchronized boolean isCanceled()
088        {
089            return canceled;
090        }
091
092        public synchronized void cancel()
093        {
094            canceled = true;
095
096            if (!executing)
097            {
098                removeJob(this);
099            }
100
101            // Otherwise, it will be caught when the job finishes execution.
102        }
103
104        @Override
105        public synchronized String toString()
106        {
107            StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
108
109
110            builder.append(", (").append(name).append(")");
111
112            if (executing)
113            {
114                builder.append(", executing");
115            }
116
117            if (canceled)
118            {
119                builder.append(", canceled");
120            } else
121            {
122                builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
123            }
124
125            return builder.append("]").toString();
126        }
127
128        /**
129         * Starts execution of the job; this sets the executing flag, calculates the next execution time,
130         * and uses the ParallelExecutor to run the job.
131         */
132        synchronized void start()
133        {
134            executing = true;
135
136            // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
137            // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
138            // overlapping executions of the Job on a more rigid schedule.  Use Quartz.
139
140            nextExecution = schedule.nextExecution(nextExecution);
141
142            parallelExecutor.invoke(this);
143
144            if (logger.isTraceEnabled())
145            {
146                logger.trace(this + " sent for execution");
147            }
148        }
149
150        synchronized void cleanupAfterExecution()
151        {
152            if (logger.isTraceEnabled())
153            {
154                logger.trace(this + " execution complete");
155            }
156
157            executing = false;
158
159            if (canceled)
160            {
161                removeJob(this);
162            } else
163            {
164                // Again, naive but necessary.
165                thread.interrupt();
166            }
167        }
168
169        public Void invoke()
170        {
171            if (logger.isDebugEnabled())
172            {
173                logger.debug(String.format("Executing job #%d (%s)", jobId, name));
174            }
175
176            try
177            {
178                runnableJob.run();
179            } finally
180            {
181                cleanupAfterExecution();
182            }
183
184            return null;
185        }
186    }
187
188    public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
189    {
190        this.parallelExecutor = parallelExecutor;
191        this.logger = logger;
192    }
193
194    @PostInjection
195    public void start(RegistryShutdownHub hub)
196    {
197        hub.addRegistryShutdownListener(new Runnable()
198        {
199            public void run()
200            {
201                registryDidShutdown();
202            }
203        });
204
205        thread.start();
206    }
207
208
209    synchronized void removeJob(Job job)
210    {
211        if (logger.isDebugEnabled())
212        {
213            logger.debug("Removing " + job);
214        }
215
216        jobs.remove(job);
217    }
218
219
220    public synchronized PeriodicJob addJob(Schedule schedule, String name, Runnable job)
221    {
222        assert schedule != null;
223        assert name != null;
224        assert job != null;
225
226        Job periodicJob = new Job(schedule, name, job);
227
228        jobs.add(periodicJob);
229
230        if (logger.isDebugEnabled())
231        {
232            logger.debug("Added " + periodicJob);
233        }
234
235        // Wake the thread so that it can start the job, if necessary.
236
237        // Technically, this is only necessary if the new job is scheduled earlier
238        // than any job currently in the list of jobs, but this naive implementation
239        // is simpler.
240        thread.interrupt();
241
242        return periodicJob;
243    }
244
245    public void run()
246    {
247        while (!isShutdown())
248        {
249            long nextExecution = executeCurrentBatch();
250
251            try
252            {
253                long delay = nextExecution - System.currentTimeMillis();
254
255                if (logger.isTraceEnabled())
256                {
257                    logger.trace(String.format("Sleeping for %,d ms", delay));
258                }
259
260                if (delay > 0)
261                {
262                    Thread.sleep(delay);
263                }
264            } catch (InterruptedException
265                    ex)
266            {
267                // Ignored; the thread is interrupted() to shut it down,
268                // or to have it execute a new batch.
269
270                logger.trace("Interrupted");
271            }
272        }
273    }
274
275    private synchronized boolean isShutdown()
276    {
277        return shutdown;
278    }
279
280    private synchronized void registryDidShutdown()
281    {
282        shutdown = true;
283
284        thread.interrupt();
285    }
286
287    /**
288     * Finds jobs and executes jobs that are ready to be executed.
289     *
290     * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
291     */
292    private synchronized long executeCurrentBatch()
293    {
294        long now = System.currentTimeMillis();
295        long nextExecution = now + FIVE_MINUTES;
296
297        for (Job job : jobs)
298        {
299            if (job.isExecuting())
300            {
301                continue;
302            }
303
304            long jobNextExecution = job.getNextExecution();
305
306            if (jobNextExecution <= now)
307            {
308                job.start();
309            } else
310            {
311                nextExecution = Math.min(nextExecution, jobNextExecution);
312            }
313        }
314
315        return nextExecution;
316    }
317
318
319}