diff dwtx/core/internal/jobs/JobManager.d @ 122:9d0585bcb7aa

Add core.jobs package
author Frank Benoit <benoit@tionex.de>
date Tue, 12 Aug 2008 02:34:21 +0200
parents
children 862b05e0334a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dwtx/core/internal/jobs/JobManager.d	Tue Aug 12 02:34:21 2008 +0200
@@ -0,0 +1,1358 @@
+/*******************************************************************************
+ * Copyright (c) 2003, 2007 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     IBM Corporation - initial API and implementation
+ * Port to the D programming language:
+ *     Frank Benoit <benoit@tionex.de>
+ *******************************************************************************/
+module dwtx.core.internal.jobs.JobManager;
+
+import dwt.dwthelper.utils;
+import dwtx.dwtxhelper.Collection;
+import tango.io.Stdout;
+import tango.text.convert.Format;
+import tango.time.WallClock;
+import tango.time.Time;
+import tango.core.Thread;
+import tango.text.convert.Format;
+
+//don't use ICU because this is used for debugging only (see bug 135785)
+// import java.text.DateFormat;
+// import java.text.FieldPosition;
+// import java.text.SimpleDateFormat;
+
+import dwtx.core.internal.runtime.RuntimeLog;
+import dwtx.core.runtime.Assert;
+import dwtx.core.runtime.IProgressMonitor;
+import dwtx.core.runtime.IProgressMonitorWithBlocking;
+import dwtx.core.runtime.IStatus;
+import dwtx.core.runtime.NullProgressMonitor;
+import dwtx.core.runtime.OperationCanceledException;
+import dwtx.core.runtime.Status;
+import dwtx.core.runtime.jobs.IJobChangeEvent;
+import dwtx.core.runtime.jobs.IJobChangeListener;
+import dwtx.core.runtime.jobs.IJobManager;
+import dwtx.core.runtime.jobs.ILock;
+import dwtx.core.runtime.jobs.ISchedulingRule;
+import dwtx.core.runtime.jobs.Job;
+import dwtx.core.runtime.jobs.JobChangeAdapter;
+import dwtx.core.runtime.jobs.LockListener;
+import dwtx.core.runtime.jobs.ProgressProvider;
+import dwtx.osgi.util.NLS;
+
+import dwtx.core.internal.jobs.ImplicitJobs;
+import dwtx.core.internal.jobs.WorkerPool;
+import dwtx.core.internal.jobs.JobListeners;
+import dwtx.core.internal.jobs.LockManager;
+import dwtx.core.internal.jobs.JobQueue;
+import dwtx.core.internal.jobs.InternalJob;
+import dwtx.core.internal.jobs.ThreadJob;
+import dwtx.core.internal.jobs.JobOSGiUtils;
+import dwtx.core.internal.jobs.Worker;
+import dwtx.core.internal.jobs.Semaphore;
+import dwtx.core.internal.jobs.JobChangeEvent;
+import dwtx.core.internal.jobs.JobMessages;
+import dwtx.core.internal.jobs.JobStatus;
+
+/**
+ * Implementation of API type IJobManager
+ *
+ * Implementation note: all the data structures of this class are protected
+ * by a single lock object held as a private field in this class.  The JobManager
+ * instance itself is not used because this class is publicly reachable, and third
+ * party clients may try to synchronize on it.
+ *
+ * The WorkerPool class uses its own monitor for synchronizing its data
+ * structures. To avoid deadlock between the two classes, the JobManager
+ * must NEVER call the worker pool while its own monitor is held.
+ */
+public class JobManager : IJobManager {
+
+    /**
+     * The unique identifier constant of this plug-in.
+     */
+    public static const String PI_JOBS = "dwtx.core.jobs"; //$NON-NLS-1$
+
+    /**
+     * Status code constant indicating an error occurred while running a plug-in.
+     * For backward compatibility with Platform.PLUGIN_ERROR left at (value = 2).
+     */
+    public static const int PLUGIN_ERROR = 2;
+
+    private static const String OPTION_DEADLOCK_ERROR = PI_JOBS ~ "/jobs/errorondeadlock"; //$NON-NLS-1$
+    private static const String OPTION_DEBUG_BEGIN_END = PI_JOBS ~ "/jobs/beginend"; //$NON-NLS-1$
+    private static const String OPTION_DEBUG_JOBS = PI_JOBS ~ "/jobs"; //$NON-NLS-1$
+    private static const String OPTION_DEBUG_JOBS_TIMING = PI_JOBS ~ "/jobs/timing"; //$NON-NLS-1$
+    private static const String OPTION_LOCKS = PI_JOBS ~ "/jobs/locks"; //$NON-NLS-1$
+    private static const String OPTION_SHUTDOWN = PI_JOBS ~ "/jobs/shutdown"; //$NON-NLS-1$
+
+    static bool DEBUG = false;
+    static bool DEBUG_BEGIN_END = false;
+    static bool DEBUG_DEADLOCK = false;
+    static bool DEBUG_LOCKS = false;
+    static bool DEBUG_TIMING = false;
+    static bool DEBUG_SHUTDOWN = false;
+//     private static DateFormat DEBUG_FORMAT;
+
+    /**
+     * The singleton job manager instance. It must be a singleton because
+     * all job instances maintain a reference (as an optimization) and have no way
+     * of updating it.
+     */
+    private static JobManager instance = null;
+    /**
+     * Scheduling rule used for validation of client-defined rules.
+     */
+    private static ISchedulingRule nullRule;
+    private static void initNullRule(){
+        if( nullRule !is null ) return;
+        nullRule = new class ISchedulingRule {
+            public bool contains(ISchedulingRule rule) {
+                return rule is this;
+            }
+
+            public bool isConflicting(ISchedulingRule rule) {
+                return rule is this;
+            }
+        };
+    }
+
+    /**
+     * True if this manager is active, and false otherwise.  A job manager
+     * starts out active, and becomes inactive if it has been shutdown
+     * and not restarted.
+     */
+    private /+volatile+/ bool active = true;
+
+    const ImplicitJobs implicitJobs;
+
+    private const JobListeners jobListeners;
+
+    /**
+     * The lock for synchronizing all activity in the job manager.  To avoid deadlock,
+     * this lock must never be held for extended periods, and must never be
+     * held while third party code is being called.
+     */
+    private const Object lock;
+
+    private const LockManager lockManager;
+
+    /**
+     * The pool of worker threads.
+     */
+    private WorkerPool pool;
+
+    private ProgressProvider progressProvider = null;
+    /**
+     * Jobs that are currently running. Should only be modified from changeState
+     */
+    private const HashSet running;
+
+    /**
+     * Jobs that are sleeping.  Some sleeping jobs are scheduled to wake
+     * up at a given start time, while others will sleep indefinitely until woken.
+     * Should only be modified from changeState
+     */
+    private const JobQueue sleeping;
+    /**
+     * True if this manager has been suspended, and false otherwise.  A job manager
+     * starts out not suspended, and becomes suspended when <code>suspend</code>
+     * is invoked. Once suspended, no jobs will start running until <code>resume</code>
+     * is called.
+     */
+    private bool suspended = false;
+
+    /**
+     * jobs that are waiting to be run. Should only be modified from changeState
+     */
+    private const JobQueue waiting;
+
+    /**
+     * Counter to record wait queue insertion order.
+     */
+    private long waitQueueCounter;
+
+    public static void debug_(String msg) {
+        StringBuffer msgBuf = new StringBuffer(msg.length + 40);
+        if (DEBUG_TIMING) {
+            //lazy initialize to avoid overhead when not debugging
+//             if (DEBUG_FORMAT is null)
+//                 DEBUG_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); //$NON-NLS-1$
+//             DEBUG_FORMAT.format(new Date(), msgBuf, new FieldPosition(0));
+            auto time = WallClock.now();
+            msgBuf.append(Format("{:d2}:{:d2}:{:d2}:{:d3}",
+                time.time.span.hours,
+                time.time.span.minutes,
+                time.time.span.seconds,
+                time.time.span.millis ));
+            msgBuf.append('-');
+        }
+        msgBuf.append('[');
+        msgBuf.append(Thread.getThis().toString());
+        msgBuf.append(']');
+        msgBuf.append(msg);
+        Stdout.formatln( "{}", msgBuf.toString());
+    }
+
+    /**
+     * Returns the job manager singleton. For internal use only.
+     */
+    static synchronized JobManager getInstance() {
+        if (instance is null)
+            new JobManager();
+        return instance;
+    }
+
+    /**
+     * For debugging purposes only
+     */
+    private static String printJobName(Job job) {
+        if (cast(ThreadJob)job ) {
+            Job realJob = (cast(ThreadJob) job).realJob;
+            if (realJob !is null)
+                return realJob.classinfo.name;
+            return Format("ThreadJob on rule: {}", job.getRule()); //$NON-NLS-1$
+        }
+        return job.classinfo.name;
+    }
+
+    /**
+     * For debugging purposes only
+     */
+    public static String printState(int state) {
+        switch (state) {
+            case Job.NONE :
+                return "NONE"; //$NON-NLS-1$
+            case Job.WAITING :
+                return "WAITING"; //$NON-NLS-1$
+            case Job.SLEEPING :
+                return "SLEEPING"; //$NON-NLS-1$
+            case Job.RUNNING :
+                return "RUNNING"; //$NON-NLS-1$
+            case InternalJob.BLOCKED :
+                return "BLOCKED"; //$NON-NLS-1$
+            case InternalJob.ABOUT_TO_RUN :
+                return "ABOUT_TO_RUN"; //$NON-NLS-1$
+            case InternalJob.ABOUT_TO_SCHEDULE :
+                return "ABOUT_TO_SCHEDULE";//$NON-NLS-1$
+        }
+        return "UNKNOWN"; //$NON-NLS-1$
+    }
+
+    /**
+     * Note that although this method is not API, clients have historically used
+     * it to force jobs shutdown in cases where OSGi shutdown does not occur.
+     * For this reason, this method should be considered near-API and should not
+     * be changed if at all possible.
+     */
+    public static void shutdown() {
+        if (instance !is null) {
+            instance.doShutdown();
+            instance = null;
+        }
+    }
+
+    private this() {
+        // DWT instance init
+        implicitJobs = new ImplicitJobs(this);
+        jobListeners = new JobListeners();
+        lock = new Object();
+        lockManager = new LockManager();
+
+        instance = this;
+
+        initDebugOptions();
+        synchronized (lock) {
+            waiting = new JobQueue(false);
+            sleeping = new JobQueue(true);
+            running = new HashSet(10);
+            pool = new WorkerPool(this);
+        }
+        pool.setDaemon(JobOSGiUtils.getDefault().useDaemonThreads());
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#addJobListener(dwtx.core.runtime.jobs.IJobChangeListener)
+     */
+    public void addJobChangeListener(IJobChangeListener listener) {
+        jobListeners.add(listener);
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#beginRule(dwtx.core.runtime.jobs.ISchedulingRule, dwtx.core.runtime.IProgressMonitor)
+     */
+    public void beginRule(ISchedulingRule rule, IProgressMonitor monitor) {
+        validateRule(rule);
+        implicitJobs.begin(rule, monitorFor(monitor), false);
+    }
+
+    /**
+     * Cancels a job
+     */
+    protected bool cancel(InternalJob job) {
+        IProgressMonitor monitor = null;
+        synchronized (lock) {
+            switch (job.getState_package()) {
+                case Job.NONE :
+                    return true;
+                case Job.RUNNING :
+                    //cannot cancel a job that has already started (as opposed to ABOUT_TO_RUN)
+                    if (job.internalGetState() is Job.RUNNING) {
+                        monitor = job.getProgressMonitor();
+                        break;
+                    }
+                    //signal that the job should be canceled before it gets a chance to run
+                    job.setAboutToRunCanceled(true);
+                    return true;
+                default :
+                    changeState(job, Job.NONE);
+            }
+        }
+        //call monitor outside sync block
+        if (monitor !is null) {
+            if (!monitor.isCanceled()) {
+                monitor.setCanceled(true);
+                job.canceling();
+            }
+            return false;
+        }
+        //only notify listeners if the job was waiting or sleeping
+        jobListeners.done(cast(Job) job, Status.CANCEL_STATUS, false);
+        return true;
+    }
+    package bool cancel_package(InternalJob job) {
+        return cancel(job);
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#cancel(java.lang.String)
+     */
+    public void cancel(Object family) {
+        //don't synchronize because cancel calls listeners
+        for (Iterator it = select(family).iterator(); it.hasNext();)
+            cancel(cast(InternalJob) it.next());
+    }
+
+    /**
+     * Atomically updates the state of a job, adding or removing from the
+     * necessary queues or sets.
+     */
+    private void changeState(InternalJob job, int newState) {
+        bool blockedJobs = false;
+        synchronized (lock) {
+            int oldState = job.internalGetState();
+            switch (oldState) {
+                case Job.NONE :
+                case InternalJob.ABOUT_TO_SCHEDULE :
+                    break;
+                case InternalJob.BLOCKED :
+                    //remove this job from the linked list of blocked jobs
+                    job.remove();
+                    break;
+                case Job.WAITING :
+                    try {
+                        waiting.remove(job);
+                    } catch (RuntimeException e) {
+                        Assert.isLegal(false, "Tried to remove a job that wasn't in the queue"); //$NON-NLS-1$
+                    }
+                    break;
+                case Job.SLEEPING :
+                    try {
+                        sleeping.remove(job);
+                    } catch (RuntimeException e) {
+                        Assert.isLegal(false, "Tried to remove a job that wasn't in the queue"); //$NON-NLS-1$
+                    }
+                    break;
+                case Job.RUNNING :
+                case InternalJob.ABOUT_TO_RUN :
+                    running.remove(job);
+                    //add any blocked jobs back to the wait queue
+                    InternalJob blocked = job.previous();
+                    job.remove();
+                    blockedJobs = blocked !is null;
+                    while (blocked !is null) {
+                        InternalJob previous = blocked.previous();
+                        changeState(blocked, Job.WAITING);
+                        blocked = previous;
+                    }
+                    break;
+                default :
+                    Assert.isLegal(false, Format("Invalid job state: {}, state: {}", job, oldState)); //$NON-NLS-1$ //$NON-NLS-2$
+            }
+            job.internalSetState(newState);
+            switch (newState) {
+                case Job.NONE :
+                    job.setStartTime(InternalJob.T_NONE);
+                    job.setWaitQueueStamp(InternalJob.T_NONE);
+                case InternalJob.BLOCKED :
+                    break;
+                case Job.WAITING :
+                    waiting.enqueue(job);
+                    break;
+                case Job.SLEEPING :
+                    try {
+                        sleeping.enqueue(job);
+                    } catch (RuntimeException e) {
+                        throw new RuntimeException(Format("Error changing from state: ", oldState)); //$NON-NLS-1$
+                    }
+                    break;
+                case Job.RUNNING :
+                case InternalJob.ABOUT_TO_RUN :
+                    job.setStartTime(InternalJob.T_NONE);
+                    job.setWaitQueueStamp(InternalJob.T_NONE);
+                    running.add(job);
+                    break;
+                case InternalJob.ABOUT_TO_SCHEDULE :
+                    break;
+                default :
+                    Assert.isLegal(false, Format("Invalid job state: {}, state: {}", job, newState)); //$NON-NLS-1$ //$NON-NLS-2$
+            }
+        }
+        //notify queue outside sync block
+        if (blockedJobs)
+            pool.jobQueued();
+    }
+
+    /**
+     * Returns a new progress monitor for this job, belonging to the given
+     * progress group.  Returns null if it is not a valid time to set the job's group.
+     */
+    protected IProgressMonitor createMonitor(InternalJob job, IProgressMonitor group, int ticks) {
+        synchronized (lock) {
+            //group must be set before the job is scheduled
+            //this includes the ABOUT_TO_SCHEDULE state, during which it is still
+            //valid to set the progress monitor
+            if (job.getState_package() !is Job.NONE)
+                return null;
+            IProgressMonitor monitor = null;
+            if (progressProvider !is null)
+                monitor = progressProvider.createMonitor(cast(Job) job, group, ticks);
+            if (monitor is null)
+                monitor = new NullProgressMonitor();
+            return monitor;
+        }
+    }
+    package IProgressMonitor createMonitor_package(InternalJob job, IProgressMonitor group, int ticks) {
+        return createMonitor(job, group, ticks);
+    }
+
+    /**
+     * Returns a new progress monitor for this job.  Never returns null.
+     */
+    private IProgressMonitor createMonitor(Job job) {
+        IProgressMonitor monitor = null;
+        if (progressProvider !is null)
+            monitor = progressProvider.createMonitor(job);
+        if (monitor is null)
+            monitor = new NullProgressMonitor();
+        return monitor;
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#createProgressGroup()
+     */
+    public IProgressMonitor createProgressGroup() {
+        if (progressProvider !is null)
+            return progressProvider.createProgressGroup();
+        return new NullProgressMonitor();
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#currentJob()
+     */
+    public Job currentJob() {
+        Thread current = Thread.getThis();
+        if (cast(Worker)current )
+            return (cast(Worker) current).currentJob();
+        synchronized (lock) {
+            for (Iterator it = running.iterator(); it.hasNext();) {
+                Job job = cast(Job) it.next();
+                if (job.getThread() is current)
+                    return job;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the delay in milliseconds that a job with a given priority can
+     * tolerate waiting.
+     */
+    private long delayFor(int priority) {
+        //these values may need to be tweaked based on machine speed
+        switch (priority) {
+            case Job.INTERACTIVE :
+                return 0L;
+            case Job.SHORT :
+                return 50L;
+            case Job.LONG :
+                return 100L;
+            case Job.BUILD :
+                return 500L;
+            case Job.DECORATE :
+                return 1000L;
+            default :
+                Assert.isTrue(false, Format("Job has invalid priority: {}", priority)); //$NON-NLS-1$
+                return 0;
+        }
+    }
+
+    /**
+     * Performs the scheduling of a job.  Does not perform any notifications.
+     */
+    private void doSchedule(InternalJob job, long delay) {
+        synchronized (lock) {
+            //if it's a decoration job with no rule, don't run it right now if the system is busy
+            if (job.getPriority() is Job.DECORATE && job.getRule() is null) {
+                long minDelay = running.size() * 100;
+                delay = Math.max(delay, minDelay);
+            }
+            if (delay > 0) {
+                job.setStartTime(System.currentTimeMillis() + delay);
+                changeState(job, Job.SLEEPING);
+            } else {
+                job.setStartTime(System.currentTimeMillis() + delayFor(job.getPriority()));
+                job.setWaitQueueStamp(waitQueueCounter++);
+                changeState(job, Job.WAITING);
+            }
+        }
+    }
+
+    /**
+     * Shuts down the job manager.  Currently running jobs will be told
+     * to stop, but worker threads may still continue processing.
+     * (note: This implemented IJobManager.shutdown which was removed
+     * due to problems caused by premature shutdown)
+     */
+    private void doShutdown() {
+        Job[] toCancel = null;
+        synchronized (lock) {
+            if (active) {
+                active = false;
+                //cancel all running jobs
+                toCancel = arraycast!(Job)( running.toArray());
+                //clean up
+                sleeping.clear();
+                waiting.clear();
+                running.clear();
+            }
+        }
+
+        // Give running jobs a chance to finish. Wait 0.1 seconds for up to 3 times.
+        if (toCancel !is null && toCancel.length > 0) {
+            for (int i = 0; i < toCancel.length; i++) {
+                cancel(cast(InternalJob)toCancel[i]); // cancel jobs outside sync block to avoid deadlock
+            }
+
+            for (int waitAttempts = 0; waitAttempts < 3; waitAttempts++) {
+                Thread.yield();
+                synchronized (lock) {
+                    if (running.isEmpty())
+                        break;
+                }
+                if (DEBUG_SHUTDOWN) {
+                    JobManager.debug_(Format("Shutdown - job wait cycle #{}", (waitAttempts + 1))); //$NON-NLS-1$
+                    Job[] stillRunning = null;
+                    synchronized (lock) {
+                        stillRunning = arraycast!(Job)( running.toArray());
+                    }
+                    if (stillRunning !is null) {
+                        for (int j = 0; j < stillRunning.length; j++) {
+                            JobManager.debug_(Format("\tJob: {}", printJobName(stillRunning[j]))); //$NON-NLS-1$
+                        }
+                    }
+                }
+                try {
+                    Thread.sleep(0.100);
+                } catch (InterruptedException e) {
+                    //ignore
+                }
+                Thread.yield();
+            }
+
+            synchronized (lock) { // retrieve list of the jobs that are still running
+                toCancel = arraycast!(Job)( running.toArray());
+            }
+        }
+
+        if (toCancel !is null) {
+            for (int i = 0; i < toCancel.length; i++) {
+                String jobName = printJobName(toCancel[i]);
+                //this doesn't need to be translated because it's just being logged
+                String msg = "Job found still running after platform shutdown.  Jobs should be canceled by the plugin that scheduled them during shutdown: " ~ jobName; //$NON-NLS-1$
+                RuntimeLog.log(new Status(IStatus.WARNING, JobManager.PI_JOBS, JobManager.PLUGIN_ERROR, msg, null));
+
+                // TODO the RuntimeLog.log in its current implementation won't produce a log
+                // during this stage of shutdown. For now add a standard error output.
+                // One the logging story is improved, the System.err output below can be removed:
+                Stderr.formatln("{}", msg);
+            }
+        }
+
+        pool.shutdown_package();
+    }
+
+    /**
+     * Indicates that a job was running, and has now finished.  Note that this method
+     * can be called under OutOfMemoryError conditions and thus must be paranoid
+     * about allocating objects.
+     */
+    protected void endJob(InternalJob job, IStatus result, bool notify) {
+        long rescheduleDelay = InternalJob.T_NONE;
+        synchronized (lock) {
+            //if the job is finishing asynchronously, there is nothing more to do for now
+            if (result is Job.ASYNC_FINISH)
+                return;
+            //if job is not known then it cannot be done
+            if (job.getState_package() is Job.NONE)
+                return;
+            if (JobManager.DEBUG && notify)
+                JobManager.debug_(Format("Ending job: {}", job)); //$NON-NLS-1$
+            job.setResult(result);
+            job.setProgressMonitor(null);
+            job.setThread_package(null);
+            rescheduleDelay = job.getStartTime();
+            changeState(job, Job.NONE);
+        }
+        //notify listeners outside sync block
+        final bool reschedule = active && rescheduleDelay > InternalJob.T_NONE && job.shouldSchedule_package();
+        if (notify)
+            jobListeners.done(cast(Job) job, result, reschedule);
+        //reschedule the job if requested and we are still active
+        if (reschedule)
+            schedule(job, rescheduleDelay, reschedule);
+    }
+    package void endJob_package(InternalJob job, IStatus result, bool notify) {
+        endJob(job, result, notify);
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#endRule(dwtx.core.runtime.jobs.ISchedulingRule)
+     */
+    public void endRule(ISchedulingRule rule) {
+        implicitJobs.end(rule, false);
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#find(java.lang.String)
+     */
+    public Job[] find(Object family) {
+        List members = select(family);
+        return arraycast!(Job)( members.toArray());
+    }
+
+    /**
+     * Returns a running or blocked job whose scheduling rule conflicts with the
+     * scheduling rule of the given waiting job.  Returns null if there are no
+     * conflicting jobs.  A job can only run if there are no running jobs and no blocked
+     * jobs whose scheduling rule conflicts with its rule.
+     */
+    protected InternalJob findBlockingJob(InternalJob waitingJob) {
+        if (waitingJob.getRule() is null)
+            return null;
+        synchronized (lock) {
+            if (running.isEmpty())
+                return null;
+            //check the running jobs
+            bool hasBlockedJobs = false;
+            for (Iterator it = running.iterator(); it.hasNext();) {
+                InternalJob job = cast(InternalJob) it.next();
+                if (waitingJob.isConflicting(job))
+                    return job;
+                if (!hasBlockedJobs)
+                    hasBlockedJobs = job.previous() !is null;
+            }
+            //there are no blocked jobs, so we are done
+            if (!hasBlockedJobs)
+                return null;
+            //check all jobs blocked by running jobs
+            for (Iterator it = running.iterator(); it.hasNext();) {
+                InternalJob job = cast(InternalJob) it.next();
+                while (true) {
+                    job = job.previous();
+                    if (job is null)
+                        break;
+                    if (waitingJob.isConflicting(job))
+                        return job;
+                }
+            }
+        }
+        return null;
+    }
+    package InternalJob findBlockingJob_package(InternalJob waitingJob) {
+        return findBlockingJob(waitingJob);
+    }
+
+    public LockManager getLockManager() {
+        return lockManager;
+    }
+
+    private void initDebugOptions() {
+        DEBUG = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_DEBUG_JOBS, false);
+        DEBUG_BEGIN_END = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_DEBUG_BEGIN_END, false);
+        DEBUG_DEADLOCK = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_DEADLOCK_ERROR, false);
+        DEBUG_LOCKS = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_LOCKS, false);
+        DEBUG_TIMING = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_DEBUG_JOBS_TIMING, false);
+        DEBUG_SHUTDOWN = JobOSGiUtils.getDefault().getBooleanDebugOption(OPTION_SHUTDOWN, false);
+    }
+
+    /**
+     * Returns whether the job manager is active (has not been shutdown).
+     */
+    protected bool isActive() {
+        return active;
+    }
+    package bool isActive_package() {
+        return isActive();
+    }
+
+    /**
+     * Returns true if the given job is blocking the execution of a non-system
+     * job.
+     */
+    protected bool isBlocking(InternalJob runningJob) {
+        synchronized (lock) {
+            // if this job isn't running, it can't be blocking anyone
+            if (runningJob.getState_package() !is Job.RUNNING)
+                return false;
+            // if any job is queued behind this one, it is blocked by it
+            InternalJob previous = runningJob.previous();
+            while (previous !is null) {
+                // ignore jobs of lower priority (higher priority value means lower priority)
+                if (previous.getPriority() < runningJob.getPriority()) {
+                    if (!previous.isSystem_package())
+                        return true;
+                    // implicit jobs should interrupt unless they act on behalf of system jobs
+                    if (cast(ThreadJob)previous  && (cast(ThreadJob) previous).shouldInterrupt())
+                        return true;
+                }
+                previous = previous.previous();
+            }
+            // none found
+            return false;
+        }
+    }
+    package bool isBlocking_package(InternalJob runningJob) {
+        return isBlocking(runningJob);
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#isIdle()
+     */
+    public bool isIdle() {
+        synchronized (lock) {
+            return running.isEmpty() && waiting.isEmpty();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#isSuspended()
+     */
+    public bool isSuspended() {
+        synchronized (lock) {
+            return suspended;
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.Job#job(dwtx.core.runtime.jobs.Job)
+     */
+    protected void join(InternalJob job) {
+        IJobChangeListener listener;
+        Semaphore barrier;
+        synchronized (lock) {
+            int state = job.getState_package();
+            if (state is Job.NONE)
+                return;
+            //don't join a waiting or sleeping job when suspended (deadlock risk)
+            if (suspended && state !is Job.RUNNING)
+                return;
+            //it's an error for a job to join itself
+            if (state is Job.RUNNING && job.getThread_package() is Thread.getThis())
+                throw new IllegalStateException("Job attempted to join itself"); //$NON-NLS-1$
+            //the semaphore will be released when the job is done
+            barrier = new Semaphore(null);
+            listener = new class(barrier) JobChangeAdapter {
+                Semaphore barrier_;
+                this( Semaphore a ){
+                    barrier_ = a;
+                }
+                public void done(IJobChangeEvent event) {
+                    barrier_.release();
+                }
+            };
+            job.addJobChangeListener_package(listener);
+            //compute set of all jobs that must run before this one
+            //add a listener that removes jobs from the blocking set when they finish
+        }
+        //wait until listener notifies this thread.
+        try {
+            while (true) {
+                //notify hook to service pending syncExecs before falling asleep
+                lockManager.aboutToWait(job.getThread_package());
+                try {
+                    if (barrier.acquire(Long.MAX_VALUE))
+                        break;
+                } catch (InterruptedException e) {
+                    //loop and keep trying
+                }
+            }
+        } finally {
+            lockManager.aboutToRelease();
+            job.removeJobChangeListener_package(listener);
+        }
+    }
+    package void join_package(InternalJob job) {
+        join(job);
+    }
+
+    /* (non-Javadoc)
+     * @see IJobManager#join(String, IProgressMonitor)
+     */
+    public void join(Object family_, IProgressMonitor monitor) {
+        monitor = monitorFor(monitor);
+        IJobChangeListener listener = null;
+        Set jobs_;
+        int jobCount;
+        Job blocking = null;
+        synchronized (lock) {
+            //don't join a waiting or sleeping job when suspended (deadlock risk)
+            int states = suspended ? Job.RUNNING : Job.RUNNING | Job.WAITING | Job.SLEEPING;
+            jobs_ = Collections.synchronizedSet(new HashSet(select(family_, states)));
+            jobCount = jobs_.size();
+            if (jobCount > 0) {
+                //if there is only one blocking job, use it in the blockage callback below
+                if (jobCount is 1)
+                    blocking = cast(Job) jobs_.iterator().next();
+                listener = new class(family_, jobs_ )JobChangeAdapter {
+                    Object family;
+                    Set jobs;
+                    this(Object a, Set b){
+                        family = a;
+                        jobs = b;
+                    }
+                    public void done(IJobChangeEvent event) {
+                        //don't remove from list if job is being rescheduled
+                        if (!(cast(JobChangeEvent) event).reschedule)
+                            jobs.remove(event.getJob());
+                    }
+
+                    //update the list of jobs if new ones are added during the join
+                    public void scheduled(IJobChangeEvent event) {
+                        //don't add to list if job is being rescheduled
+                        if ((cast(JobChangeEvent) event).reschedule)
+                            return;
+                        Job job = event.getJob();
+                        if (job.belongsTo(family))
+                            jobs.add(job);
+                    }
+                };
+                addJobChangeListener(listener);
+            }
+        }
+        if (jobCount is 0) {
+            //use up the monitor outside synchronized block because monitors call untrusted code
+            monitor.beginTask(JobMessages.jobs_blocked0, 1);
+            monitor.done();
+            return;
+        }
+        //spin until all jobs are completed
+        try {
+            monitor.beginTask(JobMessages.jobs_blocked0, jobCount);
+            monitor.subTask(NLS.bind(JobMessages.jobs_waitFamSub, Integer.toString(jobCount)));
+            reportBlocked(monitor, blocking);
+            int jobsLeft;
+            int reportedWorkDone = 0;
+            while ((jobsLeft = jobs_.size()) > 0) {
+                //don't let there be negative work done if new jobs have
+                //been added since the join began
+                int actualWorkDone = Math.max(0, jobCount - jobsLeft);
+                if (reportedWorkDone < actualWorkDone) {
+                    monitor.worked(actualWorkDone - reportedWorkDone);
+                    reportedWorkDone = actualWorkDone;
+                    monitor.subTask(NLS.bind(JobMessages.jobs_waitFamSub, Integer.toString(jobsLeft)));
+                }
+                implMissing(__FILE__, __LINE__ );
+// DWT
+//                 if (Thread.interrupted())
+//                     throw new InterruptedException();
+                if (monitor.isCanceled())
+                    throw new OperationCanceledException();
+                //notify hook to service pending syncExecs before falling asleep
+                lockManager.aboutToWait(null);
+                Thread.sleep(0.100);
+            }
+        } finally {
+            lockManager.aboutToRelease();
+            removeJobChangeListener(listener);
+            reportUnblocked(monitor);
+            monitor.done();
+        }
+    }
+
+    /**
+     * Returns a non-null progress monitor instance.  If the monitor is null,
+     * returns the default monitor supplied by the progress provider, or a
+     * NullProgressMonitor if no default monitor is available.
+     */
+    private IProgressMonitor monitorFor(IProgressMonitor monitor) {
+        if (monitor is null || (cast(NullProgressMonitor)monitor )) {
+            if (progressProvider !is null) {
+                try {
+                    monitor = progressProvider.getDefaultMonitor();
+                } catch (Exception e) {
+                    String msg = NLS.bind(JobMessages.meta_pluginProblems, JobManager.PI_JOBS);
+                    RuntimeLog.log(new Status(IStatus.ERROR, JobManager.PI_JOBS, JobManager.PLUGIN_ERROR, msg, e));
+                }
+            }
+        }
+
+        if (monitor is null)
+            return new NullProgressMonitor();
+        return monitor;
+    }
+
+    /* (non-Javadoc)
+     * @see IJobManager#newLock(java.lang.String)
+     */
+    public ILock newLock() {
+        return lockManager.newLock();
+    }
+
+    /**
+     * Removes and returns the first waiting job in the queue. Returns null if there
+     * are no items waiting in the queue.  If an item is removed from the queue,
+     * it is moved to the running jobs list.
+     */
+    private Job nextJob() {
+        synchronized (lock) {
+            //do nothing if the job manager is suspended
+            if (suspended)
+                return null;
+            //tickle the sleep queue to see if anyone wakes up
+            long now = System.currentTimeMillis();
+            InternalJob job = sleeping.peek();
+            while (job !is null && job.getStartTime() < now) {
+                job.setStartTime(now + delayFor(job.getPriority()));
+                job.setWaitQueueStamp(waitQueueCounter++);
+                changeState(job, Job.WAITING);
+                job = sleeping.peek();
+            }
+            //process the wait queue until we find a job whose rules are satisfied.
+            while ((job = waiting.peek()) !is null) {
+                InternalJob blocker = findBlockingJob(job);
+                if (blocker is null)
+                    break;
+                //queue this job after the job that's blocking it
+                changeState(job, InternalJob.BLOCKED);
+                //assert job does not already belong to some other data structure
+                Assert.isTrue(job.next() is null);
+                Assert.isTrue(job.previous() is null);
+                blocker.addLast(job);
+            }
+            //the job to run must be in the running list before we exit
+            //the sync block, otherwise two jobs with conflicting rules could start at once
+            if (job !is null) {
+                changeState(job, InternalJob.ABOUT_TO_RUN);
+                if (JobManager.DEBUG)
+                    JobManager.debug_(Format("Starting job: {}", job)); //$NON-NLS-1$
+            }
+            return cast(Job) job;
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#removeJobListener(dwtx.core.runtime.jobs.IJobChangeListener)
+     */
+    public void removeJobChangeListener(IJobChangeListener listener) {
+        jobListeners.remove(listener);
+    }
+
+    /**
+     * Report to the progress monitor that this thread is blocked, supplying
+     * an information message, and if possible the job that is causing the blockage.
+     * Important: An invocation of this method MUST be followed eventually be
+     * an invocation of reportUnblocked.
+     * @param monitor The monitor to report blocking to
+     * @param blockingJob The job that is blocking this thread, or <code>null</code>
+     * @see #reportUnblocked
+     */
+    final void reportBlocked(IProgressMonitor monitor, InternalJob blockingJob) {
+        if (!(cast(IProgressMonitorWithBlocking)monitor ))
+            return;
+        IStatus reason;
+        if (blockingJob is null || cast(ThreadJob)blockingJob || blockingJob.isSystem_package()) {
+            reason = new Status(IStatus.INFO, JobManager.PI_JOBS, 1, JobMessages.jobs_blocked0, null);
+        } else {
+            String msg = NLS.bind(JobMessages.jobs_blocked1, blockingJob.getName_package());
+            reason = new JobStatus(IStatus.INFO, cast(Job) blockingJob, msg);
+        }
+        (cast(IProgressMonitorWithBlocking) monitor).setBlocked(reason);
+    }
+
+    /**
+     * Reports that this thread was blocked, but is no longer blocked and is able
+     * to proceed.
+     * @param monitor The monitor to report unblocking to.
+     * @see #reportBlocked
+     */
+    final void reportUnblocked(IProgressMonitor monitor) {
+        if (cast(IProgressMonitorWithBlocking)monitor )
+            (cast(IProgressMonitorWithBlocking) monitor).clearBlocked();
+    }
+
+    /*(non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#resume()
+     */
+    public final void resume() {
+        synchronized (lock) {
+            suspended = false;
+            //poke the job pool
+            pool.jobQueued();
+        }
+    }
+
+    /** (non-Javadoc)
+     * @deprecated this method should not be used
+     * @see dwtx.core.runtime.jobs.IJobManager#resume(dwtx.core.runtime.jobs.ISchedulingRule)
+     */
+    public final void resume(ISchedulingRule rule) {
+        implicitJobs.resume(rule);
+    }
+
+    /**
+     * Attempts to immediately start a given job.  Returns true if the job was
+     * successfully started, and false if it could not be started immediately
+     * due to a currently running job with a conflicting rule.  Listeners will never
+     * be notified of jobs that are run in this way.
+     */
+    protected bool runNow(InternalJob job) {
+        synchronized (lock) {
+            //cannot start if there is a conflicting job
+            if (findBlockingJob(job) !is null)
+                return false;
+            changeState(job, Job.RUNNING);
+            job.setProgressMonitor(new NullProgressMonitor());
+            job.run_package(null);
+        }
+        return true;
+    }
+    package bool runNow_package(InternalJob job) {
+        return runNow(job);
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.Job#schedule(long)
+     */
+    protected void schedule(InternalJob job, long delay, bool reschedule) {
+        if (!active)
+            throw new IllegalStateException("Job manager has been shut down."); //$NON-NLS-1$
+        Assert.isNotNull(job, "Job is null"); //$NON-NLS-1$
+        Assert.isLegal(delay >= 0, "Scheduling delay is negative"); //$NON-NLS-1$
+        synchronized (lock) {
+            //if the job is already running, set it to be rescheduled when done
+            if (job.getState_package() is Job.RUNNING) {
+                job.setStartTime(delay);
+                return;
+            }
+            //can't schedule a job that is waiting or sleeping
+            if (job.internalGetState() !is Job.NONE)
+                return;
+            if (JobManager.DEBUG)
+                JobManager.debug_(Format("Scheduling job: {}", job)); //$NON-NLS-1$
+            //remember that we are about to schedule the job
+            //to prevent multiple schedule attempts from succeeding (bug 68452)
+            changeState(job, InternalJob.ABOUT_TO_SCHEDULE);
+        }
+        //notify listeners outside sync block
+        jobListeners.scheduled(cast(Job) job, delay, reschedule);
+        //schedule the job
+        doSchedule(job, delay);
+        //call the pool outside sync block to avoid deadlock
+        pool.jobQueued();
+    }
+    package void schedule_package(InternalJob job, long delay, bool reschedule) {
+        schedule(job, delay, reschedule);
+    }
+
+    /**
+     * Adds all family members in the list of jobs to the collection
+     */
+    private void select(List members, Object family, InternalJob firstJob, int stateMask) {
+        if (firstJob is null)
+            return;
+        InternalJob job = firstJob;
+        do {
+            //note that job state cannot be NONE at this point
+            if ((family is null || job.belongsTo_package(family)) && ((job.getState_package() & stateMask) !is 0))
+                members.add(job);
+            job = job.previous();
+        } while (job !is null && job !is firstJob);
+    }
+
+    /**
+     * Returns a list of all jobs known to the job manager that belong to the given family.
+     */
+    private List select(Object family) {
+        return select(family, Job.WAITING | Job.SLEEPING | Job.RUNNING);
+    }
+
+    /**
+     * Returns a list of all jobs known to the job manager that belong to the given
+     * family and are in one of the provided states.
+     */
+    private List select(Object family, int stateMask) {
+        List members = new ArrayList();
+        synchronized (lock) {
+            if ((stateMask & Job.RUNNING) !is 0) {
+                for (Iterator it = running.iterator(); it.hasNext();) {
+                    select(members, family, cast(InternalJob) it.next(), stateMask);
+                }
+            }
+            if ((stateMask & Job.WAITING) !is 0)
+                select(members, family, waiting.peek(), stateMask);
+            if ((stateMask & Job.SLEEPING) !is 0)
+                select(members, family, sleeping.peek(), stateMask);
+        }
+        return members;
+    }
+
+    /* (non-Javadoc)
+     * @see IJobManager#setLockListener(LockListener)
+     */
+    public void setLockListener(LockListener listener) {
+        lockManager.setLockListener(listener);
+    }
+
+    /**
+     * Changes a job priority.
+     */
+    protected void setPriority(InternalJob job, int newPriority) {
+        synchronized (lock) {
+            int oldPriority = job.getPriority();
+            if (oldPriority is newPriority)
+                return;
+            job.internalSetPriority(newPriority);
+            //if the job is waiting to run, re-shuffle the queue
+            if (job.getState_package() is Job.WAITING) {
+                long oldStart = job.getStartTime();
+                job.setStartTime(oldStart + (delayFor(newPriority) - delayFor(oldPriority)));
+                waiting.resort(job);
+            }
+        }
+    }
+    package void setPriority_package(InternalJob job, int newPriority) {
+        setPriority(job, newPriority);
+    }
+
+    /* (non-Javadoc)
+     * @see IJobManager#setProgressProvider(IProgressProvider)
+     */
+    public void setProgressProvider(ProgressProvider provider) {
+        progressProvider = provider;
+    }
+
+    /* (non-Javadoc)
+     * @see Job#setRule
+     */
+    public void setRule(InternalJob job, ISchedulingRule rule) {
+        synchronized (lock) {
+            //cannot change the rule of a job that is already running
+            Assert.isLegal(job.getState_package() is Job.NONE);
+            validateRule(rule);
+            job.internalSetRule(rule);
+        }
+    }
+
+    /**
+     * Puts a job to sleep. Returns true if the job was successfully put to sleep.
+     */
+    protected bool sleep(InternalJob job) {
+        synchronized (lock) {
+            switch (job.getState_package()) {
+                case Job.RUNNING :
+                    //cannot be paused if it is already running (as opposed to ABOUT_TO_RUN)
+                    if (job.internalGetState() is Job.RUNNING)
+                        return false;
+                    //job hasn't started running yet (aboutToRun listener)
+                    break;
+                case Job.SLEEPING :
+                    //update the job wake time
+                    job.setStartTime(InternalJob.T_INFINITE);
+                    //change state again to re-shuffle the sleep queue
+                    changeState(job, Job.SLEEPING);
+                    return true;
+                case Job.NONE :
+                    return true;
+                case Job.WAITING :
+                    //put the job to sleep
+                    break;
+            }
+            job.setStartTime(InternalJob.T_INFINITE);
+            changeState(job, Job.SLEEPING);
+        }
+        jobListeners.sleeping(cast(Job) job);
+        return true;
+    }
+    package bool sleep_package(InternalJob job) {
+        return sleep(job);
+    }
+
+    /* (non-Javadoc)
+     * @see IJobManager#sleep(String)
+     */
+    public void sleep(Object family) {
+        //don't synchronize because sleep calls listeners
+        for (Iterator it = select(family).iterator(); it.hasNext();) {
+            sleep(cast(InternalJob) it.next());
+        }
+    }
+
+    /**
+     * Returns the estimated time in milliseconds before the next job is scheduled
+     * to wake up. The result may be negative.  Returns InternalJob.T_INFINITE if
+     * there are no sleeping or waiting jobs.
+     */
+    protected long sleepHint() {
+        synchronized (lock) {
+            //wait forever if job manager is suspended
+            if (suspended)
+                return InternalJob.T_INFINITE;
+            if (!waiting.isEmpty())
+                return 0L;
+            //return the anticipated time that the next sleeping job will wake
+            InternalJob next = sleeping.peek();
+            if (next is null)
+                return InternalJob.T_INFINITE;
+            return next.getStartTime() - System.currentTimeMillis();
+        }
+    }
+    package long sleepHint_package() {
+        return sleepHint();
+    }
+    /**
+     * Returns the next job to be run, or null if no jobs are waiting to run.
+     * The worker must call endJob when the job is finished running.
+     */
+    protected Job startJob() {
+        Job job = null;
+        while (true) {
+            job = nextJob();
+            if (job is null)
+                return null;
+            //must perform this outside sync block because it is third party code
+            if (job.shouldRun()) {
+                //check for listener veto
+                jobListeners.aboutToRun(job);
+                //listeners may have canceled or put the job to sleep
+                synchronized (lock) {
+                    if (job.getState() is Job.RUNNING) {
+                        InternalJob internal = job;
+                        if (internal.isAboutToRunCanceled()) {
+                            internal.setAboutToRunCanceled(false);
+                            //fall through and end the job below
+                        } else {
+                            internal.setProgressMonitor(createMonitor(job));
+                            //change from ABOUT_TO_RUN to RUNNING
+                            internal.internalSetState(Job.RUNNING);
+                            break;
+                        }
+                    }
+                }
+            }
+            if (job.getState() !is Job.SLEEPING) {
+                //job has been vetoed or canceled, so mark it as done
+                endJob(job, Status.CANCEL_STATUS, true);
+                continue;
+            }
+        }
+        jobListeners.running(job);
+        return job;
+
+    }
+    package Job startJob_package() {
+        return startJob();
+    }
+
+    /* non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#suspend()
+     */
+    public final void suspend() {
+        synchronized (lock) {
+            suspended = true;
+        }
+    }
+
+    /** (non-Javadoc)
+     * @deprecated this method should not be used
+     * @see dwtx.core.runtime.jobs.IJobManager#suspend(dwtx.core.runtime.jobs.ISchedulingRule, dwtx.core.runtime.IProgressMonitor)
+     */
+    public final void suspend(ISchedulingRule rule, IProgressMonitor monitor) {
+        Assert.isNotNull(cast(Object)rule);
+        implicitJobs.suspend(rule, monitorFor(monitor));
+    }
+
+    /* non-Javadoc)
+     * @see dwtx.core.runtime.jobs.IJobManager#transferRule()
+     */
+    public void transferRule(ISchedulingRule rule, Thread destinationThread) {
+        implicitJobs.transfer(rule, destinationThread);
+    }
+
+    /**
+     * Validates that the given scheduling rule obeys the constraints of
+     * scheduling rules as described in the <code>ISchedulingRule</code>
+     * javadoc specification.
+     */
+    private void validateRule(ISchedulingRule rule) {
+        //null rule always valid
+        if (rule is null)
+            return;
+        initNullRule();
+        //contains method must be reflexive
+        Assert.isLegal(rule.contains(rule));
+        //contains method must return false when given an unknown rule
+        Assert.isLegal(!rule.contains(nullRule));
+        //isConflicting method must be reflexive
+        Assert.isLegal(rule.isConflicting(rule));
+        //isConflicting method must return false when given an unknown rule
+        Assert.isLegal(!rule.isConflicting(nullRule));
+    }
+
+    /* (non-Javadoc)
+     * @see Job#wakeUp(long)
+     */
+    protected void wakeUp(InternalJob job, long delay) {
+        Assert.isLegal(delay >= 0, "Scheduling delay is negative"); //$NON-NLS-1$
+        synchronized (lock) {
+            //cannot wake up if it is not sleeping
+            if (job.getState_package() !is Job.SLEEPING)
+                return;
+            doSchedule(job, delay);
+        }
+        //call the pool outside sync block to avoid deadlock
+        pool.jobQueued();
+
+        //only notify of wake up if immediate
+        if (delay is 0)
+            jobListeners.awake(cast(Job) job);
+    }
+    package void wakeUp_package(InternalJob job, long delay) {
+        wakeUp(job, delay);
+    }
+
+    /* (non-Javadoc)
+     * @see IJobFamily#wakeUp(String)
+     */
+    public void wakeUp(Object family) {
+        //don't synchronize because wakeUp calls listeners
+        for (Iterator it = select(family).iterator(); it.hasNext();) {
+            wakeUp(cast(InternalJob) it.next(), 0L);
+        }
+    }
+}