Mercurial > projects > ldc
view druntime/src/common/core/sync/rwmutex.d @ 1458:e0b2d67cfe7c
Added druntime (this should be removed once it works).
author | Robert Clipsham <robert@octarineparrot.com> |
---|---|
date | Tue, 02 Jun 2009 17:43:06 +0100 |
parents | |
children |
line wrap: on
line source
/**
 * The read/write mutex module provides a primitive for maintaining shared read
 * access and mutually exclusive write access.
 *
 * Copyright: Copyright Sean Kelly 2005 - 2009.
 * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
 * Authors:   Sean Kelly
 *
 *          Copyright Sean Kelly 2005 - 2009.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE_1_0.txt or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module core.sync.rwmutex;


public import core.sync.exception;
private import core.sync.condition;
private import core.sync.mutex;

version( Win32 )
{
    private import core.sys.windows.windows;
}
else version( Posix )
{
    private import core.sys.posix.pthread;
}


////////////////////////////////////////////////////////////////////////////////
// ReadWriteMutex
//
// Reader reader();
// Writer writer();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a mutex that allows any number of readers to enter,
 * but when a writer enters, all other readers and writers are blocked.
 *
 * Please note that this mutex is not recursive and is intended to guard access
 * to data only.  Also, no deadlock checking is in place because doing so would
 * require dynamic memory allocation, which would reduce performance by an
 * unacceptable amount.  As a result, any attempt to recursively acquire this
 * mutex may well deadlock the caller, particularly if a write lock is acquired
 * while holding a read lock, or vice-versa.  In practice, this should not be
 * an issue however, because it is uncommon to call deeply into unknown code
 * while holding a lock that simply protects data.
 *
 * All bookkeeping (the four counters declared at the end of the class) is
 * read and written only inside synchronized( m_commonMutex ) blocks, and both
 * wait queues are conditions created on that same mutex.
 */
class ReadWriteMutex
{
    /**
     * Defines the policy used by this mutex.  Currently, two policies are
     * defined.
     *
     * The first will queue writers until no readers hold the mutex, then
     * pass the writers through one at a time.  If a reader acquires the mutex
     * while there are still writers queued, the reader will take precedence.
     *
     * The second will queue readers if there are any writers queued.  Writers
     * are passed through one at a time, and once there are no writers present,
     * all queued readers will be alerted.
     *
     * Future policies may offer a more even balance between reader and writer
     * precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
    }


    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a read/write mutex object with the supplied policy.
     *
     * The shared mutex is created first, then the reader and writer
     * conditions (both bound to that mutex), and finally the Reader and
     * Writer proxy objects.  Each scope(failure) clause deletes the object
     * created just before it if a later statement of this constructor throws.
     *
     * Params:
     *  policy = The policy to use.  Defaults to Policy.PREFER_WRITERS.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS )
    {
        // NOTE(review): the null checks below test the result of `new`;
        // whether `new` can actually yield null is not visible from this
        // file -- confirm before relying on the SyncException path.
        m_commonMutex = new Mutex;
        if( !m_commonMutex )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_commonMutex;

        m_readerQueue = new Condition( m_commonMutex );
        if( !m_readerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_readerQueue;

        m_writerQueue = new Condition( m_commonMutex );
        if( !m_writerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_writerQueue;

        m_policy = policy;
        m_reader = new Reader;
        m_writer = new Writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the policy for the associated mutex.
     *
     * Returns:
     *  The policy used by this mutex, as passed to the constructor.
     */
    Policy policy()
    {
        return m_policy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets an object representing the reader lock for the associated mutex.
     * The same Reader instance, created in the constructor, is returned on
     * every call.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    Reader reader()
    {
        return m_reader;
    }


    /**
     * Gets an object representing the writer lock for the associated mutex.
     * The same Writer instance, created in the constructor, is returned on
     * every call.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    Writer writer()
    {
        return m_writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a read lock for the enclosing mutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex reader proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Stores the address of m_proxy in the second pointer-sized slot
            // of this object.
            // NOTE(review): presumably that slot is the object's monitor
            // pointer, so that synchronized( reader ) is routed to lock() /
            // unlock() below -- confirm against the runtime's object layout.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a read lock on the enclosing mutex.  Blocks on the reader
         * queue for as long as shouldQueueReader() reports that this reader
         * must wait.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                // This reader counts as queued from here until the block is
                // left; scope(exit) undoes the increment on every exit path.
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                // The predicate is re-evaluated after every wake-up.
                while( shouldQueueReader() )
                    m_readerQueue.wait();
                ++m_numActiveReaders;
            }
        }


        /**
         * Releases a read lock on the enclosing mutex.  When the active
         * reader count drops below one and at least one writer is queued,
         * a single writer is notified.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveReaders < 1 )
                {
                    if( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }


        /**
         * Attempts to acquire a read lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Unlike lock(), this never touches the queued-reader count.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( shouldQueueReader() )
                    return false;
                ++m_numActiveReaders;
                return true;
            }
        }


    private:
        /**
         * Decides whether a reader has to wait.  Both call sites in this
         * class invoke it while holding m_commonMutex.
         *
         * Returns:
         *  true if a writer is currently active; otherwise, under
         *  Policy.PREFER_WRITERS, true if any writer is queued.  In every
         *  other case false.
         */
        bool shouldQueueReader()
        {
            if( m_numActiveWriters > 0 )
                return true;

            switch( m_policy )
            {
            case Policy.PREFER_WRITERS:
                 return m_numQueuedWriters > 0;

            case Policy.PREFER_READERS:
            default:
                 break;
            }

            return false;
        }

        /// Holder whose address is written into this object by the
        /// constructor; link refers back to this Reader.
        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a write lock for the enclosing mutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex writer proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Stores the address of m_proxy in the second pointer-sized slot
            // of this object.
            // NOTE(review): presumably that slot is the object's monitor
            // pointer, so that synchronized( writer ) is routed to lock() /
            // unlock() below -- confirm against the runtime's object layout.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a write lock on the enclosing mutex.  Blocks on the writer
         * queue for as long as shouldQueueWriter() reports that this writer
         * must wait.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                // This writer counts as queued from here until the block is
                // left; scope(exit) undoes the increment on every exit path.
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                // The predicate is re-evaluated after every wake-up.
                while( shouldQueueWriter() )
                    m_writerQueue.wait();
                ++m_numActiveWriters;
            }
        }


        /**
         * Releases a write lock on the enclosing mutex.  When the active
         * writer count drops below one, waiters are woken in policy order:
         * PREFER_READERS (and any unrecognised policy value) notifies all
         * queued readers first and falls back to a single queued writer;
         * PREFER_WRITERS notifies a single queued writer first and falls back
         * to all queued readers.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveWriters < 1 )
                {
                    switch( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }


        /**
         * Attempts to acquire a write lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Unlike lock(), this never touches the queued-writer count.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( shouldQueueWriter() )
                    return false;
                ++m_numActiveWriters;
                return true;
            }
        }


    private:
        /**
         * Decides whether a writer has to wait.  Both call sites in this
         * class invoke it while holding m_commonMutex.
         *
         * Returns:
         *  true if any writer or any reader is currently active; otherwise,
         *  under Policy.PREFER_READERS, true if any reader is queued.  In
         *  every other case false.
         */
        bool shouldQueueWriter()
        {
            if( m_numActiveWriters > 0 ||
                m_numActiveReaders > 0 )
                return true;
            switch( m_policy )
            {
            case Policy.PREFER_READERS:
                return m_numQueuedReaders > 0;

            case Policy.PREFER_WRITERS:
            default:
                 break;
            }

            return false;
        }

        /// Holder whose address is written into this object by the
        /// constructor; link refers back to this Writer.
        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


private:
    Policy      m_policy;           // policy chosen at construction
    Reader      m_reader;           // proxy returned by reader()
    Writer      m_writer;           // proxy returned by writer()

    Mutex       m_commonMutex;      // guards every counter below
    Condition   m_readerQueue;      // readers wait here in Reader.lock()
    Condition   m_writerQueue;      // writers wait here in Writer.lock()

    int         m_numQueuedReaders; // readers currently inside Reader.lock()
    int         m_numActiveReaders; // readers currently holding the lock
    int         m_numQueuedWriters; // writers currently inside Writer.lock()
    int         m_numActiveWriters; // writers currently holding the lock
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////


version( unittest )
{
    // Import core.thread only when no Thread symbol is already visible.
    static if( !is( typeof( Thread ) ) )
        private import core.thread;


    /**
     * Starts numThreads threads that each take the reader lock once, and
     * records the highest number of readers inside the lock at the same time.
     * Asserts that all readers have left and that more than one reader was
     * inside concurrently at some point.
     *
     * Params:
     *  policy = The policy the mutex under test is created with.
     */
    void testRead( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;   // guards numReaders / maxReaders
        int  numThreads = 10;
        int  numReaders = 0;
        int  maxReaders = 0;

        void readerFn()
        {
            synchronized( mutex.reader() )
            {
                synchronized( synInfo )
                {
                    if( ++numReaders > maxReaders )
                        maxReaders = numReaders;
                }
                // Hold the read lock for a while so that readers overlap.
                // NOTE(review): originally annotated "1ms"; the unit expected
                // by Thread.sleep is not visible here -- if it counts 100ns
                // ticks this is 10ms.  Confirm.
                Thread.sleep( 100_000 );
                synchronized( synInfo )
                {
                    --numReaders;
                }
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numThreads; ++i )
        {
            group.create( &readerFn );
        }
        group.joinAll();
        assert( numReaders < 1 && maxReaders > 1 );
    }


    /**
     * Starts numThreads reader threads and numThreads writer threads; each
     * takes its lock numTries times.  Records the highest number of
     * concurrent readers and of concurrent writers, then asserts that
     * everyone has left, that readers overlapped (maxReaders > 1) and that
     * writers never did (maxWriters < 2).
     *
     * Params:
     *  policy = The policy the mutex under test is created with.
     */
    void testReadWrite( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;   // guards the four counters below
        int  numThreads = 10;
        int  numReaders = 0;
        int  numWriters = 0;
        int  maxReaders = 0;
        int  maxWriters = 0;
        int  numTries   = 20;

        void readerFn()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex.reader() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numReaders > maxReaders )
                            maxReaders = numReaders;
                    }
                    // NOTE(review): originally annotated "1ms"; confirm the
                    // unit Thread.sleep expects (see testRead).
                    Thread.sleep( 100_000 );
                    synchronized( synInfo )
                    {
                        --numReaders;
                    }
                }
            }
        }

        void writerFn()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex.writer() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numWriters > maxWriters )
                            maxWriters = numWriters;
                    }
                    // NOTE(review): originally annotated "1ms"; confirm the
                    // unit Thread.sleep expects (see testRead).
                    Thread.sleep( 100_000 );
                    synchronized( synInfo )
                    {
                        --numWriters;
                    }
                }
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numThreads; ++i )
        {
            group.create( &readerFn );
            group.create( &writerFn );
        }
        group.joinAll();
        assert( numReaders < 1 && maxReaders > 1 &&
                numWriters < 1 && maxWriters < 2 );
    }


    // Runs both scenarios under both policies.
    unittest
    {
        testRead( ReadWriteMutex.Policy.PREFER_READERS );
        testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
    }
}