Mercurial > projects > dwt-addons
comparison dwtx/core/internal/jobs/ImplicitJobs.d @ 122:9d0585bcb7aa
Add core.jobs package
author | Frank Benoit <benoit@tionex.de> |
---|---|
date | Tue, 12 Aug 2008 02:34:21 +0200 |
parents | |
children | 862b05e0334a |
comparison
equal
deleted
inserted
replaced
121:c0304616ea23 | 122:9d0585bcb7aa |
---|---|
1 /******************************************************************************* | |
2 * Copyright (c) 2003, 2006 IBM Corporation and others. | |
3 * All rights reserved. This program and the accompanying materials | |
4 * are made available under the terms of the Eclipse Public License v1.0 | |
5 * which accompanies this distribution, and is available at | |
6 * http://www.eclipse.org/legal/epl-v10.html | |
7 * | |
8 * Contributors: | |
9 * IBM - Initial API and implementation | |
10 * Port to the D programming language: | |
11 * Frank Benoit <benoit@tionex.de> | |
12 *******************************************************************************/ | |
13 module dwtx.core.internal.jobs.ImplicitJobs; | |
14 | |
15 import tango.text.convert.Format; | |
16 import tango.core.Thread; | |
17 import tango.io.Stdout; | |
18 import dwt.dwthelper.utils; | |
19 import dwtx.dwtxhelper.Collection; | |
20 | |
21 import dwtx.core.internal.runtime.RuntimeLog; | |
22 import dwtx.core.runtime.Assert; | |
23 import dwtx.core.runtime.IProgressMonitor; | |
24 import dwtx.core.runtime.IStatus; | |
25 import dwtx.core.runtime.Status; | |
26 import dwtx.core.runtime.jobs.ISchedulingRule; | |
27 import dwtx.core.runtime.jobs.Job; | |
28 | |
29 import dwtx.core.internal.jobs.JobManager; | |
30 import dwtx.core.internal.jobs.ThreadJob; | |
31 import dwtx.core.internal.jobs.InternalJob; | |
32 | |
33 /** | |
34 * Implicit jobs are jobs that are running by virtue of a JobManager.begin/end | |
35 * pair. They act like normal jobs, except they are tied to an arbitrary thread | |
36 * of the client's choosing, and they can be nested. | |
37 */ | |
38 class ImplicitJobs { | |
/**
 * Spare ThreadJob held for reuse; null while no spare is available.
 */
private ThreadJob jobCache;

/**
 * The job manager handed to the constructor.
 */
protected JobManager manager;

/**
 * The scheduling rules that are currently suspended.
 */
private const Set suspendedRules;

/**
 * Thread -> ThreadJob lookup giving the thread job currently registered
 * for each thread.
 */
private final Map threadJobs;

/**
 * Creates the bookkeeping for the given manager; both collections start
 * out empty.
 */
this(JobManager manager) {
    suspendedRules = new HashSet(20);
    threadJobs = new HashMap(20);
    this.manager = manager;
}
61 | |
/* (Non-javadoc)
 * @see IJobManager#beginRule
 *
 * Starts a rule scope on the calling thread. A thread that already has a
 * registered thread job only gets the rule pushed onto that job; a null
 * rule on a thread without a thread job is a no-op. Otherwise a thread job
 * is created, the rule is acquired if required, and the job is registered
 * for the calling thread. When suspend is set, the rule is additionally
 * recorded as suspended.
 */
void begin(ISchedulingRule rule, IProgressMonitor monitor, bool suspend) {
    if (JobManager.DEBUG_BEGIN_END) {
        JobManager.debug_(Format("Begin rule: {}", rule)); //$NON-NLS-1$
    }
    final Thread caller = Thread.getThis();
    ThreadJob threadJob;
    synchronized (this) {
        threadJob = cast(ThreadJob) threadJobs.get(caller);
        if (threadJob !is null) {
            //this thread already has a thread job: the rule is nested, so stack it and stop
            threadJob.push(rule);
            return;
        }
        //a null rule never needs a thread job
        if (rule is null) {
            return;
        }
        //build the thread job; the real job's rule wins when it has one
        Job realJob = manager.currentJob();
        if (realJob is null || realJob.getRule() is null) {
            threadJob = newThreadJob(rule);
            threadJob.acquireRule = true;
        } else {
            threadJob = newThreadJob(realJob.getRule());
        }
        //a suspended rule is never acquired
        if (isSuspended(rule)) {
            threadJob.acquireRule = false;
        }
        //record the real job so isBlocking works correctly for system jobs
        threadJob.setRealJob(realJob);
        threadJob.setThread(caller);
    }
    try {
        threadJob.push(rule);
        //acquisition happens outside the sync block
        if (threadJob.acquireRule) {
            if (!manager.runNow_package(threadJob)) {
                threadJob = threadJob.joinRun(monitor);
            } else {
                //the thread did not wait for this lock, so no other locks need re-acquiring
                manager.getLockManager().addLockThread(Thread.getThis(), rule);
            }
        }
    } finally {
        //register the thread job only now, after the acquire attempt: while
        //waiting, this thread is allowed to acquire and release other rules
        synchronized (this) {
            threadJobs.put(caller, threadJob);
            if (suspend) {
                suspendedRules.add(cast(Object)rule);
            }
        }
        if (threadJob.isBlocked) {
            threadJob.isBlocked = false;
            manager.reportUnblocked(monitor);
        }
    }
}
120 | |
/* (Non-javadoc)
 * @see IJobManager#endRule
 *
 * Closes a rule scope on the calling thread. Without a registered thread
 * job only a null rule is legal. Otherwise the rule is popped and, when
 * pop reports true, the thread job itself is ended.
 */
synchronized void end(ISchedulingRule rule, bool resume) {
    if (JobManager.DEBUG_BEGIN_END) {
        JobManager.debug_(Format("End rule: {}", rule)); //$NON-NLS-1$
    }
    ThreadJob current = cast(ThreadJob) threadJobs.get(Thread.getThis());
    if (current is null) {
        Assert.isLegal(rule is null, Format("endRule without matching beginRule: {}", rule)); //$NON-NLS-1$
        return;
    }
    if (current.pop(rule)) {
        endThreadJob(current, resume);
    }
}
134 | |
/**
 * Called when a worker thread has finished running a job. At this
 * point, the worker thread must not own any scheduling rules. If a thread
 * job is still registered for the calling thread, it is ended and an error
 * status is logged.
 * @param lastJob The last job to run in this thread
 */
void endJob(InternalJob lastJob) {
    final Thread worker = Thread.getThis();
    IStatus error;
    synchronized (this) {
        ThreadJob leftover = cast(ThreadJob) threadJobs.get(worker);
        if (leftover is null) {
            //nothing left behind; wake waiters only if the finished job had a rule
            bool hadRule = lastJob.getRule() !is null;
            if (hadRule) {
                notifyWaitingThreadJobs();
            }
            return;
        }
        error = new Status(IStatus.ERROR, JobManager.PI_JOBS, 1,
                Format("Worker thread ended job: {}, but still holds rule: {}", lastJob, leftover ), //$NON-NLS-1$ //$NON-NLS-2$
                null);
        //end the thread job that was left behind
        endThreadJob(leftover, false);
    }
    try {
        RuntimeLog.log(error);
    } catch (RuntimeException e) {
        //logging failed, fall back to the console
        Stderr.formatln("{}", error.getMessage());
    }
}
162 | |
/**
 * Unregisters the calling thread's thread job and releases what it held:
 * optionally un-suspends its rule, drops the lock registration if the rule
 * was acquired, ends the job in the manager if it is running, and finally
 * offers the instance for reuse.
 */
private void endThreadJob(ThreadJob threadJob, bool resume) {
    Thread owner = Thread.getThis();
    //the last rule scope has exited, so forget the registration
    threadJobs.remove(owner);
    ISchedulingRule heldRule = threadJob.getRule();
    if (resume && heldRule !is null) {
        suspendedRules.remove(cast(Object)heldRule);
    }
    //a job with an acquired rule is effectively releasing a lock here;
    //this is safe even if the acquire was aborted
    if (threadJob.acquireRule) {
        manager.getLockManager().removeLockThread(owner, heldRule);
        notifyWaitingThreadJobs();
    }
    //a started job must be reported to the job manager as ended
    if (threadJob.isRunning()) {
        manager.endJob_package(threadJob, Status.OK_STATUS, false);
    }
    recycle(threadJob);
}
181 | |
/**
 * Tells whether the given rule is contained in any of the suspended rules.
 * Returns false straight away when no rule is suspended.
 */
private bool isSuspended(ISchedulingRule rule) {
    if (suspendedRules.size() == 0) {
        return false;
    }
    Iterator cursor = suspendedRules.iterator();
    while (cursor.hasNext()) {
        ISchedulingRule candidate = cast(ISchedulingRule) cursor.next();
        if (candidate.contains(rule)) {
            return true;
        }
    }
    return false;
}
193 | |
/**
 * Hands out a ThreadJob for the given rule: the cached instance, reset and
 * removed from the cache, when one is available, otherwise a brand new one.
 */
private ThreadJob newThreadJob(ISchedulingRule rule) {
    if (jobCache is null) {
        return new ThreadJob(manager, rule);
    }
    ThreadJob reused = jobCache;
    reused.setRule(rule);
    //reset the state left over from the previous use
    reused.isRunning_ = false;
    reused.acquireRule = false;
    reused.realJob = null;
    jobCache = null;
    return reused;
}
208 | |
/**
 * Signals every waiter on ThreadJob.condition while holding
 * ThreadJob.mutex. Used after a job that was holding a scheduling rule has
 * finished, so that blocked thread jobs can compete for the freed rule.
 */
private void notifyWaitingThreadJobs() {
    synchronized (ThreadJob.mutex) {
        ThreadJob.condition.notifyAll();
    }
}
219 | |
/**
 * Offers a thread job that is no longer in use for reuse. It is kept only
 * when the cache slot is free and the job's own recycle() reports true.
 */
private void recycle(ThreadJob job) {
    if (jobCache !is null) {
        return;
    }
    if (job.recycle()) {
        jobCache = job;
    }
}
227 | |
/**
 * Implements IJobManager#resume(ISchedulingRule) by ending the rule with
 * the resume flag set.
 * @param rule
 */
void resume(ISchedulingRule rule) {
    //resuming is a side effect of freeing the last rule on the stack
    end(rule, true);
    if (JobManager.DEBUG_BEGIN_END) {
        JobManager.debug_(Format("Resume rule: {}", rule)); //$NON-NLS-1$
    }
}
238 | |
/**
 * Implements IJobManager#suspend(ISchedulingRule, IProgressMonitor) by
 * beginning the rule with the suspend flag set.
 * @param rule
 * @param monitor
 */
void suspend(ISchedulingRule rule, IProgressMonitor monitor) {
    if (JobManager.DEBUG_BEGIN_END) {
        JobManager.debug_(Format("Suspend rule: {}", rule)); //$NON-NLS-1$
    }
    //the rule is recorded as suspended once it has been acquired
    begin(rule, monitor, true);
}
250 | |
/**
 * Implements IJobManager#transferRule(ISchedulingRule, Thread): moves the
 * calling thread's thread job, together with its lock registration if the
 * rule was acquired, over to the destination thread.
 */
synchronized void transfer(ISchedulingRule rule, Thread destinationThread) {
    //a null rule has nothing to transfer
    if (rule is null) {
        return;
    }
    Thread source = Thread.getThis();
    //transferring to the calling thread itself is a no-op
    if (source is destinationThread) {
        return;
    }
    //the destination thread must not already have a thread job
    ThreadJob existing = cast(ThreadJob) threadJobs.get(destinationThread);
    Assert.isLegal(existing is null);
    //the calling thread must own a thread job for exactly this rule
    ThreadJob job = cast(ThreadJob) threadJobs.get(source);
    Assert.isNotNull(job);
    Assert.isLegal(job.getRule() is rule);
    //re-home the thread job without ending it
    job.setThread(destinationThread);
    threadJobs.remove(source);
    threadJobs.put(destinationThread, job);
    //move the lock registration along with it
    if (job.acquireRule) {
        manager.getLockManager().removeLockThread(source, rule);
        manager.getLockManager().addLockThread(destinationThread, rule);
    }
}
279 } |