comparison dwtx/core/internal/jobs/OrderedLock.d @ 122:9d0585bcb7aa

Add core.jobs package
author Frank Benoit <benoit@tionex.de>
date Tue, 12 Aug 2008 02:34:21 +0200
parents
children 862b05e0334a
comparison
equal deleted inserted replaced
121:c0304616ea23 122:9d0585bcb7aa
1 /*******************************************************************************
2 * Copyright (c) 2003, 2006 IBM Corporation and others.
3 * All rights reserved. This program and the accompanying materials
4 * are made available under the terms of the Eclipse Public License v1.0
5 * which accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
7 *
8 * Contributors:
9 * IBM - Initial API and implementation
10 * Port to the D programming language:
11 * Frank Benoit <benoit@tionex.de>
12 *******************************************************************************/
13 module dwtx.core.internal.jobs.OrderedLock;
14
15 import tango.text.convert.Format;
16 import tango.core.Thread;
17 import tango.io.Stdout;
18 import dwt.dwthelper.utils;
19
20 import dwtx.core.runtime.Assert;
21 import dwtx.core.runtime.jobs.ILock;
22 import dwtx.core.runtime.jobs.ISchedulingRule;
23
24 import dwtx.core.internal.jobs.LockManager;
25 import dwtx.core.internal.jobs.Queue;
26 import dwtx.core.internal.jobs.Semaphore;
27
28 /**
29 * A lock used to control write access to an exclusive resource.
30 *
31 * The lock avoids circular waiting deadlocks by detecting the deadlocks
32 * and resolving them through the suspension of all locks owned by one
33 * of the threads involved in the deadlock. This makes it impossible for n such
34 * locks to deadlock while waiting for each other. The downside is that this means
35 * that during an interval when a process owns a lock, it can be forced
36 * to give the lock up and wait until all locks it requires become
37 * available. This removes the feature of exclusive access to the
38 * resource in contention for the duration between acquire() and
39 * release() calls.
40 *
41 * The lock implementation prevents starvation by granting the
42 * lock in the same order in which acquire() requests arrive. In
43 * this scheme, starvation is only possible if a thread retains
44 * a lock indefinitely.
45 */
46 public class OrderedLock : ILock, ISchedulingRule {
47
48 private static const bool DEBUG = false;
49 /**
50 * Locks are sequentially ordered for debugging purposes.
51 */
52 private static int nextLockNumber = 0;
53 /**
54 * The thread of the operation that currently owns the lock.
55 */
56 private /+volatile+/ Thread currentOperationThread;
57 /**
58 * Records the number of successive acquires in the same
59 * thread. The lock is released only when the depth
60 * reaches zero.
61 */
62 private int depth;
63 /**
64 * The manager that implements the deadlock detection and resolution protocol.
65 */
66 private const LockManager manager;
67 private const int number;
68
69 /**
70 * Queue of semaphores for threads currently waiting
71 * on the lock. This queue is not thread-safe, so access
72 * to this queue must be synchronized on the lock instance.
73 */
74 private const Queue operations;
75
76 /**
77 * Creates a new workspace lock.
78 */
79 this(LockManager manager) {
80
81 operations = new Queue();
82
83 this.manager = manager;
84 this.number = nextLockNumber++;
85 }
86
87 /* (non-Javadoc)
88 * @see Locks.ILock#acquire()
89 */
90 public void acquire() {
91 //spin until the lock is successfully acquired
92 //NOTE: spinning here allows the UI thread to service pending syncExecs
93 //if the UI thread is waiting to acquire a lock.
94 while (true) {
95 try {
96 if (acquire(Long.MAX_VALUE))
97 return;
98 } catch (InterruptedException e) {
99 //ignore and loop
100 }
101 }
102 }
103
104 /* (non-Javadoc)
105 * @see Locks.ILock#acquire(long)
106 */
107 public bool acquire(long delay) {
108 implMissing(__FILE__, __LINE__ );
109 // if (Thread.interrupted())
110 // throw new InterruptedException();
111
112 bool success = false;
113 if (delay <= 0)
114 return attempt();
115 Semaphore semaphore = createSemaphore();
116 if (semaphore is null)
117 return true;
118 if (DEBUG)
119 Stdout.formatln("[{}] Operation waiting to be executed... ", Thread.getThis(), this); //$NON-NLS-1$ //$NON-NLS-2$
120 success = doAcquire(semaphore, delay);
121 manager.resumeSuspendedLocks(Thread.getThis());
122 if (DEBUG && success)
123 Stdout.formatln("[{}] Operation started... ", Thread.getThis(), this); //$NON-NLS-1$ //$NON-NLS-2$
124 else if (DEBUG)
125 Stdout.formatln("[{}] Operation timed out... ", Thread.getThis(), this); //$NON-NLS-1$ //$NON-NLS-2$
126 return success;
127 }
128
129 /**
130 * Attempts to acquire the lock. Returns false if the lock is not available and
131 * true if the lock has been successfully acquired.
132 */
133 private synchronized bool attempt() {
134 //return true if we already own the lock
135 //also, if nobody is waiting, grant the lock immediately
136 if ((currentOperationThread is Thread.getThis()) || (currentOperationThread is null && operations.isEmpty())) {
137 depth++;
138 setCurrentOperationThread(Thread.getThis());
139 return true;
140 }
141 return false;
142 }
143
144 /* (non-Javadoc)
145 * @see dwtx.core.runtime.jobs.ISchedulingRule#contains(dwtx.core.runtime.jobs.ISchedulingRule)
146 */
147 public bool contains(ISchedulingRule rule) {
148 return false;
149 }
150
151 /**
152 * Returns null if acquired and a Semaphore object otherwise. If a
153 * waiting semaphore already exists for this thread, it will be returned,
154 * otherwise a new semaphore will be created, enqueued, and returned.
155 */
156 private synchronized Semaphore createSemaphore() {
157 return attempt() ? null : enqueue(new Semaphore(Thread.getThis()));
158 }
159
160 /**
161 * Attempts to acquire this lock. Callers will block until this lock comes available to
162 * them, or until the specified delay has elapsed.
163 */
164 private bool doAcquire(Semaphore semaphore, long delay) {
165 bool success = false;
166 //notify hook to service pending syncExecs before falling asleep
167 if (manager.aboutToWait(this.currentOperationThread)) {
168 //hook granted immediate access
169 //remove semaphore for the lock request from the queue
170 //do not log in graph because this thread did not really get the lock
171 removeFromQueue(semaphore);
172 depth++;
173 manager.addLockThread(currentOperationThread, this);
174 return true;
175 }
176 //Make sure the semaphore is in the queue before we start waiting
177 //It might have been removed from the queue while servicing syncExecs
178 //This is will return our existing semaphore if it is still in the queue
179 semaphore = createSemaphore();
180 if (semaphore is null)
181 return true;
182 manager.addLockWaitThread(Thread.getThis(), this);
183 try {
184 success = semaphore.acquire(delay);
185 } catch (InterruptedException e) {
186 if (DEBUG)
187 Stdout.formatln(Format("[{}] Operation interrupted while waiting... :-|", Thread.getThis())); //$NON-NLS-1$ //$NON-NLS-2$
188 throw e;
189 }
190 if (success) {
191 depth++;
192 updateCurrentOperation();
193 } else {
194 removeFromQueue(semaphore);
195 manager.removeLockWaitThread(Thread.getThis(), this);
196 }
197 return success;
198 }
199
200 /**
201 * Releases this lock from the thread that used to own it.
202 * Grants this lock to the next thread in the queue.
203 */
204 private synchronized void doRelease() {
205 //notify hook
206 manager.aboutToRelease();
207 depth = 0;
208 Semaphore next = cast(Semaphore) operations.peek();
209 setCurrentOperationThread(null);
210 if (next !is null)
211 next.release();
212 }
213
214 /**
215 * If there is another semaphore with the same runnable in the
216 * queue, the other is returned and the new one is not added.
217 */
218 private synchronized Semaphore enqueue(Semaphore newSemaphore) {
219 Semaphore semaphore = cast(Semaphore) operations.get(newSemaphore);
220 if (semaphore is null) {
221 operations.enqueue(newSemaphore);
222 return newSemaphore;
223 }
224 return semaphore;
225 }
226
227 /**
228 * Suspend this lock by granting the lock to the next lock in the queue.
229 * Return the depth of the suspended lock.
230 */
231 protected int forceRelease() {
232 int oldDepth = depth;
233 doRelease();
234 return oldDepth;
235 }
236 package int forceRelease_package() {
237 return forceRelease();
238 }
239
240 /* (non-Javadoc)
241 * @see Locks.ILock#getDepth()
242 */
243 public int getDepth() {
244 return depth;
245 }
246
247 /* (non-Javadoc)
248 * @see dwtx.core.runtime.jobs.ISchedulingRule#isConflicting(dwtx.core.runtime.jobs.ISchedulingRule)
249 */
250 public bool isConflicting(ISchedulingRule rule) {
251 return rule is this;
252 }
253
254 /* (non-Javadoc)
255 * @see Locks.ILock#release()
256 */
257 public void release() {
258 if (depth is 0)
259 return;
260 //only release the lock when the depth reaches zero
261 Assert.isTrue(depth >= 0, "Lock released too many times"); //$NON-NLS-1$
262 if (--depth is 0)
263 doRelease();
264 else
265 manager.removeLockThread(currentOperationThread, this);
266 }
267
268 /**
269 * Removes a semaphore from the queue of waiting operations.
270 *
271 * @param semaphore The semaphore to remove
272 */
273 private synchronized void removeFromQueue(Semaphore semaphore) {
274 operations.remove(semaphore);
275 }
276
277 /**
278 * If newThread is null, release this lock from its previous owner.
279 * If newThread is not null, grant this lock to newThread.
280 */
281 private void setCurrentOperationThread(Thread newThread) {
282 if ((currentOperationThread !is null) && (newThread is null))
283 manager.removeLockThread(currentOperationThread, this);
284 this.currentOperationThread = newThread;
285 if (currentOperationThread !is null)
286 manager.addLockThread(currentOperationThread, this);
287 }
288
289 /**
290 * Forces the lock to be at the given depth.
291 * Used when re-acquiring a suspended lock.
292 */
293 protected void setDepth(int newDepth) {
294 for (int i = depth; i < newDepth; i++) {
295 manager.addLockThread(currentOperationThread, this);
296 }
297 this.depth = newDepth;
298 }
299 package void setDepth_package(int newDepth) {
300 return setDepth(newDepth);
301 }
302
303 /**
304 * For debugging purposes only.
305 */
306 public String toString() {
307 return Format("OrderedLock ({})", number ); //$NON-NLS-1$ //$NON-NLS-2$
308 }
309
310 /**
311 * This lock has just been granted to a new thread (the thread waited for it).
312 * Remove the request from the queue and update both the graph and the lock.
313 */
314 private synchronized void updateCurrentOperation() {
315 operations.dequeue();
316 setCurrentOperationThread(Thread.getThis());
317 }
318 }