Mercurial > projects > ldc
diff tango/tango/core/sync/Condition.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tango/tango/core/sync/Condition.d Fri Jan 11 17:57:40 2008 +0100 @@ -0,0 +1,560 @@ +/** + * The condition module provides a primitive for synchronized condition + * checking. + * + * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved. + * License: BSD style: $(LICENSE) + * Authors: Sean Kelly + */ +module tango.core.sync.Condition; + + +public import tango.core.Exception : SyncException; +public import tango.core.sync.Mutex; + +version( Win32 ) +{ + private import tango.core.sync.Semaphore; + private import tango.sys.win32.UserGdi; +} +else version( Posix ) +{ + private import tango.core.sync.Config; + private import tango.stdc.errno; + private import tango.stdc.posix.pthread; + private import tango.stdc.posix.time; +} + + +//////////////////////////////////////////////////////////////////////////////// +// Condition +// +// void wait(); +// void notify(); +// void notifyAll(); +//////////////////////////////////////////////////////////////////////////////// + + +/** + * This class represents a condition variable as concieved by C.A.R. Hoare. As + * per Mesa type monitors however, "signal" has been replaced with "notify" to + * indicate that control is not transferred to the waiter when a notification + * is sent. + */ +class Condition +{ + //////////////////////////////////////////////////////////////////////////// + // Initialization + //////////////////////////////////////////////////////////////////////////// + + + /** + * Initializes a condition object which is associated with the supplied + * mutex object. + * + * Params: + * m = The mutex with which this condition will be associated. + * + * Throws: + * SyncException on error. + */ + this( Mutex m ) + { + version( Win32 ) + { + m_blockLock = CreateSemaphoreA( null, 1, 1, null ); + if( m_blockLock == m_blockLock.init ) + throw new SyncException( "Unable to initialize condition" ); + scope(failure) CloseHandle( m_blockLock ); + + m_blockQueue = CreateSemaphoreA( null, 0, int.max, null ); + if( m_blockQueue == m_blockQueue.init ) + throw new SyncException( "Unable to initialize condition" ); + scope(failure) CloseHandle( m_blockQueue ); + + InitializeCriticalSection( &m_unblockLock ); + m_assocMutex = m; + } + else version( Posix ) + { + m_mutexAddr = m.handleAddr(); + + int rc = pthread_cond_init( &m_hndl, null ); + if( rc ) + throw new SyncException( "Unable to initialize condition" ); + } + } + + + ~this() + { + version( Win32 ) + { + BOOL rc = CloseHandle( m_blockLock ); + assert( rc, "Unable to destroy condition" ); + rc = CloseHandle( m_blockQueue ); + assert( rc, "Unable to destroy condition" ); + DeleteCriticalSection( &m_unblockLock ); + } + else version( Posix ) + { + int rc = pthread_cond_destroy( &m_hndl ); + assert( !rc, "Unable to destroy condition" ); + } + } + + + //////////////////////////////////////////////////////////////////////////// + // General Actions + //////////////////////////////////////////////////////////////////////////// + + + /** + * Wait until notified. + * + * Throws: + * SyncException on error. + */ + void wait() + { + version( Win32 ) + { + timedWait( INFINITE ); + } + else version( Posix ) + { + int rc = pthread_cond_wait( &m_hndl, m_mutexAddr ); + if( rc ) + throw new SyncException( "Unable to wait for condition" ); + } + } + + + /** + * Suspends the calling thread until a notification occurs or until the + * supplied time period has elapsed. The supplied period may be up to a + * maximum of (uint.max - 1) milliseconds. + * + * Params: + * period = The time to wait, in seconds (fractional values are accepted). + * + * In: + * period must be less than (uint.max - 1) milliseconds. + * + * Returns: + * true if notified before the timeout and false if not. + * + * Throws: + * SyncException on error. + */ + bool wait( double period ) + in + { + // NOTE: The fractional value added to period is to correct fp error. + assert( period * 1000 + 0.1 < uint.max - 1 ); + } + body + { + version( Win32 ) + { + return timedWait( cast(uint)(period * 1000 + 0.1) ); + } + else version( Posix ) + { + timespec t; + + getTimespec( t ); + adjTimespec( t, period ); + int rc = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &t ); + if( !rc ) + return true; + if( rc == ETIMEDOUT ) + return false; + throw new SyncException( "Unable to wait for condition" ); + } + } + + /** + * Notifies one waiter. + * + * Throws: + * SyncException on error. + */ + void notify() + { + version( Win32 ) + { + notify( false ); + } + else version( Posix ) + { + int rc = pthread_cond_signal( &m_hndl ); + if( rc ) + throw new SyncException( "Unable to notify condition" ); + } + } + + + /** + * Notifies all waiters. + * + * Throws: + * SyncException on error. + */ + void notifyAll() + { + version( Win32 ) + { + notify( true ); + } + else version( Posix ) + { + int rc = pthread_cond_broadcast( &m_hndl ); + if( rc ) + throw new SyncException( "Unable to notify condition" ); + } + } + + +private: + version( Win32 ) + { + bool timedWait( DWORD timeout ) + { + int numSignalsLeft; + int numWaitersGone; + DWORD rc; + + rc = WaitForSingleObject( m_blockLock, INFINITE ); + assert( rc == WAIT_OBJECT_0 ); + + m_numWaitersBlocked++; + + rc = ReleaseSemaphore( m_blockLock, 1, null ); + assert( rc ); + + m_assocMutex.unlock(); + scope(failure) m_assocMutex.lock(); + + rc = WaitForSingleObject( m_blockQueue, timeout ); + assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); + bool timedOut = (rc == WAIT_TIMEOUT); + + EnterCriticalSection( &m_unblockLock ); + scope(failure) LeaveCriticalSection( &m_unblockLock ); + + if( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) + { + if ( timedOut ) + { + // timeout (or canceled) + if( m_numWaitersBlocked != 0 ) + { + m_numWaitersBlocked--; + // do not unblock next waiter below (already unblocked) + numSignalsLeft = 0; + } + else + { + // spurious wakeup pending!! + m_numWaitersGone = 1; + } + } + if( --m_numWaitersToUnblock == 0 ) + { + if( m_numWaitersBlocked != 0 ) + { + // open the gate + rc = ReleaseSemaphore( m_blockLock, 1, null ); + assert( rc ); + // do not open the gate below again + numSignalsLeft = 0; + } + else if( (numWaitersGone = m_numWaitersGone) != 0 ) + { + m_numWaitersGone = 0; + } + } + } + else if( ++m_numWaitersGone == int.max / 2 ) + { + // timeout/canceled or spurious event :-) + rc = WaitForSingleObject( m_blockLock, INFINITE ); + assert( rc == WAIT_OBJECT_0 ); + // something is going on here - test of timeouts? + m_numWaitersBlocked -= m_numWaitersGone; + rc = ReleaseSemaphore( m_blockLock, 1, null ); + assert( rc == WAIT_OBJECT_0 ); + m_numWaitersGone = 0; + } + + LeaveCriticalSection( &m_unblockLock ); + + if( numSignalsLeft == 1 ) + { + // better now than spurious later (same as ResetEvent) + for( ; numWaitersGone > 0; --numWaitersGone ) + { + rc = WaitForSingleObject( m_blockQueue, INFINITE ); + assert( rc == WAIT_OBJECT_0 ); + } + // open the gate + rc = ReleaseSemaphore( m_blockLock, 1, null ); + assert( rc ); + } + else if( numSignalsLeft != 0 ) + { + // unblock next waiter + rc = ReleaseSemaphore( m_blockQueue, 1, null ); + assert( rc ); + } + m_assocMutex.lock(); + return !timedOut; + } + + + void notify( bool all ) + { + DWORD rc; + + EnterCriticalSection( &m_unblockLock ); + scope(failure) LeaveCriticalSection( &m_unblockLock ); + + if( m_numWaitersToUnblock != 0 ) + { + if( m_numWaitersBlocked == 0 ) + { + LeaveCriticalSection( &m_unblockLock ); + return; + } + if( all ) + { + m_numWaitersToUnblock += m_numWaitersBlocked; + m_numWaitersBlocked = 0; + } + else + { + m_numWaitersToUnblock++; + m_numWaitersBlocked--; + } + LeaveCriticalSection( &m_unblockLock ); + } + else if( m_numWaitersBlocked > m_numWaitersGone ) + { + rc = WaitForSingleObject( m_blockLock, INFINITE ); + assert( rc == WAIT_OBJECT_0 ); + if( 0 != m_numWaitersGone ) + { + m_numWaitersBlocked -= m_numWaitersGone; + m_numWaitersGone = 0; + } + if( all ) + { + m_numWaitersToUnblock = m_numWaitersBlocked; + m_numWaitersBlocked = 0; + } + else + { + m_numWaitersToUnblock = 1; + m_numWaitersBlocked--; + } + LeaveCriticalSection( &m_unblockLock ); + rc = ReleaseSemaphore( m_blockQueue, 1, null ); + assert( rc ); + } + else + { + LeaveCriticalSection( &m_unblockLock ); + } + } + + + // NOTE: This implementation uses Algorithm 8c as described here: + // http://groups.google.com/group/comp.programming.threads/ + // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a + HANDLE m_blockLock; // auto-reset event (now semaphore) + HANDLE m_blockQueue; // auto-reset event (now semaphore) + Mutex m_assocMutex; // external mutex/CS + CRITICAL_SECTION m_unblockLock; // internal mutex/CS + int m_numWaitersGone = 0; + int m_numWaitersBlocked = 0; + int m_numWaitersToUnblock = 0; + } + else version( Posix ) + { + pthread_cond_t m_hndl; + pthread_mutex_t* m_mutexAddr; + } +} + + +//////////////////////////////////////////////////////////////////////////////// +// Unit Tests +//////////////////////////////////////////////////////////////////////////////// + + +debug( UnitTest ) +{ + private import tango.core.Thread; + private import tango.core.sync.Mutex; + private import tango.core.sync.Semaphore; + + + void testNotify() + { + auto mutex = new Mutex; + auto condReady = new Condition( mutex ); + auto semDone = new Semaphore; + auto synLoop = new Object; + int numWaiters = 10; + int numTries = 10; + int numReady = 0; + int numTotal = 0; + int numDone = 0; + int numPost = 0; + + void waiter() + { + for( int i = 0; i < numTries; ++i ) + { + synchronized( mutex ) + { + while( numReady < 1 ) + { + condReady.wait(); + } + --numReady; + ++numTotal; + } + + synchronized( synLoop ) + { + ++numDone; + } + semDone.wait(); + } + } + + auto group = new ThreadGroup; + + for( int i = 0; i < numWaiters; ++i ) + group.create( &waiter ); + + for( int i = 0; i < numTries; ++i ) + { + for( int j = 0; j < numWaiters; ++j ) + { + synchronized( mutex ) + { + ++numReady; + condReady.notify(); + } + } + while( true ) + { + synchronized( synLoop ) + { + if( numDone >= numWaiters ) + break; + } + Thread.yield(); + } + for( int j = 0; j < numWaiters; ++j ) + { + semDone.notify(); + } + } + + group.joinAll(); + assert( numTotal == numWaiters * numTries ); + } + + + void testNotifyAll() + { + auto mutex = new Mutex; + auto condReady = new Condition( mutex ); + int numWaiters = 10; + int numReady = 0; + int numDone = 0; + bool alert = false; + + void waiter() + { + synchronized( mutex ) + { + ++numReady; + while( !alert ) + condReady.wait(); + ++numDone; + } + } + + auto group = new ThreadGroup; + + for( int i = 0; i < numWaiters; ++i ) + group.create( &waiter ); + + while( true ) + { + synchronized( mutex ) + { + if( numReady >= numWaiters ) + { + alert = true; + condReady.notifyAll(); + break; + } + } + Thread.yield(); + } + group.joinAll(); + assert( numReady == numWaiters && numDone == numWaiters ); + } + + + void testWaitTimeout() + { + auto mutex = new Mutex; + auto condReady = new Condition( mutex ); + bool waiting = false; + bool alertedOne = true; + bool alertedTwo = true; + + void waiter() + { + synchronized( mutex ) + { + waiting = true; + alertedOne = condReady.wait( 1 ); + alertedTwo = condReady.wait( 1 ); + } + } + + auto thread = new Thread( &waiter ); + thread.start(); + + while( true ) + { + synchronized( mutex ) + { + if( waiting ) + { + condReady.notify(); + break; + } + } + Thread.yield(); + } + thread.join(); + assert( waiting && alertedOne && !alertedTwo ); + } + + + unittest + { + testNotify(); + testNotifyAll(); + testWaitTimeout(); + } +}