diff dwtx/core/internal/jobs/OrderedLock.d @ 122:9d0585bcb7aa

Add core.jobs package
author Frank Benoit <benoit@tionex.de>
date Tue, 12 Aug 2008 02:34:21 +0200
parents
children 862b05e0334a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dwtx/core/internal/jobs/OrderedLock.d	Tue Aug 12 02:34:21 2008 +0200
@@ -0,0 +1,318 @@
+/*******************************************************************************
+ * Copyright (c) 2003, 2006 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     IBM - Initial API and implementation
+ * Port to the D programming language:
+ *     Frank Benoit <benoit@tionex.de>
+ *******************************************************************************/
+module dwtx.core.internal.jobs.OrderedLock;
+
+import tango.text.convert.Format;
+import tango.core.Thread;
+import tango.io.Stdout;
+import dwt.dwthelper.utils;
+
+import dwtx.core.runtime.Assert;
+import dwtx.core.runtime.jobs.ILock;
+import dwtx.core.runtime.jobs.ISchedulingRule;
+
+import dwtx.core.internal.jobs.LockManager;
+import dwtx.core.internal.jobs.Queue;
+import dwtx.core.internal.jobs.Semaphore;
+
+/**
+ * A lock used to control write access to an exclusive resource.
+ *
+ * The lock avoids circular waiting deadlocks by detecting the deadlocks
+ * and resolving them through the suspension of all locks owned by one
+ * of the threads involved in the deadlock. This makes it impossible for n such
+ * locks to deadlock while waiting for each other.  The downside is that this means
+ * that during an interval when a process owns a lock, it can be forced
+ * to give the lock up and wait until all locks it requires become
+ * available.  This removes the feature of exclusive access to the
+ * resource in contention for the duration between acquire() and
+ * release() calls.
+ *
+ * The lock implementation prevents starvation by granting the
+ * lock in the same order in which acquire() requests arrive. In
+ * this scheme, starvation is only possible if a thread retains
+ * a lock indefinitely.
+ */
+public class OrderedLock : ILock, ISchedulingRule {
+
+    private static const bool DEBUG = false;
+    /**
+     * Locks are sequentially ordered for debugging purposes.
+     */
+    private static int nextLockNumber = 0;
+    /**
+     * The thread of the operation that currently owns the lock.
+     */
+    private /+volatile+/ Thread currentOperationThread;
+    /**
+     * Records the number of successive acquires in the same
+     * thread. The lock is released only when the depth
+     * reaches zero.
+     */
+    private int depth;
+    /**
+     * The manager that implements the deadlock detection and resolution protocol.
+     */
+    private const LockManager manager;
+    private const int number;
+
+    /**
+     * Queue of semaphores for threads currently waiting
+     * on the lock. This queue is not thread-safe, so access
+     * to this queue must be synchronized on the lock instance.
+     */
+    private const Queue operations;
+
+    /**
+     * Creates a new workspace lock.
+     */
+    this(LockManager manager) {
+
+        operations = new Queue();
+
+        this.manager = manager;
+        this.number = nextLockNumber++;
+    }
+
+    /* (non-Javadoc)
+     * @see Locks.ILock#acquire()
+     */
+    public void acquire() {
+        //spin until the lock is successfully acquired
+        //NOTE: spinning here allows the UI thread to service pending syncExecs
+        //if the UI thread is waiting to acquire a lock.
+        while (true) {
+            try {
+                if (acquire(Long.MAX_VALUE))
+                    return;
+            } catch (InterruptedException e) {
+                //ignore and loop
+            }
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see Locks.ILock#acquire(long)
+     */
+    public bool acquire(long delay) {
+        implMissing(__FILE__, __LINE__ );
+//         if (Thread.interrupted())
+//             throw new InterruptedException();
+
+        bool success = false;
+        if (delay <= 0)
+            return attempt();
+        Semaphore semaphore = createSemaphore();
+        if (semaphore is null)
+            return true;
+        if (DEBUG)
+            Stdout.formatln("[{}] Operation waiting to be executed... ", Thread.getThis(), this); //$NON-NLS-1$ //$NON-NLS-2$
+        success = doAcquire(semaphore, delay);
+        manager.resumeSuspendedLocks(Thread.getThis());
+        if (DEBUG && success)
+            Stdout.formatln("[{}] Operation started... ", Thread.getThis(), this); //$NON-NLS-1$ //$NON-NLS-2$
+        else if (DEBUG)
+            Stdout.formatln("[{}] Operation timed out... ", Thread.getThis(), this); //$NON-NLS-1$ //$NON-NLS-2$
+        return success;
+    }
+
+    /**
+     * Attempts to acquire the lock.  Returns false if the lock is not available and
+     * true if the lock has been successfully acquired.
+     */
+    private synchronized bool attempt() {
+        //return true if we already own the lock
+        //also, if nobody is waiting, grant the lock immediately
+        if ((currentOperationThread is Thread.getThis()) || (currentOperationThread is null && operations.isEmpty())) {
+            depth++;
+            setCurrentOperationThread(Thread.getThis());
+            return true;
+        }
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.ISchedulingRule#contains(dwtx.core.runtime.jobs.ISchedulingRule)
+     */
+    public bool contains(ISchedulingRule rule) {
+        return false;
+    }
+
+    /**
+     * Returns null if acquired and a Semaphore object otherwise. If a
+     * waiting semaphore already exists for this thread, it will be returned,
+     * otherwise a new semaphore will be created, enqueued, and returned.
+     */
+    private synchronized Semaphore createSemaphore() {
+        return attempt() ? null : enqueue(new Semaphore(Thread.getThis()));
+    }
+
+    /**
+     * Attempts to acquire this lock.  Callers will block  until this lock comes available to
+     * them, or until the specified delay has elapsed.
+     */
+    private bool doAcquire(Semaphore semaphore, long delay) {
+        bool success = false;
+        //notify hook to service pending syncExecs before falling asleep
+        if (manager.aboutToWait(this.currentOperationThread)) {
+            //hook granted immediate access
+            //remove semaphore for the lock request from the queue
+            //do not log in graph because this thread did not really get the lock
+            removeFromQueue(semaphore);
+            depth++;
+            manager.addLockThread(currentOperationThread, this);
+            return true;
+        }
+        //Make sure the semaphore is in the queue before we start waiting
+        //It might have been removed from the queue while servicing syncExecs
+        //This is will return our existing semaphore if it is still in the queue
+        semaphore = createSemaphore();
+        if (semaphore is null)
+            return true;
+        manager.addLockWaitThread(Thread.getThis(), this);
+        try {
+            success = semaphore.acquire(delay);
+        } catch (InterruptedException e) {
+            if (DEBUG)
+                Stdout.formatln(Format("[{}] Operation interrupted while waiting... :-|", Thread.getThis())); //$NON-NLS-1$ //$NON-NLS-2$
+            throw e;
+        }
+        if (success) {
+            depth++;
+            updateCurrentOperation();
+        } else {
+            removeFromQueue(semaphore);
+            manager.removeLockWaitThread(Thread.getThis(), this);
+        }
+        return success;
+    }
+
+    /**
+     * Releases this lock from the thread that used to own it.
+     * Grants this lock to the next thread in the queue.
+     */
+    private synchronized void doRelease() {
+        //notify hook
+        manager.aboutToRelease();
+        depth = 0;
+        Semaphore next = cast(Semaphore) operations.peek();
+        setCurrentOperationThread(null);
+        if (next !is null)
+            next.release();
+    }
+
+    /**
+     * If there is another semaphore with the same runnable in the
+     * queue, the other is returned and the new one is not added.
+     */
+    private synchronized Semaphore enqueue(Semaphore newSemaphore) {
+        Semaphore semaphore = cast(Semaphore) operations.get(newSemaphore);
+        if (semaphore is null) {
+            operations.enqueue(newSemaphore);
+            return newSemaphore;
+        }
+        return semaphore;
+    }
+
+    /**
+     * Suspend this lock by granting the lock to the next lock in the queue.
+     * Return the depth of the suspended lock.
+     */
+    protected int forceRelease() {
+        int oldDepth = depth;
+        doRelease();
+        return oldDepth;
+    }
+    package int forceRelease_package() {
+        return forceRelease();
+    }
+
+    /* (non-Javadoc)
+     * @see Locks.ILock#getDepth()
+     */
+    public int getDepth() {
+        return depth;
+    }
+
+    /* (non-Javadoc)
+     * @see dwtx.core.runtime.jobs.ISchedulingRule#isConflicting(dwtx.core.runtime.jobs.ISchedulingRule)
+     */
+    public bool isConflicting(ISchedulingRule rule) {
+        return rule is this;
+    }
+
+    /* (non-Javadoc)
+     * @see Locks.ILock#release()
+     */
+    public void release() {
+        if (depth is 0)
+            return;
+        //only release the lock when the depth reaches zero
+        Assert.isTrue(depth >= 0, "Lock released too many times"); //$NON-NLS-1$
+        if (--depth is 0)
+            doRelease();
+        else
+            manager.removeLockThread(currentOperationThread, this);
+    }
+
+    /**
+     * Removes a semaphore from the queue of waiting operations.
+     *
+     * @param semaphore The semaphore to remove
+     */
+    private synchronized void removeFromQueue(Semaphore semaphore) {
+        operations.remove(semaphore);
+    }
+
+    /**
+     * If newThread is null, release this lock from its previous owner.
+     * If newThread is not null, grant this lock to newThread.
+     */
+    private void setCurrentOperationThread(Thread newThread) {
+        if ((currentOperationThread !is null) && (newThread is null))
+            manager.removeLockThread(currentOperationThread, this);
+        this.currentOperationThread = newThread;
+        if (currentOperationThread !is null)
+            manager.addLockThread(currentOperationThread, this);
+    }
+
+    /**
+     * Forces the lock to be at the given depth.
+     * Used when re-acquiring a suspended lock.
+     */
+    protected void setDepth(int newDepth) {
+        for (int i = depth; i < newDepth; i++) {
+            manager.addLockThread(currentOperationThread, this);
+        }
+        this.depth = newDepth;
+    }
+    package void setDepth_package(int newDepth) {
+        return setDepth(newDepth);
+    }
+
+    /**
+     * For debugging purposes only.
+     */
+    public String toString() {
+        return Format("OrderedLock ({})", number ); //$NON-NLS-1$ //$NON-NLS-2$
+    }
+
+    /**
+     * This lock has just been granted to a new thread (the thread waited for it).
+     * Remove the request from the queue and update both the graph and the lock.
+     */
+    private synchronized void updateCurrentOperation() {
+        operations.dequeue();
+        setCurrentOperationThread(Thread.getThis());
+    }
+}