diff druntime/src/common/core/sync/rwmutex.d @ 1458:e0b2d67cfe7c

Added druntime (this should be removed once it works).
author Robert Clipsham <robert@octarineparrot.com>
date Tue, 02 Jun 2009 17:43:06 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/druntime/src/common/core/sync/rwmutex.d	Tue Jun 02 17:43:06 2009 +0100
@@ -0,0 +1,512 @@
+/**
+ * The read/write mutex module provides a primitive for maintaining shared read
+ * access and mutually exclusive write access.
+ *
+ * Copyright: Copyright Sean Kelly 2005 - 2009.
+ * License:   <a href="http://www.boost.org/LICENSE_1_0.txt>Boost License 1.0</a>.
+ * Authors:   Sean Kelly
+ *
+ *          Copyright Sean Kelly 2005 - 2009.
+ * Distributed under the Boost Software License, Version 1.0.
+ *    (See accompanying file LICENSE_1_0.txt or copy at
+ *          http://www.boost.org/LICENSE_1_0.txt)
+ */
+module core.sync.rwmutex;
+
+
+public import core.sync.exception;
+private import core.sync.condition;
+private import core.sync.mutex;
+
+version( Win32 )
+{
+    private import core.sys.windows.windows;
+}
+else version( Posix )
+{
+    private import core.sys.posix.pthread;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// ReadWriteMutex
+//
+// Reader reader();
+// Writer writer();
+////////////////////////////////////////////////////////////////////////////////
+
+
+/**
+ * This class represents a mutex that allows any number of readers to enter,
+ * but when a writer enters, all other readers and writers are blocked.
+ *
+ * Please note that this mutex is not recursive and is intended to guard access
+ * to data only.  Also, no deadlock checking is in place because doing so would
+ * require dynamic memory allocation, which would reduce performance by an
+ * unacceptable amount.  As a result, any attempt to recursively acquire this
+ * mutex may well deadlock the caller, particularly if a write lock is acquired
+ * while holding a read lock, or vice-versa.  In practice, this should not be
+ * an issue however, because it is uncommon to call deeply into unknown code
+ * while holding a lock that simply protects data.
+ */
+class ReadWriteMutex
+{
+    /**
+     * Defines the policy used by this mutex.  Currently, two policies are
+     * defined.
+     *
+     * The first will queue writers until no readers hold the mutex, then
+     * pass the writers through one at a time.  If a reader acquires the mutex
+     * while there are still writers queued, the reader will take precedence.
+     *
+     * The second will queue readers if there are any writers queued.  Writers
+     * are passed through one at a time, and once there are no writers present,
+     * all queued readers will be alerted.
+     *
+     * Future policies may offer a more even balance between reader and writer
+     * precedence.
+     */
+    enum Policy
+    {
+        PREFER_READERS, /// Readers get preference.  This may starve writers.
+        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
+    }
+
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Initialization
+    ////////////////////////////////////////////////////////////////////////////
+
+
+    /**
+     * Initializes a read/write mutex object with the supplied policy.
+     *
+     * Params:
+     *  policy = The policy to use.
+     *
+     * Throws:
+     *  SyncException on error.
+     */
+    this( Policy policy = Policy.PREFER_WRITERS )
+    {
+        m_commonMutex = new Mutex;
+        if( !m_commonMutex )
+            throw new SyncException( "Unable to initialize mutex" );
+        scope(failure) delete m_commonMutex;
+
+        m_readerQueue = new Condition( m_commonMutex );
+        if( !m_readerQueue )
+            throw new SyncException( "Unable to initialize mutex" );
+        scope(failure) delete m_readerQueue;
+
+        m_writerQueue = new Condition( m_commonMutex );
+        if( !m_writerQueue )
+            throw new SyncException( "Unable to initialize mutex" );
+        scope(failure) delete m_writerQueue;
+
+        m_policy = policy;
+        m_reader = new Reader;
+        m_writer = new Writer;
+    }
+
+
+    ////////////////////////////////////////////////////////////////////////////
+    // General Properties
+    ////////////////////////////////////////////////////////////////////////////
+
+
+    /**
+     * Gets the policy for the associated mutex.
+     *
+     * Returns:
+     *  The policy used by this mutex.
+     */
+    Policy policy()
+    {
+        return m_policy;
+    }
+
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Reader/Writer Handles
+    ////////////////////////////////////////////////////////////////////////////
+
+
+    /**
+     * Gets an object representing the reader lock for the associated mutex.
+     *
+     * Returns:
+     *  A reader sub-mutex.
+     */
+    Reader reader()
+    {
+        return m_reader;
+    }
+
+
+    /**
+     * Gets an object representing the writer lock for the associated mutex.
+     *
+     * Returns:
+     *  A writer sub-mutex.
+     */
+    Writer writer()
+    {
+        return m_writer;
+    }
+
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Reader
+    ////////////////////////////////////////////////////////////////////////////
+
+
+    /**
+     * This class can be considered a mutex in its own right, and is used to
+     * negotiate a read lock for the enclosing mutex.
+     */
+    class Reader :
+        Object.Monitor
+    {
+        /**
+         * Initializes a read/write mutex reader proxy object.
+         */
+        this()
+        {
+            m_proxy.link = this;
+            (cast(void**) this)[1] = &m_proxy;
+        }
+
+
+        /**
+         * Acquires a read lock on the enclosing mutex.
+         */
+        void lock()
+        {
+            synchronized( m_commonMutex )
+            {
+                ++m_numQueuedReaders;
+                scope(exit) --m_numQueuedReaders;
+
+                while( shouldQueueReader() )
+                    m_readerQueue.wait();
+                ++m_numActiveReaders;
+            }
+        }
+
+
+        /**
+         * Releases a read lock on the enclosing mutex.
+         */
+        void unlock()
+        {
+            synchronized( m_commonMutex )
+            {
+                if( --m_numActiveReaders < 1 )
+                {
+                    if( m_numQueuedWriters > 0 )
+                        m_writerQueue.notify();
+                }
+            }
+        }
+
+
+        /**
+         * Attempts to acquire a read lock on the enclosing mutex.  If one can
+         * be obtained without blocking, the lock is acquired and true is
+         * returned.  If not, the lock is not acquired and false is returned.
+         *
+         * Returns:
+         *  true if the lock was acquired and false if not.
+         */
+        bool tryLock()
+        {
+            synchronized( m_commonMutex )
+            {
+                if( shouldQueueReader() )
+                    return false;
+                ++m_numActiveReaders;
+                return true;
+            }
+        }
+
+
+    private:
+        bool shouldQueueReader()
+        {
+            if( m_numActiveWriters > 0 )
+                return true;
+
+            switch( m_policy )
+            {
+            case Policy.PREFER_WRITERS:
+                 return m_numQueuedWriters > 0;
+
+            case Policy.PREFER_READERS:
+            default:
+                 break;
+            }
+
+        return false;
+        }
+
+        struct MonitorProxy
+        {
+            Object.Monitor link;
+        }
+
+        MonitorProxy    m_proxy;
+    }
+
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Writer
+    ////////////////////////////////////////////////////////////////////////////
+
+
+    /**
+     * This class can be considered a mutex in its own right, and is used to
+     * negotiate a write lock for the enclosing mutex.
+     */
+    class Writer :
+        Object.Monitor
+    {
+        /**
+         * Initializes a read/write mutex writer proxy object.
+         */
+        this()
+        {
+            m_proxy.link = this;
+            (cast(void**) this)[1] = &m_proxy;
+        }
+
+
+        /**
+         * Acquires a write lock on the enclosing mutex.
+         */
+        void lock()
+        {
+            synchronized( m_commonMutex )
+            {
+                ++m_numQueuedWriters;
+                scope(exit) --m_numQueuedWriters;
+
+                while( shouldQueueWriter() )
+                    m_writerQueue.wait();
+                ++m_numActiveWriters;
+            }
+        }
+
+
+        /**
+         * Releases a write lock on the enclosing mutex.
+         */
+        void unlock()
+        {
+            synchronized( m_commonMutex )
+            {
+                if( --m_numActiveWriters < 1 )
+                {
+                    switch( m_policy )
+                    {
+                    default:
+                    case Policy.PREFER_READERS:
+                        if( m_numQueuedReaders > 0 )
+                            m_readerQueue.notifyAll();
+                        else if( m_numQueuedWriters > 0 )
+                            m_writerQueue.notify();
+                        break;
+                    case Policy.PREFER_WRITERS:
+                        if( m_numQueuedWriters > 0 )
+                            m_writerQueue.notify();
+                        else if( m_numQueuedReaders > 0 )
+                            m_readerQueue.notifyAll();
+                    }
+                }
+            }
+        }
+
+
+        /**
+         * Attempts to acquire a write lock on the enclosing mutex.  If one can
+         * be obtained without blocking, the lock is acquired and true is
+         * returned.  If not, the lock is not acquired and false is returned.
+         *
+         * Returns:
+         *  true if the lock was acquired and false if not.
+         */
+        bool tryLock()
+        {
+            synchronized( m_commonMutex )
+            {
+                if( shouldQueueWriter() )
+                    return false;
+                ++m_numActiveWriters;
+                return true;
+            }
+        }
+
+
+    private:
+        bool shouldQueueWriter()
+        {
+            if( m_numActiveWriters > 0 ||
+                m_numActiveReaders > 0 )
+                return true;
+            switch( m_policy )
+            {
+            case Policy.PREFER_READERS:
+                return m_numQueuedReaders > 0;
+
+            case Policy.PREFER_WRITERS:
+            default:
+                 break;
+            }
+
+        return false;
+        }
+
+        struct MonitorProxy
+        {
+            Object.Monitor link;
+        }
+
+        MonitorProxy    m_proxy;
+    }
+
+
+private:
+    Policy      m_policy;
+    Reader      m_reader;
+    Writer      m_writer;
+
+    Mutex       m_commonMutex;
+    Condition   m_readerQueue;
+    Condition   m_writerQueue;
+
+    int         m_numQueuedReaders;
+    int         m_numActiveReaders;
+    int         m_numQueuedWriters;
+    int         m_numActiveWriters;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// Unit Tests
+////////////////////////////////////////////////////////////////////////////////
+
+
+version( unittest )
+{
+    static if( !is( typeof( Thread ) ) )
+        private import core.thread;
+
+
+    void testRead( ReadWriteMutex.Policy policy )
+    {
+        auto mutex      = new ReadWriteMutex( policy );
+        auto synInfo    = new Object;
+        int  numThreads = 10;
+        int  numReaders = 0;
+        int  maxReaders = 0;
+
+        void readerFn()
+        {
+            synchronized( mutex.reader() )
+            {
+                synchronized( synInfo )
+                {
+                    if( ++numReaders > maxReaders )
+                        maxReaders = numReaders;
+                }
+                Thread.sleep( 100_000 ); // 1ms
+                synchronized( synInfo )
+                {
+                    --numReaders;
+                }
+            }
+        }
+
+        auto group = new ThreadGroup;
+
+        for( int i = 0; i < numThreads; ++i )
+        {
+            group.create( &readerFn );
+        }
+        group.joinAll();
+        assert( numReaders < 1 && maxReaders > 1 );
+    }
+
+
+    void testReadWrite( ReadWriteMutex.Policy policy )
+    {
+        auto mutex      = new ReadWriteMutex( policy );
+        auto synInfo    = new Object;
+        int  numThreads = 10;
+        int  numReaders = 0;
+        int  numWriters = 0;
+        int  maxReaders = 0;
+        int  maxWriters = 0;
+        int  numTries   = 20;
+
+        void readerFn()
+        {
+            for( int i = 0; i < numTries; ++i )
+            {
+                synchronized( mutex.reader() )
+                {
+                    synchronized( synInfo )
+                    {
+                        if( ++numReaders > maxReaders )
+                            maxReaders = numReaders;
+                    }
+                    Thread.sleep( 100_000 ); // 1ms
+                    synchronized( synInfo )
+                    {
+                        --numReaders;
+                    }
+                }
+            }
+        }
+
+        void writerFn()
+        {
+            for( int i = 0; i < numTries; ++i )
+            {
+                synchronized( mutex.writer() )
+                {
+                    synchronized( synInfo )
+                    {
+                        if( ++numWriters > maxWriters )
+                            maxWriters = numWriters;
+                    }
+                    Thread.sleep( 100_000 ); // 1ms
+                    synchronized( synInfo )
+                    {
+                        --numWriters;
+                    }
+                }
+            }
+        }
+
+        auto group = new ThreadGroup;
+
+        for( int i = 0; i < numThreads; ++i )
+        {
+            group.create( &readerFn );
+            group.create( &writerFn );
+        }
+        group.joinAll();
+        assert( numReaders < 1 && maxReaders > 1 &&
+                numWriters < 1 && maxWriters < 2 );
+    }
+
+
+    unittest
+    {
+        testRead( ReadWriteMutex.Policy.PREFER_READERS );
+        testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
+        testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
+        testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
+    }
+}