Mercurial > projects > dwt-addons
view dwtx/core/internal/jobs/ImplicitJobs.d @ 167:862b05e0334a
Add a wrapper for Thread
author | Frank Benoit <benoit@tionex.de> |
---|---|
date | Tue, 09 Sep 2008 15:59:16 +0200 |
parents | 9d0585bcb7aa |
children |
line wrap: on
line source
/******************************************************************************* * Copyright (c) 2003, 2006 IBM Corporation and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * IBM - Initial API and implementation * Port to the D programming language: * Frank Benoit <benoit@tionex.de> *******************************************************************************/ module dwtx.core.internal.jobs.ImplicitJobs; import tango.text.convert.Format; import dwtx.dwtxhelper.JThread; import tango.io.Stdout; import dwt.dwthelper.utils; import dwtx.dwtxhelper.Collection; import dwtx.core.internal.runtime.RuntimeLog; import dwtx.core.runtime.Assert; import dwtx.core.runtime.IProgressMonitor; import dwtx.core.runtime.IStatus; import dwtx.core.runtime.Status; import dwtx.core.runtime.jobs.ISchedulingRule; import dwtx.core.runtime.jobs.Job; import dwtx.core.internal.jobs.JobManager; import dwtx.core.internal.jobs.ThreadJob; import dwtx.core.internal.jobs.InternalJob; /** * Implicit jobs are jobs that are running by virtue of a JobManager.begin/end * pair. They act like normal jobs, except they are tied to an arbitrary thread * of the client's choosing, and they can be nested. */ class ImplicitJobs { /** * Cached unused instance that can be reused */ private ThreadJob jobCache = null; protected JobManager manager; /** * Set of suspended scheduling rules. */ private const Set suspendedRules; /** * Maps (Thread->ThreadJob), threads to the currently running job for that * thread. */ private final Map threadJobs; this(JobManager manager) { this.manager = manager; suspendedRules = new HashSet(20); threadJobs = new HashMap(20); } /* (Non-javadoc) * @see IJobManager#beginRule */ void begin(ISchedulingRule rule, IProgressMonitor monitor, bool suspend) { if (JobManager.DEBUG_BEGIN_END) JobManager.debug_(Format("Begin rule: {}", rule)); //$NON-NLS-1$ final JThread getThis = JThread.currentThread(); ThreadJob threadJob; synchronized (this) { threadJob = cast(ThreadJob) threadJobs.get(getThis); if (threadJob !is null) { //nested rule, just push on stack and return threadJob.push(rule); return; } //no need to schedule a thread job for a null rule if (rule is null) return; //create a thread job for this thread, use the rule from the real job if it has one Job realJob = manager.currentJob(); if (realJob !is null && realJob.getRule() !is null) threadJob = newThreadJob(realJob.getRule()); else { threadJob = newThreadJob(rule); threadJob.acquireRule = true; } //don't acquire rule if it is a suspended rule if (isSuspended(rule)) threadJob.acquireRule = false; //indicate if it is a system job to ensure isBlocking works correctly threadJob.setRealJob(realJob); threadJob.setThread(getThis); } try { threadJob.push(rule); //join the thread job outside sync block if (threadJob.acquireRule) { //no need to re-acquire any locks because the thread did not wait to get this lock if (manager.runNow_package(threadJob)) manager.getLockManager().addLockThread(JThread.currentThread(), rule); else threadJob = threadJob.joinRun(monitor); } } finally { //remember this thread job - only do this //after the rule is acquired because it is ok for this thread to acquire //and release other rules while waiting. synchronized (this) { threadJobs.put(getThis, threadJob); if (suspend) suspendedRules.add(cast(Object)rule); } if (threadJob.isBlocked) { threadJob.isBlocked = false; manager.reportUnblocked(monitor); } } } /* (Non-javadoc) * @see IJobManager#endRule */ synchronized void end(ISchedulingRule rule, bool resume) { if (JobManager.DEBUG_BEGIN_END) JobManager.debug_(Format("End rule: {}", rule)); //$NON-NLS-1$ ThreadJob threadJob = cast(ThreadJob) threadJobs.get(JThread.currentThread()); if (threadJob is null) Assert.isLegal(rule is null, Format("endRule without matching beginRule: {}", rule)); //$NON-NLS-1$ else if (threadJob.pop(rule)) { endThreadJob(threadJob, resume); } } /** * Called when a worker thread has finished running a job. At this * point, the worker thread must not own any scheduling rules * @param lastJob The last job to run in this thread */ void endJob(InternalJob lastJob) { final JThread getThis = JThread.currentThread(); IStatus error; synchronized (this) { ThreadJob threadJob = cast(ThreadJob) threadJobs.get(getThis); if (threadJob is null) { if (lastJob.getRule() !is null) notifyWaitingThreadJobs(); return; } String msg = Format("Worker thread ended job: {}, but still holds rule: {}", lastJob, threadJob ); //$NON-NLS-1$ //$NON-NLS-2$ error = new Status(IStatus.ERROR, JobManager.PI_JOBS, 1, msg, null); //end the thread job endThreadJob(threadJob, false); } try { RuntimeLog.log(error); } catch (RuntimeException e) { //failed to log, so print to console instead Stderr.formatln("{}", error.getMessage()); } } private void endThreadJob(ThreadJob threadJob, bool resume) { JThread getThis = JThread.currentThread(); //clean up when last rule scope exits threadJobs.remove(getThis); ISchedulingRule rule = threadJob.getRule(); if (resume && rule !is null) suspendedRules.remove(cast(Object)rule); //if this job had a rule, then we are essentially releasing a lock //note it is safe to do this even if the acquire was aborted if (threadJob.acquireRule) { manager.getLockManager().removeLockThread(getThis, rule); notifyWaitingThreadJobs(); } //if the job was started, we need to notify job manager to end it if (threadJob.isRunning()) manager.endJob_package(threadJob, Status.OK_STATUS, false); recycle(threadJob); } /** * Returns true if this rule has been suspended, and false otherwise. */ private bool isSuspended(ISchedulingRule rule) { if (suspendedRules.size() is 0) return false; for (Iterator it = suspendedRules.iterator(); it.hasNext();) if ((cast(ISchedulingRule) it.next()).contains(rule)) return true; return false; } /** * Returns a new or reused ThreadJob instance. */ private ThreadJob newThreadJob(ISchedulingRule rule) { if (jobCache !is null) { ThreadJob job = jobCache; job.setRule(rule); job.acquireRule = job.isRunning_ = false; job.realJob = null; jobCache = null; return job; } return new ThreadJob(manager, rule); } /** * A job has just finished that was holding a scheduling rule, and the * scheduling rule is now free. Wake any blocked thread jobs so they can * compete for the newly freed lock */ private void notifyWaitingThreadJobs() { synchronized (ThreadJob.mutex) { ThreadJob.condition.notifyAll(); } } /** * Indicates that a thread job is no longer in use and can be reused. */ private void recycle(ThreadJob job) { if (jobCache is null && job.recycle()) jobCache = job; } /** * Implements IJobManager#resume(ISchedulingRule) * @param rule */ void resume(ISchedulingRule rule) { //resume happens as a consequence of freeing the last rule in the stack end(rule, true); if (JobManager.DEBUG_BEGIN_END) JobManager.debug_(Format("Resume rule: {}", rule)); //$NON-NLS-1$ } /** * Implements IJobManager#suspend(ISchedulingRule, IProgressMonitor) * @param rule * @param monitor */ void suspend(ISchedulingRule rule, IProgressMonitor monitor) { if (JobManager.DEBUG_BEGIN_END) JobManager.debug_(Format("Suspend rule: {}", rule)); //$NON-NLS-1$ //the suspend job will be remembered once the rule is acquired begin(rule, monitor, true); } /** * Implements IJobManager#transferRule(ISchedulingRule, Thread) */ synchronized void transfer(ISchedulingRule rule, JThread destinationThread) { //nothing to do for null if (rule is null) return; JThread getThis = JThread.currentThread(); //nothing to do if transferring to the same thread if (getThis is destinationThread) return; //ensure destination thread doesn't already have a rule ThreadJob job = cast(ThreadJob) threadJobs.get(destinationThread); Assert.isLegal(job is null); //ensure calling thread owns the job being transferred job = cast(ThreadJob) threadJobs.get(getThis); Assert.isNotNull(job); Assert.isLegal(job.getRule() is rule); //transfer the thread job without ending it job.setThread(destinationThread); threadJobs.remove(getThis); threadJobs.put(destinationThread, job); //transfer lock if (job.acquireRule) { manager.getLockManager().removeLockThread(getThis, rule); manager.getLockManager().addLockThread(destinationThread, rule); } } }