diff dwtx/core/internal/jobs/ImplicitJobs.d @ 122:9d0585bcb7aa

Add core.jobs package
author Frank Benoit <benoit@tionex.de>
date Tue, 12 Aug 2008 02:34:21 +0200
parents
children 862b05e0334a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dwtx/core/internal/jobs/ImplicitJobs.d	Tue Aug 12 02:34:21 2008 +0200
@@ -0,0 +1,279 @@
+/*******************************************************************************
+ * Copyright (c) 2003, 2006 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     IBM - Initial API and implementation
+ * Port to the D programming language:
+ *     Frank Benoit <benoit@tionex.de>
+ *******************************************************************************/
+module dwtx.core.internal.jobs.ImplicitJobs;
+
+import tango.text.convert.Format;
+import tango.core.Thread;
+import tango.io.Stdout;
+import dwt.dwthelper.utils;
+import dwtx.dwtxhelper.Collection;
+
+import dwtx.core.internal.runtime.RuntimeLog;
+import dwtx.core.runtime.Assert;
+import dwtx.core.runtime.IProgressMonitor;
+import dwtx.core.runtime.IStatus;
+import dwtx.core.runtime.Status;
+import dwtx.core.runtime.jobs.ISchedulingRule;
+import dwtx.core.runtime.jobs.Job;
+
+import dwtx.core.internal.jobs.JobManager;
+import dwtx.core.internal.jobs.ThreadJob;
+import dwtx.core.internal.jobs.InternalJob;
+
+/**
+ * Implicit jobs are jobs that are running by virtue of a JobManager.begin/end
+ * pair. They act like normal jobs, except they are tied to an arbitrary thread
+ * of the client's choosing, and they can be nested.
+ */
+class ImplicitJobs {
+    /**
+     * Cached unused instance that can be reused
+     */
+    private ThreadJob jobCache = null;
+    protected JobManager manager;
+
+    /**
+     * Set of suspended scheduling rules.
+     */
+    private const Set suspendedRules;
+
+    /**
+     * Maps (Thread->ThreadJob), threads to the currently running job for that
+     * thread.
+     */
+    private final Map threadJobs;
+
+    this(JobManager manager) {
+        this.manager = manager;
+        suspendedRules = new HashSet(20);
+        threadJobs = new HashMap(20);
+    }
+
+    /* (Non-javadoc)
+     * @see IJobManager#beginRule
+     */
+    void begin(ISchedulingRule rule, IProgressMonitor monitor, bool suspend) {
+        if (JobManager.DEBUG_BEGIN_END)
+            JobManager.debug_(Format("Begin rule: {}", rule)); //$NON-NLS-1$
+        final Thread getThis = Thread.getThis();
+        ThreadJob threadJob;
+        synchronized (this) {
+            threadJob = cast(ThreadJob) threadJobs.get(getThis);
+            if (threadJob !is null) {
+                //nested rule, just push on stack and return
+                threadJob.push(rule);
+                return;
+            }
+            //no need to schedule a thread job for a null rule
+            if (rule is null)
+                return;
+            //create a thread job for this thread, use the rule from the real job if it has one
+            Job realJob = manager.currentJob();
+            if (realJob !is null && realJob.getRule() !is null)
+                threadJob = newThreadJob(realJob.getRule());
+            else {
+                threadJob = newThreadJob(rule);
+                threadJob.acquireRule = true;
+            }
+            //don't acquire rule if it is a suspended rule
+            if (isSuspended(rule))
+                threadJob.acquireRule = false;
+            //indicate if it is a system job to ensure isBlocking works correctly
+            threadJob.setRealJob(realJob);
+            threadJob.setThread(getThis);
+        }
+        try {
+            threadJob.push(rule);
+            //join the thread job outside sync block
+            if (threadJob.acquireRule) {
+                //no need to re-acquire any locks because the thread did not wait to get this lock
+                if (manager.runNow_package(threadJob))
+                    manager.getLockManager().addLockThread(Thread.getThis(), rule);
+                else
+                    threadJob = threadJob.joinRun(monitor);
+            }
+        } finally {
+            //remember this thread job  - only do this
+            //after the rule is acquired because it is ok for this thread to acquire
+            //and release other rules while waiting.
+            synchronized (this) {
+                threadJobs.put(getThis, threadJob);
+                if (suspend)
+                    suspendedRules.add(cast(Object)rule);
+            }
+            if (threadJob.isBlocked) {
+                threadJob.isBlocked = false;
+                manager.reportUnblocked(monitor);
+            }
+        }
+    }
+
+    /* (Non-javadoc)
+     * @see IJobManager#endRule
+     */
+    synchronized void end(ISchedulingRule rule, bool resume) {
+        if (JobManager.DEBUG_BEGIN_END)
+            JobManager.debug_(Format("End rule: {}", rule)); //$NON-NLS-1$
+        ThreadJob threadJob = cast(ThreadJob) threadJobs.get(Thread.getThis());
+        if (threadJob is null)
+            Assert.isLegal(rule is null, Format("endRule without matching beginRule: {}", rule)); //$NON-NLS-1$
+        else if (threadJob.pop(rule)) {
+            endThreadJob(threadJob, resume);
+        }
+    }
+
+    /**
+     * Called when a worker thread has finished running a job. At this
+     * point, the worker thread must not own any scheduling rules
+     * @param lastJob The last job to run in this thread
+     */
+    void endJob(InternalJob lastJob) {
+        final Thread getThis = Thread.getThis();
+        IStatus error;
+        synchronized (this) {
+            ThreadJob threadJob = cast(ThreadJob) threadJobs.get(getThis);
+            if (threadJob is null) {
+                if (lastJob.getRule() !is null)
+                    notifyWaitingThreadJobs();
+                return;
+            }
+            String msg = Format("Worker thread ended job: {}, but still holds rule: {}", lastJob, threadJob ); //$NON-NLS-1$ //$NON-NLS-2$
+            error = new Status(IStatus.ERROR, JobManager.PI_JOBS, 1, msg, null);
+            //end the thread job
+            endThreadJob(threadJob, false);
+        }
+        try {
+            RuntimeLog.log(error);
+        } catch (RuntimeException e) {
+            //failed to log, so print to console instead
+            Stderr.formatln("{}", error.getMessage());
+        }
+    }
+
+    private void endThreadJob(ThreadJob threadJob, bool resume) {
+        Thread getThis = Thread.getThis();
+        //clean up when last rule scope exits
+        threadJobs.remove(getThis);
+        ISchedulingRule rule = threadJob.getRule();
+        if (resume && rule !is null)
+            suspendedRules.remove(cast(Object)rule);
+        //if this job had a rule, then we are essentially releasing a lock
+        //note it is safe to do this even if the acquire was aborted
+        if (threadJob.acquireRule) {
+            manager.getLockManager().removeLockThread(getThis, rule);
+            notifyWaitingThreadJobs();
+        }
+        //if the job was started, we need to notify job manager to end it
+        if (threadJob.isRunning())
+            manager.endJob_package(threadJob, Status.OK_STATUS, false);
+        recycle(threadJob);
+    }
+
+    /**
+     * Returns true if this rule has been suspended, and false otherwise.
+     */
+    private bool isSuspended(ISchedulingRule rule) {
+        if (suspendedRules.size() is 0)
+            return false;
+        for (Iterator it = suspendedRules.iterator(); it.hasNext();)
+            if ((cast(ISchedulingRule) it.next()).contains(rule))
+                return true;
+        return false;
+    }
+
+    /**
+     * Returns a new or reused ThreadJob instance.
+     */
+    private ThreadJob newThreadJob(ISchedulingRule rule) {
+        if (jobCache !is null) {
+            ThreadJob job = jobCache;
+            job.setRule(rule);
+            job.acquireRule = job.isRunning_ = false;
+            job.realJob = null;
+            jobCache = null;
+            return job;
+        }
+        return new ThreadJob(manager, rule);
+    }
+
+    /**
+     * A job has just finished that was holding a scheduling rule, and the
+     * scheduling rule is now free.  Wake any blocked thread jobs so they can
+     * compete for the newly freed lock
+     */
+    private void notifyWaitingThreadJobs() {
+        synchronized (ThreadJob.mutex) {
+            ThreadJob.condition.notifyAll();
+        }
+    }
+
+    /**
+     * Indicates that a thread job is no longer in use and can be reused.
+     */
+    private void recycle(ThreadJob job) {
+        if (jobCache is null && job.recycle())
+            jobCache = job;
+    }
+
+    /**
+     * Implements IJobManager#resume(ISchedulingRule)
+     * @param rule
+     */
+    void resume(ISchedulingRule rule) {
+        //resume happens as a consequence of freeing the last rule in the stack
+        end(rule, true);
+        if (JobManager.DEBUG_BEGIN_END)
+            JobManager.debug_(Format("Resume rule: {}", rule)); //$NON-NLS-1$
+    }
+
+    /**
+     * Implements IJobManager#suspend(ISchedulingRule, IProgressMonitor)
+     * @param rule
+     * @param monitor
+     */
+    void suspend(ISchedulingRule rule, IProgressMonitor monitor) {
+        if (JobManager.DEBUG_BEGIN_END)
+            JobManager.debug_(Format("Suspend rule: {}", rule)); //$NON-NLS-1$
+        //the suspend job will be remembered once the rule is acquired
+        begin(rule, monitor, true);
+    }
+
+    /**
+     * Implements IJobManager#transferRule(ISchedulingRule, Thread)
+     */
+    synchronized void transfer(ISchedulingRule rule, Thread destinationThread) {
+        //nothing to do for null
+        if (rule is null)
+            return;
+        Thread getThis = Thread.getThis();
+        //nothing to do if transferring to the same thread
+        if (getThis is destinationThread)
+            return;
+        //ensure destination thread doesn't already have a rule
+        ThreadJob job = cast(ThreadJob) threadJobs.get(destinationThread);
+        Assert.isLegal(job is null);
+        //ensure calling thread owns the job being transferred
+        job = cast(ThreadJob) threadJobs.get(getThis);
+        Assert.isNotNull(job);
+        Assert.isLegal(job.getRule() is rule);
+        //transfer the thread job without ending it
+        job.setThread(destinationThread);
+        threadJobs.remove(getThis);
+        threadJobs.put(destinationThread, job);
+        //transfer lock
+        if (job.acquireRule) {
+            manager.getLockManager().removeLockThread(getThis, rule);
+            manager.getLockManager().addLockThread(destinationThread, rule);
+        }
+    }
+}