Mercurial > projects > dwt-addons
diff dwtx/core/internal/jobs/ImplicitJobs.d @ 122:9d0585bcb7aa
Add core.jobs package
author | Frank Benoit <benoit@tionex.de> |
---|---|
date | Tue, 12 Aug 2008 02:34:21 +0200 |
parents | |
children | 862b05e0334a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dwtx/core/internal/jobs/ImplicitJobs.d Tue Aug 12 02:34:21 2008 +0200 @@ -0,0 +1,279 @@ +/******************************************************************************* + * Copyright (c) 2003, 2006 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * IBM - Initial API and implementation + * Port to the D programming language: + * Frank Benoit <benoit@tionex.de> + *******************************************************************************/ +module dwtx.core.internal.jobs.ImplicitJobs; + +import tango.text.convert.Format; +import tango.core.Thread; +import tango.io.Stdout; +import dwt.dwthelper.utils; +import dwtx.dwtxhelper.Collection; + +import dwtx.core.internal.runtime.RuntimeLog; +import dwtx.core.runtime.Assert; +import dwtx.core.runtime.IProgressMonitor; +import dwtx.core.runtime.IStatus; +import dwtx.core.runtime.Status; +import dwtx.core.runtime.jobs.ISchedulingRule; +import dwtx.core.runtime.jobs.Job; + +import dwtx.core.internal.jobs.JobManager; +import dwtx.core.internal.jobs.ThreadJob; +import dwtx.core.internal.jobs.InternalJob; + +/** + * Implicit jobs are jobs that are running by virtue of a JobManager.begin/end + * pair. They act like normal jobs, except they are tied to an arbitrary thread + * of the client's choosing, and they can be nested. + */ +class ImplicitJobs { + /** + * Cached unused instance that can be reused + */ + private ThreadJob jobCache = null; + protected JobManager manager; + + /** + * Set of suspended scheduling rules. + */ + private const Set suspendedRules; + + /** + * Maps (Thread->ThreadJob), threads to the currently running job for that + * thread. + */ + private final Map threadJobs; + + this(JobManager manager) { + this.manager = manager; + suspendedRules = new HashSet(20); + threadJobs = new HashMap(20); + } + + /* (Non-javadoc) + * @see IJobManager#beginRule + */ + void begin(ISchedulingRule rule, IProgressMonitor monitor, bool suspend) { + if (JobManager.DEBUG_BEGIN_END) + JobManager.debug_(Format("Begin rule: {}", rule)); //$NON-NLS-1$ + final Thread getThis = Thread.getThis(); + ThreadJob threadJob; + synchronized (this) { + threadJob = cast(ThreadJob) threadJobs.get(getThis); + if (threadJob !is null) { + //nested rule, just push on stack and return + threadJob.push(rule); + return; + } + //no need to schedule a thread job for a null rule + if (rule is null) + return; + //create a thread job for this thread, use the rule from the real job if it has one + Job realJob = manager.currentJob(); + if (realJob !is null && realJob.getRule() !is null) + threadJob = newThreadJob(realJob.getRule()); + else { + threadJob = newThreadJob(rule); + threadJob.acquireRule = true; + } + //don't acquire rule if it is a suspended rule + if (isSuspended(rule)) + threadJob.acquireRule = false; + //indicate if it is a system job to ensure isBlocking works correctly + threadJob.setRealJob(realJob); + threadJob.setThread(getThis); + } + try { + threadJob.push(rule); + //join the thread job outside sync block + if (threadJob.acquireRule) { + //no need to re-acquire any locks because the thread did not wait to get this lock + if (manager.runNow_package(threadJob)) + manager.getLockManager().addLockThread(Thread.getThis(), rule); + else + threadJob = threadJob.joinRun(monitor); + } + } finally { + //remember this thread job - only do this + //after the rule is acquired because it is ok for this thread to acquire + //and release other rules while waiting. + synchronized (this) { + threadJobs.put(getThis, threadJob); + if (suspend) + suspendedRules.add(cast(Object)rule); + } + if (threadJob.isBlocked) { + threadJob.isBlocked = false; + manager.reportUnblocked(monitor); + } + } + } + + /* (Non-javadoc) + * @see IJobManager#endRule + */ + synchronized void end(ISchedulingRule rule, bool resume) { + if (JobManager.DEBUG_BEGIN_END) + JobManager.debug_(Format("End rule: {}", rule)); //$NON-NLS-1$ + ThreadJob threadJob = cast(ThreadJob) threadJobs.get(Thread.getThis()); + if (threadJob is null) + Assert.isLegal(rule is null, Format("endRule without matching beginRule: {}", rule)); //$NON-NLS-1$ + else if (threadJob.pop(rule)) { + endThreadJob(threadJob, resume); + } + } + + /** + * Called when a worker thread has finished running a job. At this + * point, the worker thread must not own any scheduling rules + * @param lastJob The last job to run in this thread + */ + void endJob(InternalJob lastJob) { + final Thread getThis = Thread.getThis(); + IStatus error; + synchronized (this) { + ThreadJob threadJob = cast(ThreadJob) threadJobs.get(getThis); + if (threadJob is null) { + if (lastJob.getRule() !is null) + notifyWaitingThreadJobs(); + return; + } + String msg = Format("Worker thread ended job: {}, but still holds rule: {}", lastJob, threadJob ); //$NON-NLS-1$ //$NON-NLS-2$ + error = new Status(IStatus.ERROR, JobManager.PI_JOBS, 1, msg, null); + //end the thread job + endThreadJob(threadJob, false); + } + try { + RuntimeLog.log(error); + } catch (RuntimeException e) { + //failed to log, so print to console instead + Stderr.formatln("{}", error.getMessage()); + } + } + + private void endThreadJob(ThreadJob threadJob, bool resume) { + Thread getThis = Thread.getThis(); + //clean up when last rule scope exits + threadJobs.remove(getThis); + ISchedulingRule rule = threadJob.getRule(); + if (resume && rule !is null) + suspendedRules.remove(cast(Object)rule); + //if this job had a rule, then we are essentially releasing a lock + //note it is safe to do this even if the acquire was aborted + if (threadJob.acquireRule) { + manager.getLockManager().removeLockThread(getThis, rule); + notifyWaitingThreadJobs(); + } + //if the job was started, we need to notify job manager to end it + if (threadJob.isRunning()) + manager.endJob_package(threadJob, Status.OK_STATUS, false); + recycle(threadJob); + } + + /** + * Returns true if this rule has been suspended, and false otherwise. + */ + private bool isSuspended(ISchedulingRule rule) { + if (suspendedRules.size() is 0) + return false; + for (Iterator it = suspendedRules.iterator(); it.hasNext();) + if ((cast(ISchedulingRule) it.next()).contains(rule)) + return true; + return false; + } + + /** + * Returns a new or reused ThreadJob instance. + */ + private ThreadJob newThreadJob(ISchedulingRule rule) { + if (jobCache !is null) { + ThreadJob job = jobCache; + job.setRule(rule); + job.acquireRule = job.isRunning_ = false; + job.realJob = null; + jobCache = null; + return job; + } + return new ThreadJob(manager, rule); + } + + /** + * A job has just finished that was holding a scheduling rule, and the + * scheduling rule is now free. Wake any blocked thread jobs so they can + * compete for the newly freed lock + */ + private void notifyWaitingThreadJobs() { + synchronized (ThreadJob.mutex) { + ThreadJob.condition.notifyAll(); + } + } + + /** + * Indicates that a thread job is no longer in use and can be reused. + */ + private void recycle(ThreadJob job) { + if (jobCache is null && job.recycle()) + jobCache = job; + } + + /** + * Implements IJobManager#resume(ISchedulingRule) + * @param rule + */ + void resume(ISchedulingRule rule) { + //resume happens as a consequence of freeing the last rule in the stack + end(rule, true); + if (JobManager.DEBUG_BEGIN_END) + JobManager.debug_(Format("Resume rule: {}", rule)); //$NON-NLS-1$ + } + + /** + * Implements IJobManager#suspend(ISchedulingRule, IProgressMonitor) + * @param rule + * @param monitor + */ + void suspend(ISchedulingRule rule, IProgressMonitor monitor) { + if (JobManager.DEBUG_BEGIN_END) + JobManager.debug_(Format("Suspend rule: {}", rule)); //$NON-NLS-1$ + //the suspend job will be remembered once the rule is acquired + begin(rule, monitor, true); + } + + /** + * Implements IJobManager#transferRule(ISchedulingRule, Thread) + */ + synchronized void transfer(ISchedulingRule rule, Thread destinationThread) { + //nothing to do for null + if (rule is null) + return; + Thread getThis = Thread.getThis(); + //nothing to do if transferring to the same thread + if (getThis is destinationThread) + return; + //ensure destination thread doesn't already have a rule + ThreadJob job = cast(ThreadJob) threadJobs.get(destinationThread); + Assert.isLegal(job is null); + //ensure calling thread owns the job being transferred + job = cast(ThreadJob) threadJobs.get(getThis); + Assert.isNotNull(job); + Assert.isLegal(job.getRule() is rule); + //transfer the thread job without ending it + job.setThread(destinationThread); + threadJobs.remove(getThis); + threadJobs.put(destinationThread, job); + //transfer lock + if (job.acquireRule) { + manager.getLockManager().removeLockThread(getThis, rule); + manager.getLockManager().addLockThread(destinationThread, rule); + } + } +}