comparison dwtx/core/internal/jobs/JobManager.d @ 122:9d0585bcb7aa

Add core.jobs package
author Frank Benoit <benoit@tionex.de>
date Tue, 12 Aug 2008 02:34:21 +0200
parents
children 862b05e0334a
comparison
equal deleted inserted replaced
121:c0304616ea23 122:9d0585bcb7aa
1 /*******************************************************************************
2 * Copyright (c) 2003, 2007 IBM Corporation and others.
3 * All rights reserved. This program and the accompanying materials
4 * are made available under the terms of the Eclipse Public License v1.0
5 * which accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
7 *
8 * Contributors:
9 * IBM Corporation - initial API and implementation
10 * Port to the D programming language:
11 * Frank Benoit <benoit@tionex.de>
12 *******************************************************************************/
13 module dwtx.core.internal.jobs.JobManager;
14
15 import dwt.dwthelper.utils;
16 import dwtx.dwtxhelper.Collection;
17 import tango.io.Stdout;
18 import tango.text.convert.Format;
19 import tango.time.WallClock;
20 import tango.time.Time;
21 import tango.core.Thread;
22 import tango.text.convert.Format;
23
24 //don't use ICU because this is used for debugging only (see bug 135785)
25 // import java.text.DateFormat;
26 // import java.text.FieldPosition;
27 // import java.text.SimpleDateFormat;
28
29 import dwtx.core.internal.runtime.RuntimeLog;
30 import dwtx.core.runtime.Assert;
31 import dwtx.core.runtime.IProgressMonitor;
32 import dwtx.core.runtime.IProgressMonitorWithBlocking;
33 import dwtx.core.runtime.IStatus;
34 import dwtx.core.runtime.NullProgressMonitor;
35 import dwtx.core.runtime.OperationCanceledException;
36 import dwtx.core.runtime.Status;
37 import dwtx.core.runtime.jobs.IJobChangeEvent;
38 import dwtx.core.runtime.jobs.IJobChangeListener;
39 import dwtx.core.runtime.jobs.IJobManager;
40 import dwtx.core.runtime.jobs.ILock;
41 import dwtx.core.runtime.jobs.ISchedulingRule;
42 import dwtx.core.runtime.jobs.Job;
43 import dwtx.core.runtime.jobs.JobChangeAdapter;
44 import dwtx.core.runtime.jobs.LockListener;
45 import dwtx.core.runtime.jobs.ProgressProvider;
46 import dwtx.osgi.util.NLS;
47
48 import dwtx.core.internal.jobs.ImplicitJobs;
49 import dwtx.core.internal.jobs.WorkerPool;
50 import dwtx.core.internal.jobs.JobListeners;
51 import dwtx.core.internal.jobs.LockManager;
52 import dwtx.core.internal.jobs.JobQueue;
53 import dwtx.core.internal.jobs.InternalJob;
54 import dwtx.core.internal.jobs.ThreadJob;
55 import dwtx.core.internal.jobs.JobOSGiUtils;
56 import dwtx.core.internal.jobs.Worker;
57 import dwtx.core.internal.jobs.Semaphore;
58 import dwtx.core.internal.jobs.JobChangeEvent;
59 import dwtx.core.internal.jobs.JobMessages;
60 import dwtx.core.internal.jobs.JobStatus;
61
62 /**
63 * Implementation of API type IJobManager
64 *
65 * Implementation note: all the data structures of this class are protected
66 * by a single lock object held as a private field in this class. The JobManager
67 * instance itself is not used because this class is publicly reachable, and third
68 * party clients may try to synchronize on it.
69 *
70 * The WorkerPool class uses its own monitor for synchronizing its data
71 * structures. To avoid deadlock between the two classes, the JobManager
72 * must NEVER call the worker pool while its own monitor is held.
73 */
74 public class JobManager : IJobManager {
75
76 /**
77 * The unique identifier constant of this plug-in.
78 */
79 public static const String PI_JOBS = "dwtx.core.jobs"; //$NON-NLS-1$
80
81 /**
82 * Status code constant indicating an error occurred while running a plug-in.
83 * For backward compatibility with Platform.PLUGIN_ERROR left at (value = 2).
84 */
85 public static const int PLUGIN_ERROR = 2;
86
87 private static const String OPTION_DEADLOCK_ERROR = PI_JOBS ~ "/jobs/errorondeadlock"; //$NON-NLS-1$
88 private static const String OPTION_DEBUG_BEGIN_END = PI_JOBS ~ "/jobs/beginend"; //$NON-NLS-1$
89 private static const String OPTION_DEBUG_JOBS = PI_JOBS ~ "/jobs"; //$NON-NLS-1$
90 private static const String OPTION_DEBUG_JOBS_TIMING = PI_JOBS ~ "/jobs/timing"; //$NON-NLS-1$
91 private static const String OPTION_LOCKS = PI_JOBS ~ "/jobs/locks"; //$NON-NLS-1$
92 private static const String OPTION_SHUTDOWN = PI_JOBS ~ "/jobs/shutdown"; //$NON-NLS-1$
93
94 static bool DEBUG = false;
95 static bool DEBUG_BEGIN_END = false;
96 static bool DEBUG_DEADLOCK = false;
97 static bool DEBUG_LOCKS = false;
98 static bool DEBUG_TIMING = false;
99 static bool DEBUG_SHUTDOWN = false;
100 // private static DateFormat DEBUG_FORMAT;
101
102 /**
103 * The singleton job manager instance. It must be a singleton because
104 * all job instances maintain a reference (as an optimization) and have no way
105 * of updating it.
106 */
107 private static JobManager instance = null;
108 /**
109 * Scheduling rule used for validation of client-defined rules.
110 */
111 private static ISchedulingRule nullRule;
112 private static void initNullRule(){
113 if( nullRule !is null ) return;
114 nullRule = new class ISchedulingRule {
115 public bool contains(ISchedulingRule rule) {
116 return rule is this;
117 }
118
119 public bool isConflicting(ISchedulingRule rule) {
120 return rule is this;
121 }
122 };
123 }
124
125 /**
126 * True if this manager is active, and false otherwise. A job manager
127 * starts out active, and becomes inactive if it has been shutdown
128 * and not restarted.
129 */
130 private /+volatile+/ bool active = true;
131
132 const ImplicitJobs implicitJobs;
133
134 private const JobListeners jobListeners;
135
136 /**
137 * The lock for synchronizing all activity in the job manager. To avoid deadlock,
138 * this lock must never be held for extended periods, and must never be
139 * held while third party code is being called.
140 */
141 private const Object lock;
142
143 private const LockManager lockManager;
144
145 /**
146 * The pool of worker threads.
147 */
148 private WorkerPool pool;
149
150 private ProgressProvider progressProvider = null;
151 /**
152 * Jobs that are currently running. Should only be modified from changeState
153 */
154 private const HashSet running;
155
156 /**
157 * Jobs that are sleeping. Some sleeping jobs are scheduled to wake
158 * up at a given start time, while others will sleep indefinitely until woken.
159 * Should only be modified from changeState
160 */
161 private const JobQueue sleeping;
162 /**
163 * True if this manager has been suspended, and false otherwise. A job manager
164 * starts out not suspended, and becomes suspended when <code>suspend</code>
165 * is invoked. Once suspended, no jobs will start running until <code>resume</code>
166 * is called.
167 */
168 private bool suspended = false;
169
170 /**
171 * jobs that are waiting to be run. Should only be modified from changeState
172 */
173 private const JobQueue waiting;
174
175 /**
176 * Counter to record wait queue insertion order.
177 */
178 private long waitQueueCounter;
179
180 public static void debug_(String msg) {
181 StringBuffer msgBuf = new StringBuffer(msg.length + 40);
182 if (DEBUG_TIMING) {
183 //lazy initialize to avoid overhead when not debugging
184 // if (DEBUG_FORMAT is null)
185 // DEBUG_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); //$NON-NLS-1$
186 // DEBUG_FORMAT.format(new Date(), msgBuf, new FieldPosition(0));
187 auto time = WallClock.now();
188 msgBuf.append(Format("{:d2}:{:d2}:{:d2}:{:d3}",
189 time.time.span.hours,
190 time.time.span.minutes,
191 time.time.span.seconds,
192 time.time.span.millis ));
193 msgBuf.append('-');
194 }
195 msgBuf.append('[');
196 msgBuf.append(Thread.getThis().toString());
197 msgBuf.append(']');
198 msgBuf.append(msg);
199 Stdout.formatln( "{}", msgBuf.toString());
200 }
201
202 /**
203 * Returns the job manager singleton. For internal use only.
204 */
205 static synchronized JobManager getInstance() {
206 if (instance is null)
207 new JobManager();
208 return instance;
209 }
210
211 /**
212 * For debugging purposes only
213 */
214 private static String printJobName(Job job) {
215 if (cast(ThreadJob)job ) {
216 Job realJob = (cast(ThreadJob) job).realJob;
217 if (realJob !is null)
218 return realJob.classinfo.name;
219 return Format("ThreadJob on rule: {}", job.getRule()); //$NON-NLS-1$
220 }
221 return job.classinfo.name;
222 }
223
224 /**
225 * For debugging purposes only
226 */
227 public static String printState(int state) {
228 switch (state) {
229 case Job.NONE :
230 return "NONE"; //$NON-NLS-1$
231 case Job.WAITING :
232 return "WAITING"; //$NON-NLS-1$
233 case Job.SLEEPING :
234 return "SLEEPING"; //$NON-NLS-1$
235 case Job.RUNNING :
236 return "RUNNING"; //$NON-NLS-1$
237 case InternalJob.BLOCKED :
238 return "BLOCKED"; //$NON-NLS-1$
239 case InternalJob.ABOUT_TO_RUN :
240 return "ABOUT_TO_RUN"; //$NON-NLS-1$
241 case InternalJob.ABOUT_TO_SCHEDULE :
242 return "ABOUT_TO_SCHEDULE";//$NON-NLS-1$
243 }
244 return "UNKNOWN"; //$NON-NLS-1$
245 }
246
247 /**
248 * Note that although this method is not API, clients have historically used
249 * it to force jobs shutdown in cases where OSGi shutdown does not occur.
250 * For this reason, this method should be considered near-API and should not
251 * be changed if at all possible.
252 */
253 public static void shutdown() {
254 if (instance !is null) {
255 instance.doShutdown();
256 instance = null;
257 }
258 }
259
260 private this() {
261 // DWT instance init
262 implicitJobs = new ImplicitJobs(this);
263 jobListeners = new JobListeners();
264 lock = new Object();
265 lockManager = new LockManager();
266
267 instance = this;
268
269 initDebugOptions();
270 synchronized (lock) {
271 waiting = new JobQueue(false);
272 sleeping = new JobQueue(true);
273 running = new HashSet(10);
274 pool = new WorkerPool(this);
275 }
276 pool.setDaemon(JobOSGiUtils.getDefault().useDaemonThreads());
277 }
278
279 /* (non-Javadoc)
280 * @see dwtx.core.runtime.jobs.IJobManager#addJobListener(dwtx.core.runtime.jobs.IJobChangeListener)
281 */
282 public void addJobChangeListener(IJobChangeListener listener) {
283 jobListeners.add(listener);
284 }
285
286 /* (non-Javadoc)
287 * @see dwtx.core.runtime.jobs.IJobManager#beginRule(dwtx.core.runtime.jobs.ISchedulingRule, dwtx.core.runtime.IProgressMonitor)
288 */
289 public void beginRule(ISchedulingRule rule, IProgressMonitor monitor) {
290 validateRule(rule);
291 implicitJobs.begin(rule, monitorFor(monitor), false);
292 }
293
294 /**
295 * Cancels a job
296 */
297 protected bool cancel(InternalJob job) {
298 IProgressMonitor monitor = null;
299 synchronized (lock) {
300 switch (job.getState_package()) {
301 case Job.NONE :
302 return true;
303 case Job.RUNNING :
304 //cannot cancel a job that has already started (as opposed to ABOUT_TO_RUN)
305 if (job.internalGetState() is Job.RUNNING) {
306 monitor = job.getProgressMonitor();
307 break;
308 }
309 //signal that the job should be canceled before it gets a chance to run
310 job.setAboutToRunCanceled(true);
311 return true;
312 default :
313 changeState(job, Job.NONE);
314 }
315 }
316 //call monitor outside sync block
317 if (monitor !is null) {
318 if (!monitor.isCanceled()) {
319 monitor.setCanceled(true);
320 job.canceling();
321 }
322 return false;
323 }
324 //only notify listeners if the job was waiting or sleeping
325 jobListeners.done(cast(Job) job, Status.CANCEL_STATUS, false);
326 return true;
327 }
328 package bool cancel_package(InternalJob job) {
329 return cancel(job);
330 }
331
332 /* (non-Javadoc)
333 * @see dwtx.core.runtime.jobs.IJobManager#cancel(java.lang.String)
334 */
335 public void cancel(Object family) {
336 //don't synchronize because cancel calls listeners
337 for (Iterator it = select(family).iterator(); it.hasNext();)
338 cancel(cast(InternalJob) it.next());
339 }
340
341 /**
342 * Atomically updates the state of a job, adding or removing from the
343 * necessary queues or sets.
344 */
345 private void changeState(InternalJob job, int newState) {
346 bool blockedJobs = false;
347 synchronized (lock) {
348 int oldState = job.internalGetState();
349 switch (oldState) {
350 case Job.NONE :
351 case InternalJob.ABOUT_TO_SCHEDULE :
352 break;
353 case InternalJob.BLOCKED :
354 //remove this job from the linked list of blocked jobs
355 job.remove();
356 break;
357 case Job.WAITING :
358 try {
359 waiting.remove(job);
360 } catch (RuntimeException e) {
361 Assert.isLegal(false, "Tried to remove a job that wasn't in the queue"); //$NON-NLS-1$
362 }
363 break;
364 case Job.SLEEPING :
365 try {
366 sleeping.remove(job);
367 } catch (RuntimeException e) {
368 Assert.isLegal(false, "Tried to remove a job that wasn't in the queue"); //$NON-NLS-1$
369 }
370 break;
371 case Job.RUNNING :
372 case InternalJob.ABOUT_TO_RUN :
373 running.remove(job);
374 //add any blocked jobs back to the wait queue
375 InternalJob blocked = job.previous();
376 job.remove();
377 blockedJobs = blocked !is null;
378 while (blocked !is null) {
379 InternalJob previous = blocked.previous();
380 changeState(blocked, Job.WAITING);
381 blocked = previous;
382 }
383 break;
384 default :
385 Assert.isLegal(false, Format("Invalid job state: {}, state: {}", job, oldState)); //$NON-NLS-1$ //$NON-NLS-2$
386 }
387 job.internalSetState(newState);
388 switch (newState) {
389 case Job.NONE :
390 job.setStartTime(InternalJob.T_NONE);
391 job.setWaitQueueStamp(InternalJob.T_NONE);
392 case InternalJob.BLOCKED :
393 break;
394 case Job.WAITING :
395 waiting.enqueue(job);
396 break;
397 case Job.SLEEPING :
398 try {
399 sleeping.enqueue(job);
400 } catch (RuntimeException e) {
401 throw new RuntimeException(Format("Error changing from state: ", oldState)); //$NON-NLS-1$
402 }
403 break;
404 case Job.RUNNING :
405 case InternalJob.ABOUT_TO_RUN :
406 job.setStartTime(InternalJob.T_NONE);
407 job.setWaitQueueStamp(InternalJob.T_NONE);
408 running.add(job);
409 break;
410 case InternalJob.ABOUT_TO_SCHEDULE :
411 break;
412 default :
413 Assert.isLegal(false, Format("Invalid job state: {}, state: {}", job, newState)); //$NON-NLS-1$ //$NON-NLS-2$
414 }
415 }
416 //notify queue outside sync block
417 if (blockedJobs)
418 pool.jobQueued();
419 }
420
421 /**
422 * Returns a new progress monitor for this job, belonging to the given
423 * progress group. Returns null if it is not a valid time to set the job's group.
424 */
425 protected IProgressMonitor createMonitor(InternalJob job, IProgressMonitor group, int ticks) {
426 synchronized (lock) {
427 //group must be set before the job is scheduled
428 //this includes the ABOUT_TO_SCHEDULE state, during which it is still
429 //valid to set the progress monitor
430 if (job.getState_package() !is Job.NONE)
431 return null;
432 IProgressMonitor monitor = null;
433 if (progressProvider !is null)
434 monitor = progressProvider.createMonitor(cast(Job) job, group, ticks);
435 if (monitor is null)
436 monitor = new NullProgressMonitor();
437 return monitor;
438 }
439 }
440 package IProgressMonitor createMonitor_package(InternalJob job, IProgressMonitor group, int ticks) {
441 return createMonitor(job, group, ticks);
442 }
443
444 /**
445 * Returns a new progress monitor for this job. Never returns null.
446 */
447 private IProgressMonitor createMonitor(Job job) {
448 IProgressMonitor monitor = null;
449 if (progressProvider !is null)
450 monitor = progressProvider.createMonitor(job);
451 if (monitor is null)
452 monitor = new NullProgressMonitor();
453 return monitor;
454 }
455
456 /* (non-Javadoc)
457 * @see dwtx.core.runtime.jobs.IJobManager#createProgressGroup()
458 */
459 public IProgressMonitor createProgressGroup() {
460 if (progressProvider !is null)
461 return progressProvider.createProgressGroup();
462 return new NullProgressMonitor();
463 }
464
465 /* (non-Javadoc)
466 * @see dwtx.core.runtime.jobs.IJobManager#currentJob()
467 */
468 public Job currentJob() {
469 Thread current = Thread.getThis();
470 if (cast(Worker)current )
471 return (cast(Worker) current).currentJob();
472 synchronized (lock) {
473 for (Iterator it = running.iterator(); it.hasNext();) {
474 Job job = cast(Job) it.next();
475 if (job.getThread() is current)
476 return job;
477 }
478 }
479 return null;
480 }
481
482 /**
483 * Returns the delay in milliseconds that a job with a given priority can
484 * tolerate waiting.
485 */
486 private long delayFor(int priority) {
487 //these values may need to be tweaked based on machine speed
488 switch (priority) {
489 case Job.INTERACTIVE :
490 return 0L;
491 case Job.SHORT :
492 return 50L;
493 case Job.LONG :
494 return 100L;
495 case Job.BUILD :
496 return 500L;
497 case Job.DECORATE :
498 return 1000L;
499 default :
500 Assert.isTrue(false, Format("Job has invalid priority: {}", priority)); //$NON-NLS-1$
501 return 0;
502 }
503 }
504
505 /**
506 * Performs the scheduling of a job. Does not perform any notifications.
507 */
508 private void doSchedule(InternalJob job, long delay) {
509 synchronized (lock) {
510 //if it's a decoration job with no rule, don't run it right now if the system is busy
511 if (job.getPriority() is Job.DECORATE && job.getRule() is null) {
512 long minDelay = running.size() * 100;
513 delay = Math.max(delay, minDelay);
514 }
515 if (delay > 0) {
516 job.setStartTime(System.currentTimeMillis() + delay);
517 changeState(job, Job.SLEEPING);
518 } else {
519 job.setStartTime(System.currentTimeMillis() + delayFor(job.getPriority()));
520 job.setWaitQueueStamp(waitQueueCounter++);
521 changeState(job, Job.WAITING);
522 }
523 }
524 }
525
526 /**
527 * Shuts down the job manager. Currently running jobs will be told
528 * to stop, but worker threads may still continue processing.
529 * (note: This implemented IJobManager.shutdown which was removed
530 * due to problems caused by premature shutdown)
531 */
532 private void doShutdown() {
533 Job[] toCancel = null;
534 synchronized (lock) {
535 if (active) {
536 active = false;
537 //cancel all running jobs
538 toCancel = arraycast!(Job)( running.toArray());
539 //clean up
540 sleeping.clear();
541 waiting.clear();
542 running.clear();
543 }
544 }
545
546 // Give running jobs a chance to finish. Wait 0.1 seconds for up to 3 times.
547 if (toCancel !is null && toCancel.length > 0) {
548 for (int i = 0; i < toCancel.length; i++) {
549 cancel(cast(InternalJob)toCancel[i]); // cancel jobs outside sync block to avoid deadlock
550 }
551
552 for (int waitAttempts = 0; waitAttempts < 3; waitAttempts++) {
553 Thread.yield();
554 synchronized (lock) {
555 if (running.isEmpty())
556 break;
557 }
558 if (DEBUG_SHUTDOWN) {
559 JobManager.debug_(Format("Shutdown - job wait cycle #{}", (waitAttempts + 1))); //$NON-NLS-1$
560 Job[] stillRunning = null;
561 synchronized (lock) {
562 stillRunning = arraycast!(Job)( running.toArray());
563 }
564 if (stillRunning !is null) {
565 for (int j = 0; j < stillRunning.length; j++) {
566 JobManager.debug_(Format("\tJob: {}", printJobName(stillRunning[j]))); //$NON-NLS-1$
567 }
568 }
569 }
570 try {
571 Thread.sleep(0.100);
572 } catch (InterruptedException e) {
573 //ignore
574 }
575 Thread.yield();
576 }
577
578 synchronized (lock) { // retrieve list of the jobs that are still running
579 toCancel = arraycast!(Job)( running.toArray());
580 }
581 }
582
583 if (toCancel !is null) {
584 for (int i = 0; i < toCancel.length; i++) {
585 String jobName = printJobName(toCancel[i]);
586 //this doesn't need to be translated because it's just being logged
587 String msg = "Job found still running after platform shutdown. Jobs should be canceled by the plugin that scheduled them during shutdown: " ~ jobName; //$NON-NLS-1$
588 RuntimeLog.log(new Status(IStatus.WARNING, JobManager.PI_JOBS, JobManager.PLUGIN_ERROR, msg, null));
589
590 // TODO the RuntimeLog.log in its current implementation won't produce a log
591 // during this stage of shutdown. For now add a standard error output.
592 // One the logging story is improved, the System.err output below can be removed:
593 Stderr.formatln("{}", msg);
594 }
595 }
596
597 pool.shutdown_package();
598 }
599
600 /**
601 * Indicates that a job was running, and has now finished. Note that this method
602 * can be called under OutOfMemoryError conditions and thus must be paranoid
603 * about allocating objects.
604 */
605 protected void endJob(InternalJob job, IStatus result, bool notify) {
606 long rescheduleDelay = InternalJob.T_NONE;
607 synchronized (lock) {
608 //if the job is finishing asynchronously, there is nothing more to do for now
609 if (result is Job.ASYNC_FINISH)
610 return;
611 //if job is not known then it cannot be done
612 if (job.getState_package() is Job.NONE)
613 return;
614 if (JobManager.DEBUG && notify)
615 JobManager.debug_(Format("Ending job: {}", job)); //$NON-NLS-1$
616 job.setResult(result);
617 job.setProgressMonitor(null);
618 job.setThread_package(null);
619 rescheduleDelay = job.getStartTime();
620 changeState(job, Job.NONE);
621 }
622 //notify listeners outside sync block
623 final bool reschedule = active && rescheduleDelay > InternalJob.T_NONE && job.shouldSchedule_package();
624 if (notify)
625 jobListeners.done(cast(Job) job, result, reschedule);
626 //reschedule the job if requested and we are still active
627 if (reschedule)
628 schedule(job, rescheduleDelay, reschedule);
629 }
630 package void endJob_package(InternalJob job, IStatus result, bool notify) {
631 endJob(job, result, notify);
632 }
633
634 /* (non-Javadoc)
635 * @see dwtx.core.runtime.jobs.IJobManager#endRule(dwtx.core.runtime.jobs.ISchedulingRule)
636 */
637 public void endRule(ISchedulingRule rule) {
638 implicitJobs.end(rule, false);
639 }
640
641 /* (non-Javadoc)
642 * @see dwtx.core.runtime.jobs.IJobManager#find(java.lang.String)
643 */
644 public Job[] find(Object family) {
645 List members = select(family);
646 return arraycast!(Job)( members.toArray());
647 }
648
649 /**
650 * Returns a running or blocked job whose scheduling rule conflicts with the
651 * scheduling rule of the given waiting job. Returns null if there are no
652 * conflicting jobs. A job can only run if there are no running jobs and no blocked
653 * jobs whose scheduling rule conflicts with its rule.
654 */
655 protected InternalJob findBlockingJob(InternalJob waitingJob) {
656 if (waitingJob.getRule() is null)
657 return null;
658 synchronized (lock) {
659 if (running.isEmpty())
660 return null;
661 //check the running jobs
662 bool hasBlockedJobs = false;
663 for (Iterator it = running.iterator(); it.hasNext();) {
664 InternalJob job = cast(InternalJob) it.next();
665 if (waitingJob.isConflicting(job))
666 return job;
667 if (!hasBlockedJobs)
668 hasBlockedJobs = job.previous() !is null;
669 }
670 //there are no blocked jobs, so we are done
671 if (!hasBlockedJobs)
672 return null;
673 //check all jobs blocked by running jobs
674 for (Iterator it = running.iterator(); it.hasNext();) {
675 InternalJob job = cast(InternalJob) it.next();
676 while (true) {
677 job = job.previous();
678 if (job is null)
679 break;
680 if (waitingJob.isConflicting(job))
681 return job;
682 }
683 }
684 }
685 return null;
686 }
687 package InternalJob findBlockingJob_package(InternalJob waitingJob) {
688 return findBlockingJob(waitingJob);
689 }
690
691 public LockManager getLockManager() {
692 return lockManager;
693 }
694
695 private void initDebugOptions() {
696 DEBUG = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_DEBUG_JOBS, false);
697 DEBUG_BEGIN_END = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_DEBUG_BEGIN_END, false);
698 DEBUG_DEADLOCK = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_DEADLOCK_ERROR, false);
699 DEBUG_LOCKS = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_LOCKS, false);
700 DEBUG_TIMING = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_DEBUG_JOBS_TIMING, false);
701 DEBUG_SHUTDOWN = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_SHUTDOWN, false);
702 }
703
704 /**
705 * Returns whether the job manager is active (has not been shutdown).
706 */
707 protected bool isActive() {
708 return active;
709 }
710 package bool isActive_package() {
711 return isActive();
712 }
713
714 /**
715 * Returns true if the given job is blocking the execution of a non-system
716 * job.
717 */
718 protected bool isBlocking(InternalJob runningJob) {
719 synchronized (lock) {
720 // if this job isn't running, it can't be blocking anyone
721 if (runningJob.getState_package() !is Job.RUNNING)
722 return false;
723 // if any job is queued behind this one, it is blocked by it
724 InternalJob previous = runningJob.previous();
725 while (previous !is null) {
726 // ignore jobs of lower priority (higher priority value means lower priority)
727 if (previous.getPriority() < runningJob.getPriority()) {
728 if (!previous.isSystem_package())
729 return true;
730 // implicit jobs should interrupt unless they act on behalf of system jobs
731 if (cast(ThreadJob)previous && (cast(ThreadJob) previous).shouldInterrupt())
732 return true;
733 }
734 previous = previous.previous();
735 }
736 // none found
737 return false;
738 }
739 }
740 package bool isBlocking_package(InternalJob runningJob) {
741 return isBlocking(runningJob);
742 }
743
744 /* (non-Javadoc)
745 * @see dwtx.core.runtime.jobs.IJobManager#isIdle()
746 */
747 public bool isIdle() {
748 synchronized (lock) {
749 return running.isEmpty() && waiting.isEmpty();
750 }
751 }
752
753 /* (non-Javadoc)
754 * @see dwtx.core.runtime.jobs.IJobManager#isSuspended()
755 */
756 public bool isSuspended() {
757 synchronized (lock) {
758 return suspended;
759 }
760 }
761
762 /* (non-Javadoc)
763 * @see dwtx.core.runtime.jobs.Job#job(dwtx.core.runtime.jobs.Job)
764 */
765 protected void join(InternalJob job) {
766 IJobChangeListener listener;
767 Semaphore barrier;
768 synchronized (lock) {
769 int state = job.getState_package();
770 if (state is Job.NONE)
771 return;
772 //don't join a waiting or sleeping job when suspended (deadlock risk)
773 if (suspended && state !is Job.RUNNING)
774 return;
775 //it's an error for a job to join itself
776 if (state is Job.RUNNING && job.getThread_package() is Thread.getThis())
777 throw new IllegalStateException("Job attempted to join itself"); //$NON-NLS-1$
778 //the semaphore will be released when the job is done
779 barrier = new Semaphore(null);
780 listener = new class(barrier) JobChangeAdapter {
781 Semaphore barrier_;
782 this( Semaphore a ){
783 barrier_ = a;
784 }
785 public void done(IJobChangeEvent event) {
786 barrier_.release();
787 }
788 };
789 job.addJobChangeListener_package(listener);
790 //compute set of all jobs that must run before this one
791 //add a listener that removes jobs from the blocking set when they finish
792 }
793 //wait until listener notifies this thread.
794 try {
795 while (true) {
796 //notify hook to service pending syncExecs before falling asleep
797 lockManager.aboutToWait(job.getThread_package());
798 try {
799 if (barrier.acquire(Long.MAX_VALUE))
800 break;
801 } catch (InterruptedException e) {
802 //loop and keep trying
803 }
804 }
805 } finally {
806 lockManager.aboutToRelease();
807 job.removeJobChangeListener_package(listener);
808 }
809 }
810 package void join_package(InternalJob job) {
811 join(job);
812 }
813
814 /* (non-Javadoc)
815 * @see IJobManager#join(String, IProgressMonitor)
816 */
817 public void join(Object family_, IProgressMonitor monitor) {
818 monitor = monitorFor(monitor);
819 IJobChangeListener listener = null;
820 Set jobs_;
821 int jobCount;
822 Job blocking = null;
823 synchronized (lock) {
824 //don't join a waiting or sleeping job when suspended (deadlock risk)
825 int states = suspended ? Job.RUNNING : Job.RUNNING | Job.WAITING | Job.SLEEPING;
826 jobs_ = Collections.synchronizedSet(new HashSet(select(family_, states)));
827 jobCount = jobs_.size();
828 if (jobCount > 0) {
829 //if there is only one blocking job, use it in the blockage callback below
830 if (jobCount is 1)
831 blocking = cast(Job) jobs_.iterator().next();
832 listener = new class(family_, jobs_ )JobChangeAdapter {
833 Object family;
834 Set jobs;
835 this(Object a, Set b){
836 family = a;
837 jobs = b;
838 }
839 public void done(IJobChangeEvent event) {
840 //don't remove from list if job is being rescheduled
841 if (!(cast(JobChangeEvent) event).reschedule)
842 jobs.remove(event.getJob());
843 }
844
845 //update the list of jobs if new ones are added during the join
846 public void scheduled(IJobChangeEvent event) {
847 //don't add to list if job is being rescheduled
848 if ((cast(JobChangeEvent) event).reschedule)
849 return;
850 Job job = event.getJob();
851 if (job.belongsTo(family))
852 jobs.add(job);
853 }
854 };
855 addJobChangeListener(listener);
856 }
857 }
858 if (jobCount is 0) {
859 //use up the monitor outside synchronized block because monitors call untrusted code
860 monitor.beginTask(JobMessages.jobs_blocked0, 1);
861 monitor.done();
862 return;
863 }
864 //spin until all jobs are completed
865 try {
866 monitor.beginTask(JobMessages.jobs_blocked0, jobCount);
867 monitor.subTask(NLS.bind(JobMessages.jobs_waitFamSub, Integer.toString(jobCount)));
868 reportBlocked(monitor, blocking);
869 int jobsLeft;
870 int reportedWorkDone = 0;
871 while ((jobsLeft = jobs_.size()) > 0) {
872 //don't let there be negative work done if new jobs have
873 //been added since the join began
874 int actualWorkDone = Math.max(0, jobCount - jobsLeft);
875 if (reportedWorkDone < actualWorkDone) {
876 monitor.worked(actualWorkDone - reportedWorkDone);
877 reportedWorkDone = actualWorkDone;
878 monitor.subTask(NLS.bind(JobMessages.jobs_waitFamSub, Integer.toString(jobsLeft)));
879 }
880 implMissing(__FILE__, __LINE__ );
881 // DWT
882 // if (Thread.interrupted())
883 // throw new InterruptedException();
884 if (monitor.isCanceled())
885 throw new OperationCanceledException();
886 //notify hook to service pending syncExecs before falling asleep
887 lockManager.aboutToWait(null);
888 Thread.sleep(0.100);
889 }
890 } finally {
891 lockManager.aboutToRelease();
892 removeJobChangeListener(listener);
893 reportUnblocked(monitor);
894 monitor.done();
895 }
896 }
897
898 /**
899 * Returns a non-null progress monitor instance. If the monitor is null,
900 * returns the default monitor supplied by the progress provider, or a
901 * NullProgressMonitor if no default monitor is available.
902 */
903 private IProgressMonitor monitorFor(IProgressMonitor monitor) {
904 if (monitor is null || (cast(NullProgressMonitor)monitor )) {
905 if (progressProvider !is null) {
906 try {
907 monitor = progressProvider.getDefaultMonitor();
908 } catch (Exception e) {
909 String msg = NLS.bind(JobMessages.meta_pluginProblems, JobManager.PI_JOBS);
910 RuntimeLog.log(new Status(IStatus.ERROR, JobManager.PI_JOBS, JobManager.PLUGIN_ERROR, msg, e));
911 }
912 }
913 }
914
915 if (monitor is null)
916 return new NullProgressMonitor();
917 return monitor;
918 }
919
920 /* (non-Javadoc)
921 * @see IJobManager#newLock(java.lang.String)
922 */
923 public ILock newLock() {
924 return lockManager.newLock();
925 }
926
927 /**
928 * Removes and returns the first waiting job in the queue. Returns null if there
929 * are no items waiting in the queue. If an item is removed from the queue,
930 * it is moved to the running jobs list.
931 */
932 private Job nextJob() {
933 synchronized (lock) {
934 //do nothing if the job manager is suspended
935 if (suspended)
936 return null;
937 //tickle the sleep queue to see if anyone wakes up
938 long now = System.currentTimeMillis();
939 InternalJob job = sleeping.peek();
940 while (job !is null && job.getStartTime() < now) {
941 job.setStartTime(now + delayFor(job.getPriority()));
942 job.setWaitQueueStamp(waitQueueCounter++);
943 changeState(job, Job.WAITING);
944 job = sleeping.peek();
945 }
946 //process the wait queue until we find a job whose rules are satisfied.
947 while ((job = waiting.peek()) !is null) {
948 InternalJob blocker = findBlockingJob(job);
949 if (blocker is null)
950 break;
951 //queue this job after the job that's blocking it
952 changeState(job, InternalJob.BLOCKED);
953 //assert job does not already belong to some other data structure
954 Assert.isTrue(job.next() is null);
955 Assert.isTrue(job.previous() is null);
956 blocker.addLast(job);
957 }
958 //the job to run must be in the running list before we exit
959 //the sync block, otherwise two jobs with conflicting rules could start at once
960 if (job !is null) {
961 changeState(job, InternalJob.ABOUT_TO_RUN);
962 if (JobManager.DEBUG)
963 JobManager.debug_(Format("Starting job: {}", job)); //$NON-NLS-1$
964 }
965 return cast(Job) job;
966 }
967 }
968
969 /* (non-Javadoc)
970 * @see dwtx.core.runtime.jobs.IJobManager#removeJobListener(dwtx.core.runtime.jobs.IJobChangeListener)
971 */
972 public void removeJobChangeListener(IJobChangeListener listener) {
973 jobListeners.remove(listener);
974 }
975
976 /**
977 * Report to the progress monitor that this thread is blocked, supplying
978 * an information message, and if possible the job that is causing the blockage.
979 * Important: An invocation of this method MUST be followed eventually be
980 * an invocation of reportUnblocked.
981 * @param monitor The monitor to report blocking to
982 * @param blockingJob The job that is blocking this thread, or <code>null</code>
983 * @see #reportUnblocked
984 */
985 final void reportBlocked(IProgressMonitor monitor, InternalJob blockingJob) {
986 if (!(cast(IProgressMonitorWithBlocking)monitor ))
987 return;
988 IStatus reason;
989 if (blockingJob is null || cast(ThreadJob)blockingJob || blockingJob.isSystem_package()) {
990 reason = new Status(IStatus.INFO, JobManager.PI_JOBS, 1, JobMessages.jobs_blocked0, null);
991 } else {
992 String msg = NLS.bind(JobMessages.jobs_blocked1, blockingJob.getName_package());
993 reason = new JobStatus(IStatus.INFO, cast(Job) blockingJob, msg);
994 }
995 (cast(IProgressMonitorWithBlocking) monitor).setBlocked(reason);
996 }
997
998 /**
999 * Reports that this thread was blocked, but is no longer blocked and is able
1000 * to proceed.
1001 * @param monitor The monitor to report unblocking to.
1002 * @see #reportBlocked
1003 */
1004 final void reportUnblocked(IProgressMonitor monitor) {
1005 if (cast(IProgressMonitorWithBlocking)monitor )
1006 (cast(IProgressMonitorWithBlocking) monitor).clearBlocked();
1007 }
1008
1009 /*(non-Javadoc)
1010 * @see dwtx.core.runtime.jobs.IJobManager#resume()
1011 */
1012 public final void resume() {
1013 synchronized (lock) {
1014 suspended = false;
1015 //poke the job pool
1016 pool.jobQueued();
1017 }
1018 }
1019
1020 /** (non-Javadoc)
1021 * @deprecated this method should not be used
1022 * @see dwtx.core.runtime.jobs.IJobManager#resume(dwtx.core.runtime.jobs.ISchedulingRule)
1023 */
1024 public final void resume(ISchedulingRule rule) {
1025 implicitJobs.resume(rule);
1026 }
1027
1028 /**
1029 * Attempts to immediately start a given job. Returns true if the job was
1030 * successfully started, and false if it could not be started immediately
1031 * due to a currently running job with a conflicting rule. Listeners will never
1032 * be notified of jobs that are run in this way.
1033 */
1034 protected bool runNow(InternalJob job) {
1035 synchronized (lock) {
1036 //cannot start if there is a conflicting job
1037 if (findBlockingJob(job) !is null)
1038 return false;
1039 changeState(job, Job.RUNNING);
1040 job.setProgressMonitor(new NullProgressMonitor());
1041 job.run_package(null);
1042 }
1043 return true;
1044 }
1045 package bool runNow_package(InternalJob job) {
1046 return runNow(job);
1047 }
1048
1049 /* (non-Javadoc)
1050 * @see dwtx.core.runtime.jobs.Job#schedule(long)
1051 */
1052 protected void schedule(InternalJob job, long delay, bool reschedule) {
1053 if (!active)
1054 throw new IllegalStateException("Job manager has been shut down."); //$NON-NLS-1$
1055 Assert.isNotNull(job, "Job is null"); //$NON-NLS-1$
1056 Assert.isLegal(delay >= 0, "Scheduling delay is negative"); //$NON-NLS-1$
1057 synchronized (lock) {
1058 //if the job is already running, set it to be rescheduled when done
1059 if (job.getState_package() is Job.RUNNING) {
1060 job.setStartTime(delay);
1061 return;
1062 }
1063 //can't schedule a job that is waiting or sleeping
1064 if (job.internalGetState() !is Job.NONE)
1065 return;
1066 if (JobManager.DEBUG)
1067 JobManager.debug_(Format("Scheduling job: {}", job)); //$NON-NLS-1$
1068 //remember that we are about to schedule the job
1069 //to prevent multiple schedule attempts from succeeding (bug 68452)
1070 changeState(job, InternalJob.ABOUT_TO_SCHEDULE);
1071 }
1072 //notify listeners outside sync block
1073 jobListeners.scheduled(cast(Job) job, delay, reschedule);
1074 //schedule the job
1075 doSchedule(job, delay);
1076 //call the pool outside sync block to avoid deadlock
1077 pool.jobQueued();
1078 }
1079 package void schedule_package(InternalJob job, long delay, bool reschedule) {
1080 schedule(job, delay, reschedule);
1081 }
1082
1083 /**
1084 * Adds all family members in the list of jobs to the collection
1085 */
1086 private void select(List members, Object family, InternalJob firstJob, int stateMask) {
1087 if (firstJob is null)
1088 return;
1089 InternalJob job = firstJob;
1090 do {
1091 //note that job state cannot be NONE at this point
1092 if ((family is null || job.belongsTo_package(family)) && ((job.getState_package() & stateMask) !is 0))
1093 members.add(job);
1094 job = job.previous();
1095 } while (job !is null && job !is firstJob);
1096 }
1097
1098 /**
1099 * Returns a list of all jobs known to the job manager that belong to the given family.
1100 */
1101 private List select(Object family) {
1102 return select(family, Job.WAITING | Job.SLEEPING | Job.RUNNING);
1103 }
1104
1105 /**
1106 * Returns a list of all jobs known to the job manager that belong to the given
1107 * family and are in one of the provided states.
1108 */
1109 private List select(Object family, int stateMask) {
1110 List members = new ArrayList();
1111 synchronized (lock) {
1112 if ((stateMask & Job.RUNNING) !is 0) {
1113 for (Iterator it = running.iterator(); it.hasNext();) {
1114 select(members, family, cast(InternalJob) it.next(), stateMask);
1115 }
1116 }
1117 if ((stateMask & Job.WAITING) !is 0)
1118 select(members, family, waiting.peek(), stateMask);
1119 if ((stateMask & Job.SLEEPING) !is 0)
1120 select(members, family, sleeping.peek(), stateMask);
1121 }
1122 return members;
1123 }
1124
1125 /* (non-Javadoc)
1126 * @see IJobManager#setLockListener(LockListener)
1127 */
1128 public void setLockListener(LockListener listener) {
1129 lockManager.setLockListener(listener);
1130 }
1131
1132 /**
1133 * Changes a job priority.
1134 */
1135 protected void setPriority(InternalJob job, int newPriority) {
1136 synchronized (lock) {
1137 int oldPriority = job.getPriority();
1138 if (oldPriority is newPriority)
1139 return;
1140 job.internalSetPriority(newPriority);
1141 //if the job is waiting to run, re-shuffle the queue
1142 if (job.getState_package() is Job.WAITING) {
1143 long oldStart = job.getStartTime();
1144 job.setStartTime(oldStart + (delayFor(newPriority) - delayFor(oldPriority)));
1145 waiting.resort(job);
1146 }
1147 }
1148 }
1149 package void setPriority_package(InternalJob job, int newPriority) {
1150 setPriority(job, newPriority);
1151 }
1152
1153 /* (non-Javadoc)
1154 * @see IJobManager#setProgressProvider(IProgressProvider)
1155 */
1156 public void setProgressProvider(ProgressProvider provider) {
1157 progressProvider = provider;
1158 }
1159
1160 /* (non-Javadoc)
1161 * @see Job#setRule
1162 */
1163 public void setRule(InternalJob job, ISchedulingRule rule) {
1164 synchronized (lock) {
1165 //cannot change the rule of a job that is already running
1166 Assert.isLegal(job.getState_package() is Job.NONE);
1167 validateRule(rule);
1168 job.internalSetRule(rule);
1169 }
1170 }
1171
1172 /**
1173 * Puts a job to sleep. Returns true if the job was successfully put to sleep.
1174 */
1175 protected bool sleep(InternalJob job) {
1176 synchronized (lock) {
1177 switch (job.getState_package()) {
1178 case Job.RUNNING :
1179 //cannot be paused if it is already running (as opposed to ABOUT_TO_RUN)
1180 if (job.internalGetState() is Job.RUNNING)
1181 return false;
1182 //job hasn't started running yet (aboutToRun listener)
1183 break;
1184 case Job.SLEEPING :
1185 //update the job wake time
1186 job.setStartTime(InternalJob.T_INFINITE);
1187 //change state again to re-shuffle the sleep queue
1188 changeState(job, Job.SLEEPING);
1189 return true;
1190 case Job.NONE :
1191 return true;
1192 case Job.WAITING :
1193 //put the job to sleep
1194 break;
1195 }
1196 job.setStartTime(InternalJob.T_INFINITE);
1197 changeState(job, Job.SLEEPING);
1198 }
1199 jobListeners.sleeping(cast(Job) job);
1200 return true;
1201 }
1202 package bool sleep_package(InternalJob job) {
1203 return sleep(job);
1204 }
1205
1206 /* (non-Javadoc)
1207 * @see IJobManager#sleep(String)
1208 */
1209 public void sleep(Object family) {
1210 //don't synchronize because sleep calls listeners
1211 for (Iterator it = select(family).iterator(); it.hasNext();) {
1212 sleep(cast(InternalJob) it.next());
1213 }
1214 }
1215
1216 /**
1217 * Returns the estimated time in milliseconds before the next job is scheduled
1218 * to wake up. The result may be negative. Returns InternalJob.T_INFINITE if
1219 * there are no sleeping or waiting jobs.
1220 */
1221 protected long sleepHint() {
1222 synchronized (lock) {
1223 //wait forever if job manager is suspended
1224 if (suspended)
1225 return InternalJob.T_INFINITE;
1226 if (!waiting.isEmpty())
1227 return 0L;
1228 //return the anticipated time that the next sleeping job will wake
1229 InternalJob next = sleeping.peek();
1230 if (next is null)
1231 return InternalJob.T_INFINITE;
1232 return next.getStartTime() - System.currentTimeMillis();
1233 }
1234 }
1235 package long sleepHint_package() {
1236 return sleepHint();
1237 }
1238 /**
1239 * Returns the next job to be run, or null if no jobs are waiting to run.
1240 * The worker must call endJob when the job is finished running.
1241 */
1242 protected Job startJob() {
1243 Job job = null;
1244 while (true) {
1245 job = nextJob();
1246 if (job is null)
1247 return null;
1248 //must perform this outside sync block because it is third party code
1249 if (job.shouldRun()) {
1250 //check for listener veto
1251 jobListeners.aboutToRun(job);
1252 //listeners may have canceled or put the job to sleep
1253 synchronized (lock) {
1254 if (job.getState() is Job.RUNNING) {
1255 InternalJob internal = job;
1256 if (internal.isAboutToRunCanceled()) {
1257 internal.setAboutToRunCanceled(false);
1258 //fall through and end the job below
1259 } else {
1260 internal.setProgressMonitor(createMonitor(job));
1261 //change from ABOUT_TO_RUN to RUNNING
1262 internal.internalSetState(Job.RUNNING);
1263 break;
1264 }
1265 }
1266 }
1267 }
1268 if (job.getState() !is Job.SLEEPING) {
1269 //job has been vetoed or canceled, so mark it as done
1270 endJob(job, Status.CANCEL_STATUS, true);
1271 continue;
1272 }
1273 }
1274 jobListeners.running(job);
1275 return job;
1276
1277 }
1278 package Job startJob_package() {
1279 return startJob();
1280 }
1281
1282 /* non-Javadoc)
1283 * @see dwtx.core.runtime.jobs.IJobManager#suspend()
1284 */
1285 public final void suspend() {
1286 synchronized (lock) {
1287 suspended = true;
1288 }
1289 }
1290
1291 /** (non-Javadoc)
1292 * @deprecated this method should not be used
1293 * @see dwtx.core.runtime.jobs.IJobManager#suspend(dwtx.core.runtime.jobs.ISchedulingRule, dwtx.core.runtime.IProgressMonitor)
1294 */
1295 public final void suspend(ISchedulingRule rule, IProgressMonitor monitor) {
1296 Assert.isNotNull(cast(Object)rule);
1297 implicitJobs.suspend(rule, monitorFor(monitor));
1298 }
1299
1300 /* non-Javadoc)
1301 * @see dwtx.core.runtime.jobs.IJobManager#transferRule()
1302 */
1303 public void transferRule(ISchedulingRule rule, Thread destinationThread) {
1304 implicitJobs.transfer(rule, destinationThread);
1305 }
1306
1307 /**
1308 * Validates that the given scheduling rule obeys the constraints of
1309 * scheduling rules as described in the <code>ISchedulingRule</code>
1310 * javadoc specification.
1311 */
1312 private void validateRule(ISchedulingRule rule) {
1313 //null rule always valid
1314 if (rule is null)
1315 return;
1316 initNullRule();
1317 //contains method must be reflexive
1318 Assert.isLegal(rule.contains(rule));
1319 //contains method must return false when given an unknown rule
1320 Assert.isLegal(!rule.contains(nullRule));
1321 //isConflicting method must be reflexive
1322 Assert.isLegal(rule.isConflicting(rule));
1323 //isConflicting method must return false when given an unknown rule
1324 Assert.isLegal(!rule.isConflicting(nullRule));
1325 }
1326
1327 /* (non-Javadoc)
1328 * @see Job#wakeUp(long)
1329 */
1330 protected void wakeUp(InternalJob job, long delay) {
1331 Assert.isLegal(delay >= 0, "Scheduling delay is negative"); //$NON-NLS-1$
1332 synchronized (lock) {
1333 //cannot wake up if it is not sleeping
1334 if (job.getState_package() !is Job.SLEEPING)
1335 return;
1336 doSchedule(job, delay);
1337 }
1338 //call the pool outside sync block to avoid deadlock
1339 pool.jobQueued();
1340
1341 //only notify of wake up if immediate
1342 if (delay is 0)
1343 jobListeners.awake(cast(Job) job);
1344 }
1345 package void wakeUp_package(InternalJob job, long delay) {
1346 wakeUp(job, delay);
1347 }
1348
1349 /* (non-Javadoc)
1350 * @see IJobFamily#wakeUp(String)
1351 */
1352 public void wakeUp(Object family) {
1353 //don't synchronize because wakeUp calls listeners
1354 for (Iterator it = select(family).iterator(); it.hasNext();) {
1355 wakeUp(cast(InternalJob) it.next(), 0L);
1356 }
1357 }
1358 }