1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.jetspeed.aggregator.impl;
19
20 import java.security.AccessControlContext;
21 import java.security.AccessController;
22 import java.util.Iterator;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Stack;
26 import java.util.LinkedList;
27 import java.util.Collections;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.jetspeed.aggregator.RenderingJob;
32 import org.apache.jetspeed.aggregator.Worker;
33 import org.apache.jetspeed.aggregator.WorkerMonitor;
34 import org.apache.jetspeed.aggregator.PortletContent;
35 import org.apache.jetspeed.util.Queue;
36 import org.apache.jetspeed.util.FIFOQueue;
37
38 import org.apache.pluto.om.window.PortletWindow;
39 import org.apache.pluto.om.common.ObjectID;
40
41 /***
42 * The WorkerMonitor is responsible for dispatching jobs to workers
43 * It uses an Apache HTTPd configuration style of min/max/spare workers
44 * threads to throttle the rendering work.
45 * If jobs come in faster that processing, they are stored in a queue
46 * which is flushed periodically by a QueueMonitor.
47 *
48 * @author <a href="mailto:raphael@apache.org">Rapha\u00ebl Luta</a>
49 * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a>
50 * @version $Id: WorkerMonitorImpl.java 516448 2007-03-09 16:25:47Z ate $
51 */
52 public class WorkerMonitorImpl implements WorkerMonitor
53 {
54 public WorkerMonitorImpl(int minWorkers, int maxWorkers, int spareWorkers, int maxJobsPerWorker)
55 {
56 this.minWorkers = minWorkers;
57 this.maxWorkers = maxWorkers;
58 this.spareWorkers = spareWorkers;
59 this.maxJobsPerWorker = maxJobsPerWorker;
60 }
61
62 /*** Commons logging */
63 protected final static Log log = LogFactory.getLog(WorkerMonitorImpl.class);
64
65 /*** Static counters for identifying workers */
66 protected static long sCount = 0;
67
68 /*** Count of running jobs **/
69 protected int runningJobs = 0;
70
71 /*** Minimum number of wokers to create */
72 protected int minWorkers = 5;
73
74 /*** Maximum number of workers */
75 protected int maxWorkers = 50;
76
77 /*** Minimum amount of spare workers */
78 protected int spareWorkers = 3;
79
80 /*** Maximum of job processed by a worker before being released */
81 protected int maxJobsPerWorker = 10;
82
83 /*** Stack containing currently idle workers */
84 protected Stack workers = new Stack();
85
86 /*** The thread group used to group all worker threads */
87 protected ThreadGroup tg = new ThreadGroup("Workers");
88
89 /*** Job queue */
90 protected Queue queue;
91
92 /*** Workers to be monitored for timeout checking */
93 protected List workersMonitored = Collections.synchronizedList(new LinkedList());
94
95 /*** Renering Job Timeout monitor */
96 protected RenderingJobTimeoutMonitor jobMonitor = null;
97
98 public void start()
99 {
100 addWorkers(this.minWorkers);
101 setQueue(new FIFOQueue());
102
103 jobMonitor = new RenderingJobTimeoutMonitor(1000);
104 jobMonitor.start();
105 }
106
107 public void stop()
108 {
109 if (jobMonitor != null)
110 jobMonitor.endThread();
111 jobMonitor = null;
112
113 }
114
115 public void setQueue(Queue queue)
116 {
117 this.queue = queue;
118 }
119
120 /***
121 * Create the request number of workers and add them to
122 * list of available workers.
123 *
124 * @param wCount the number of workers to create
125 */
126 protected synchronized void addWorkers(int wCount)
127 {
128 int wCurrent = this.tg.activeCount();
129
130 if (wCurrent < maxWorkers)
131 {
132 if (wCurrent + wCount > maxWorkers)
133 {
134 wCount = maxWorkers - wCurrent;
135 }
136
137 log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
138
139 for (int i = 0; i < wCount; ++i)
140 {
141 Worker worker = new WorkerImpl(this, this.tg, "WORKER_" + (++sCount));
142 worker.start();
143 workers.push(worker);
144 }
145 }
146 }
147
148 /***
149 * Retrieves an idle worker
150 *
151 * @return a Worker from the idle pool or null if non available
152 */
153 public Worker getWorker()
154 {
155 synchronized(this.workers)
156 {
157 if (this.workers.size() < spareWorkers)
158 {
159 addWorkers(spareWorkers);
160 }
161
162 if (this.workers.size() == 0)
163 {
164 return null;
165 }
166
167 return (Worker)workers.pop();
168 }
169 }
170
171 /***
172 * Assign a job to a worker and execute it or queue the job if no
173 * worker is available.
174 *
175 * @param job the Job to process
176 */
177 public void process(RenderingJob job)
178 {
179 Worker worker = this.getWorker();
180
181 AccessControlContext context = AccessController.getContext();
182 if (worker==null)
183 {
184 queue.push(job);
185 queue.push(context);
186 }
187 else
188 {
189 try
190 {
191 synchronized (worker)
192 {
193 worker.setJob(job, context);
194
195 if (job.getTimeout() > 0) {
196 workersMonitored.add(worker);
197 }
198
199 worker.notify();
200 runningJobs++;
201 }
202 }
203 catch (Throwable t)
204 {
205 log.error("Worker exception", t);
206 }
207 }
208 }
209
210 /***
211 * Put back the worker in the idle queue unless there are pending jobs and
212 * worker can still be committed to a new job before being released.
213 */
214 public void release(Worker worker)
215 {
216
217
218
219
220 long jobTimeout = 0;
221
222 RenderingJob oldJob = (RenderingJob) worker.getJob();
223 if (oldJob != null)
224 {
225 jobTimeout = oldJob.getTimeout();
226 }
227
228 synchronized (worker)
229 {
230 if ((worker.getJobCount()<this.maxJobsPerWorker)&&(queue.size()>0))
231 {
232 RenderingJob job = (RenderingJob)queue.pop();
233 AccessControlContext context = (AccessControlContext)queue.pop();
234 worker.setJob(job, context);
235 runningJobs--;
236 return;
237 }
238 else
239 {
240 worker.setJob(null);
241 worker.resetJobCount();
242 runningJobs--;
243 }
244 }
245
246 if (jobTimeout > 0) {
247 workersMonitored.remove(worker);
248 }
249
250 synchronized (this.workers)
251 {
252 this.workers.push(worker);
253 }
254 }
255
256 public int getQueuedJobsCount()
257 {
258 return queue.size();
259 }
260
261 /***
262 * Returns a snapshot of the available jobs
263 * @return available jobs
264 */
265 public int getAvailableJobsCount()
266 {
267 return workers.size();
268 }
269
270 public int getRunningJobsCount()
271 {
272 return this.tg.activeCount();
273 }
274
275 class RenderingJobTimeoutMonitor extends Thread {
276
277 long interval = 1000;
278 boolean shouldRun = true;
279
280 RenderingJobTimeoutMonitor(long interval) {
281 super("RenderingJobTimeoutMonitor");
282
283 if (interval > 0) {
284 this.interval = interval;
285 }
286 }
287 /***
288 * Thread.stop() is deprecated.
289 * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread,
290 * effectively causing the thread to shutdown correctly.
291 *
292 */
293 public void endThread()
294 {
295 shouldRun = false;
296 this.interrupt();
297 }
298
299 public void run() {
300 while (shouldRun) {
301 try
302 {
303
304
305
306
307 List timeoutWorkers = new ArrayList();
308
309 synchronized (workersMonitored)
310 {
311 for (Iterator it = workersMonitored.iterator(); it.hasNext(); )
312 {
313 WorkerImpl worker = (WorkerImpl) it.next();
314 RenderingJob job = (RenderingJob) worker.getJob();
315
316 if ((null != job) && (job.isTimeout()))
317 {
318 timeoutWorkers.add(worker);
319 }
320 }
321 }
322
323
324 for (Iterator it = timeoutWorkers.iterator(); it.hasNext(); )
325 {
326 WorkerImpl worker = (WorkerImpl) it.next();
327 RenderingJob job = (RenderingJob) worker.getJob();
328
329
330 if ((null != job) && (job.isTimeout()))
331 {
332 killJob(worker, job);
333 }
334 }
335 }
336 catch (Exception e)
337 {
338 log.error("Exception during job monitoring.", e);
339 }
340
341 try
342 {
343 synchronized (this)
344 {
345 wait(this.interval);
346 }
347 }
348 catch (InterruptedException e)
349 {
350 ;
351 }
352 }
353 }
354
355 public void killJob(WorkerImpl worker, RenderingJob job) {
356 try {
357 if (log.isWarnEnabled()) {
358 PortletWindow window = job.getWindow();
359 ObjectID windowId = (null != window ? window.getId() : null);
360 log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout() + "ms): " + windowId);
361 }
362
363 int waitCount = 0;
364 PortletContent content = job.getPortletContent();
365
366 while (!content.isComplete()) {
367 if (++waitCount > 10) {
368 break;
369 }
370
371 worker.interrupt();
372
373 synchronized (content) {
374 content.wait();
375 }
376 }
377 } catch (Exception e) {
378 log.error("Exceptiong during job killing.", e);
379 }
380 }
381
382 }
383 }