comparison dwtx/core/internal/jobs/ImplicitJobs.d @ 122:9d0585bcb7aa

Add core.jobs package
author Frank Benoit <benoit@tionex.de>
date Tue, 12 Aug 2008 02:34:21 +0200
parents
children 862b05e0334a
comparison
equal deleted inserted replaced
121:c0304616ea23 122:9d0585bcb7aa
1 /*******************************************************************************
2 * Copyright (c) 2003, 2006 IBM Corporation and others.
3 * All rights reserved. This program and the accompanying materials
4 * are made available under the terms of the Eclipse Public License v1.0
5 * which accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
7 *
8 * Contributors:
9 * IBM - Initial API and implementation
10 * Port to the D programming language:
11 * Frank Benoit <benoit@tionex.de>
12 *******************************************************************************/
13 module dwtx.core.internal.jobs.ImplicitJobs;
14
15 import tango.text.convert.Format;
16 import tango.core.Thread;
17 import tango.io.Stdout;
18 import dwt.dwthelper.utils;
19 import dwtx.dwtxhelper.Collection;
20
21 import dwtx.core.internal.runtime.RuntimeLog;
22 import dwtx.core.runtime.Assert;
23 import dwtx.core.runtime.IProgressMonitor;
24 import dwtx.core.runtime.IStatus;
25 import dwtx.core.runtime.Status;
26 import dwtx.core.runtime.jobs.ISchedulingRule;
27 import dwtx.core.runtime.jobs.Job;
28
29 import dwtx.core.internal.jobs.JobManager;
30 import dwtx.core.internal.jobs.ThreadJob;
31 import dwtx.core.internal.jobs.InternalJob;
32
33 /**
34 * Implicit jobs are jobs that are running by virtue of a JobManager.begin/end
35 * pair. They act like normal jobs, except they are tied to an arbitrary thread
36 * of the client's choosing, and they can be nested.
37 */
38 class ImplicitJobs {
39 /**
40 * Cached unused instance that can be reused
41 */
42 private ThreadJob jobCache = null;
43 protected JobManager manager;
44
45 /**
46 * Set of suspended scheduling rules.
47 */
48 private const Set suspendedRules;
49
50 /**
51 * Maps (Thread->ThreadJob), threads to the currently running job for that
52 * thread.
53 */
54 private final Map threadJobs;
55
56 this(JobManager manager) {
57 this.manager = manager;
58 suspendedRules = new HashSet(20);
59 threadJobs = new HashMap(20);
60 }
61
62 /* (Non-javadoc)
63 * @see IJobManager#beginRule
64 */
65 void begin(ISchedulingRule rule, IProgressMonitor monitor, bool suspend) {
66 if (JobManager.DEBUG_BEGIN_END)
67 JobManager.debug_(Format("Begin rule: {}", rule)); //$NON-NLS-1$
68 final Thread getThis = Thread.getThis();
69 ThreadJob threadJob;
70 synchronized (this) {
71 threadJob = cast(ThreadJob) threadJobs.get(getThis);
72 if (threadJob !is null) {
73 //nested rule, just push on stack and return
74 threadJob.push(rule);
75 return;
76 }
77 //no need to schedule a thread job for a null rule
78 if (rule is null)
79 return;
80 //create a thread job for this thread, use the rule from the real job if it has one
81 Job realJob = manager.currentJob();
82 if (realJob !is null && realJob.getRule() !is null)
83 threadJob = newThreadJob(realJob.getRule());
84 else {
85 threadJob = newThreadJob(rule);
86 threadJob.acquireRule = true;
87 }
88 //don't acquire rule if it is a suspended rule
89 if (isSuspended(rule))
90 threadJob.acquireRule = false;
91 //indicate if it is a system job to ensure isBlocking works correctly
92 threadJob.setRealJob(realJob);
93 threadJob.setThread(getThis);
94 }
95 try {
96 threadJob.push(rule);
97 //join the thread job outside sync block
98 if (threadJob.acquireRule) {
99 //no need to re-acquire any locks because the thread did not wait to get this lock
100 if (manager.runNow_package(threadJob))
101 manager.getLockManager().addLockThread(Thread.getThis(), rule);
102 else
103 threadJob = threadJob.joinRun(monitor);
104 }
105 } finally {
106 //remember this thread job - only do this
107 //after the rule is acquired because it is ok for this thread to acquire
108 //and release other rules while waiting.
109 synchronized (this) {
110 threadJobs.put(getThis, threadJob);
111 if (suspend)
112 suspendedRules.add(cast(Object)rule);
113 }
114 if (threadJob.isBlocked) {
115 threadJob.isBlocked = false;
116 manager.reportUnblocked(monitor);
117 }
118 }
119 }
120
121 /* (Non-javadoc)
122 * @see IJobManager#endRule
123 */
124 synchronized void end(ISchedulingRule rule, bool resume) {
125 if (JobManager.DEBUG_BEGIN_END)
126 JobManager.debug_(Format("End rule: {}", rule)); //$NON-NLS-1$
127 ThreadJob threadJob = cast(ThreadJob) threadJobs.get(Thread.getThis());
128 if (threadJob is null)
129 Assert.isLegal(rule is null, Format("endRule without matching beginRule: {}", rule)); //$NON-NLS-1$
130 else if (threadJob.pop(rule)) {
131 endThreadJob(threadJob, resume);
132 }
133 }
134
135 /**
136 * Called when a worker thread has finished running a job. At this
137 * point, the worker thread must not own any scheduling rules
138 * @param lastJob The last job to run in this thread
139 */
140 void endJob(InternalJob lastJob) {
141 final Thread getThis = Thread.getThis();
142 IStatus error;
143 synchronized (this) {
144 ThreadJob threadJob = cast(ThreadJob) threadJobs.get(getThis);
145 if (threadJob is null) {
146 if (lastJob.getRule() !is null)
147 notifyWaitingThreadJobs();
148 return;
149 }
150 String msg = Format("Worker thread ended job: {}, but still holds rule: {}", lastJob, threadJob ); //$NON-NLS-1$ //$NON-NLS-2$
151 error = new Status(IStatus.ERROR, JobManager.PI_JOBS, 1, msg, null);
152 //end the thread job
153 endThreadJob(threadJob, false);
154 }
155 try {
156 RuntimeLog.log(error);
157 } catch (RuntimeException e) {
158 //failed to log, so print to console instead
159 Stderr.formatln("{}", error.getMessage());
160 }
161 }
162
163 private void endThreadJob(ThreadJob threadJob, bool resume) {
164 Thread getThis = Thread.getThis();
165 //clean up when last rule scope exits
166 threadJobs.remove(getThis);
167 ISchedulingRule rule = threadJob.getRule();
168 if (resume && rule !is null)
169 suspendedRules.remove(cast(Object)rule);
170 //if this job had a rule, then we are essentially releasing a lock
171 //note it is safe to do this even if the acquire was aborted
172 if (threadJob.acquireRule) {
173 manager.getLockManager().removeLockThread(getThis, rule);
174 notifyWaitingThreadJobs();
175 }
176 //if the job was started, we need to notify job manager to end it
177 if (threadJob.isRunning())
178 manager.endJob_package(threadJob, Status.OK_STATUS, false);
179 recycle(threadJob);
180 }
181
182 /**
183 * Returns true if this rule has been suspended, and false otherwise.
184 */
185 private bool isSuspended(ISchedulingRule rule) {
186 if (suspendedRules.size() is 0)
187 return false;
188 for (Iterator it = suspendedRules.iterator(); it.hasNext();)
189 if ((cast(ISchedulingRule) it.next()).contains(rule))
190 return true;
191 return false;
192 }
193
194 /**
195 * Returns a new or reused ThreadJob instance.
196 */
197 private ThreadJob newThreadJob(ISchedulingRule rule) {
198 if (jobCache !is null) {
199 ThreadJob job = jobCache;
200 job.setRule(rule);
201 job.acquireRule = job.isRunning_ = false;
202 job.realJob = null;
203 jobCache = null;
204 return job;
205 }
206 return new ThreadJob(manager, rule);
207 }
208
209 /**
210 * A job has just finished that was holding a scheduling rule, and the
211 * scheduling rule is now free. Wake any blocked thread jobs so they can
212 * compete for the newly freed lock
213 */
214 private void notifyWaitingThreadJobs() {
215 synchronized (ThreadJob.mutex) {
216 ThreadJob.condition.notifyAll();
217 }
218 }
219
220 /**
221 * Indicates that a thread job is no longer in use and can be reused.
222 */
223 private void recycle(ThreadJob job) {
224 if (jobCache is null && job.recycle())
225 jobCache = job;
226 }
227
228 /**
229 * Implements IJobManager#resume(ISchedulingRule)
230 * @param rule
231 */
232 void resume(ISchedulingRule rule) {
233 //resume happens as a consequence of freeing the last rule in the stack
234 end(rule, true);
235 if (JobManager.DEBUG_BEGIN_END)
236 JobManager.debug_(Format("Resume rule: {}", rule)); //$NON-NLS-1$
237 }
238
239 /**
240 * Implements IJobManager#suspend(ISchedulingRule, IProgressMonitor)
241 * @param rule
242 * @param monitor
243 */
244 void suspend(ISchedulingRule rule, IProgressMonitor monitor) {
245 if (JobManager.DEBUG_BEGIN_END)
246 JobManager.debug_(Format("Suspend rule: {}", rule)); //$NON-NLS-1$
247 //the suspend job will be remembered once the rule is acquired
248 begin(rule, monitor, true);
249 }
250
251 /**
252 * Implements IJobManager#transferRule(ISchedulingRule, Thread)
253 */
254 synchronized void transfer(ISchedulingRule rule, Thread destinationThread) {
255 //nothing to do for null
256 if (rule is null)
257 return;
258 Thread getThis = Thread.getThis();
259 //nothing to do if transferring to the same thread
260 if (getThis is destinationThread)
261 return;
262 //ensure destination thread doesn't already have a rule
263 ThreadJob job = cast(ThreadJob) threadJobs.get(destinationThread);
264 Assert.isLegal(job is null);
265 //ensure calling thread owns the job being transferred
266 job = cast(ThreadJob) threadJobs.get(getThis);
267 Assert.isNotNull(job);
268 Assert.isLegal(job.getRule() is rule);
269 //transfer the thread job without ending it
270 job.setThread(destinationThread);
271 threadJobs.remove(getThis);
272 threadJobs.put(destinationThread, job);
273 //transfer lock
274 if (job.acquireRule) {
275 manager.getLockManager().removeLockThread(getThis, rule);
276 manager.getLockManager().addLockThread(destinationThread, rule);
277 }
278 }
279 }