Mercurial > projects > ldc
diff druntime/src/common/core/sync/semaphore.d @ 1458:e0b2d67cfe7c
Added druntime (this should be removed once it works).
author | Robert Clipsham <robert@octarineparrot.com> |
---|---|
date | Tue, 02 Jun 2009 17:43:06 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/druntime/src/common/core/sync/semaphore.d Tue Jun 02 17:43:06 2009 +0100 @@ -0,0 +1,506 @@ +/** + * The semaphore module provides a general use semaphore for synchronization. + * + * Copyright: Copyright Sean Kelly 2005 - 2009. + * License: <a href="http://www.boost.org/LICENSE_1_0.txt>Boost License 1.0</a>. + * Authors: Sean Kelly + * + * Copyright Sean Kelly 2005 - 2009. + * Distributed under the Boost Software License, Version 1.0. + * (See accompanying file LICENSE_1_0.txt or copy at + * http://www.boost.org/LICENSE_1_0.txt) + */ +module core.sync.semaphore; + + +public import core.sync.exception; + +version( Win32 ) +{ + private import core.sys.windows.windows; +} +else version( OSX ) +{ + private import core.sync.config; + private import core.stdc.errno; + private import core.sys.posix.time; + private import core.sys.osx.mach.semaphore; +} +else version( Posix ) +{ + private import core.sync.config; + private import core.stdc.errno; + private import core.sys.posix.pthread; + private import core.sys.posix.semaphore; +} + + +//////////////////////////////////////////////////////////////////////////////// +// Semaphore +// +// void wait(); +// void notify(); +// bool tryWait(); +//////////////////////////////////////////////////////////////////////////////// + + +/** + * This class represents a general counting semaphore as concieved by Edsger + * Dijkstra. As per Mesa type monitors however, "signal" has been replaced + * with "notify" to indicate that control is not transferred to the waiter when + * a notification is sent. + */ +class Semaphore +{ + //////////////////////////////////////////////////////////////////////////// + // Initialization + //////////////////////////////////////////////////////////////////////////// + + + /** + * Initializes a semaphore object with the specified initial count. + * + * Params: + * count = The initial count for the semaphore. + * + * Throws: + * SyncException on error. + */ + this( uint count = 0 ) + { + version( Win32 ) + { + m_hndl = CreateSemaphoreA( null, count, int.max, null ); + if( m_hndl == m_hndl.init ) + throw new SyncException( "Unable to create semaphore" ); + } + else version( OSX ) + { + auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count ); + if( rc ) + throw new SyncException( "Unable to create semaphore" ); + } + else version( Posix ) + { + int rc = sem_init( &m_hndl, 0, count ); + if( rc ) + throw new SyncException( "Unable to create semaphore" ); + } + } + + + ~this() + { + version( Win32 ) + { + BOOL rc = CloseHandle( m_hndl ); + assert( rc, "Unable to destroy semaphore" ); + } + else version( OSX ) + { + auto rc = semaphore_destroy( mach_task_self(), m_hndl ); + assert( !rc, "Unable to destroy semaphore" ); + } + else version( Posix ) + { + int rc = sem_destroy( &m_hndl ); + assert( !rc, "Unable to destroy semaphore" ); + } + } + + + //////////////////////////////////////////////////////////////////////////// + // General Actions + //////////////////////////////////////////////////////////////////////////// + + + /** + * Wait until the current count is above zero, then atomically decrement + * the count by one and return. + * + * Throws: + * SyncException on error. + */ + void wait() + { + version( Win32 ) + { + DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); + if( rc != WAIT_OBJECT_0 ) + throw new SyncException( "Unable to wait for semaphore" ); + } + else version( OSX ) + { + while( true ) + { + auto rc = semaphore_wait( m_hndl ); + if( !rc ) + return; + if( rc == KERN_ABORTED && errno == EINTR ) + continue; + throw new SyncException( "Unable to wait for semaphore" ); + } + } + else version( Posix ) + { + while( true ) + { + if( !sem_wait( &m_hndl ) ) + return; + if( errno != EINTR ) + throw new SyncException( "Unable to wait for semaphore" ); + } + } + } + + + /** + * Suspends the calling thread until the current count moves above zero or + * until the supplied time period has elapsed. If the count moves above + * zero in this interval, then atomically decrement the count by one and + * return true. Otherwise, return false. + * + * + * Params: + * period = The time to wait, in 100 nanosecond intervals. This value may + * be adjusted to equal to the maximum wait period supported by + * the target platform if it is too large. + * + * In: + * period must be non-negative. + * + * Throws: + * SyncException on error. + * + * Returns: + * true if notified before the timeout and false if not. + */ + bool wait( long period ) + in + { + assert( period >= 0 ); + } + body + { + version( Win32 ) + { + enum : uint + { + TICKS_PER_MILLI = 10_000, + MAX_WAIT_MILLIS = uint.max - 1 + } + + period /= TICKS_PER_MILLI; + if( period > MAX_WAIT_MILLIS ) + period = MAX_WAIT_MILLIS; + switch( WaitForSingleObject( m_hndl, cast(uint) period ) ) + { + case WAIT_OBJECT_0: + return true; + case WAIT_TIMEOUT: + return false; + default: + throw new SyncException( "Unable to wait for semaphore" ); + } + } + else version( OSX ) + { + mach_timespec_t t = void; + (cast(byte*) &t)[0 .. t.sizeof] = 0; + + if( period != 0 ) + { + enum : uint + { + NANOS_PER_TICK = 100, + TICKS_PER_SECOND = 10_000_000, + NANOS_PER_SECOND = NANOS_PER_TICK * TICKS_PER_SECOND, + } + + if( t.tv_sec.max - t.tv_sec < period / TICKS_PER_SECOND ) + { + t.tv_sec = t.tv_sec.max; + t.tv_nsec = 0; + } + else + { + t.tv_sec += cast(typeof(t.tv_sec)) (period / TICKS_PER_SECOND); + long ns = (period % TICKS_PER_SECOND) * NANOS_PER_TICK; + if( NANOS_PER_SECOND - t.tv_nsec > ns ) + t.tv_nsec = cast(typeof(t.tv_nsec)) ns; + else + { + t.tv_sec += 1; + t.tv_nsec += ns - NANOS_PER_SECOND; + } + } + } + while( true ) + { + auto rc = semaphore_timedwait( m_hndl, t ); + if( !rc ) + return true; + if( rc == KERN_OPERATION_TIMED_OUT ) + return false; + if( rc != KERN_ABORTED || errno != EINTR ) + throw new SyncException( "Unable to wait for semaphore" ); + } + // -w trip + return false; + } + else version( Posix ) + { + timespec t = void; + mktspec( t, period ); + + while( true ) + { + if( !sem_timedwait( &m_hndl, &t ) ) + return true; + if( errno == ETIMEDOUT ) + return false; + if( errno != EINTR ) + throw new SyncException( "Unable to wait for semaphore" ); + } + // -w trip + return false; + } + } + + + /** + * Atomically increment the current count by one. This will notify one + * waiter, if there are any in the queue. + * + * Throws: + * SyncException on error. + */ + void notify() + { + version( Win32 ) + { + if( !ReleaseSemaphore( m_hndl, 1, null ) ) + throw new SyncException( "Unable to notify semaphore" ); + } + else version( OSX ) + { + auto rc = semaphore_signal( m_hndl ); + if( rc ) + throw new SyncException( "Unable to notify semaphore" ); + } + else version( Posix ) + { + int rc = sem_post( &m_hndl ); + if( rc ) + throw new SyncException( "Unable to notify semaphore" ); + } + } + + + /** + * If the current count is equal to zero, return. Otherwise, atomically + * decrement the count by one and return true. + * + * Throws: + * SyncException on error. + * + * Returns: + * true if the count was above zero and false if not. + */ + bool tryWait() + { + version( Win32 ) + { + switch( WaitForSingleObject( m_hndl, 0 ) ) + { + case WAIT_OBJECT_0: + return true; + case WAIT_TIMEOUT: + return false; + default: + throw new SyncException( "Unable to wait for semaphore" ); + } + } + else version( OSX ) + { + return wait( 0 ); + } + else version( Posix ) + { + while( true ) + { + if( !sem_trywait( &m_hndl ) ) + return true; + if( errno == EAGAIN ) + return false; + if( errno != EINTR ) + throw new SyncException( "Unable to wait for semaphore" ); + } + // -w trip + return false; + } + } + + +private: + version( Win32 ) + { + HANDLE m_hndl; + } + else version( OSX ) + { + semaphore_t m_hndl; + } + else version( Posix ) + { + sem_t m_hndl; + } +} + + +//////////////////////////////////////////////////////////////////////////////// +// Unit Tests +//////////////////////////////////////////////////////////////////////////////// + + +version( unittest ) +{ + private import core.thread; + + + void testWait() + { + auto semaphore = new Semaphore; + int numToProduce = 10; + bool allProduced = false; + auto synProduced = new Object; + int numConsumed = 0; + auto synConsumed = new Object; + int numConsumers = 10; + int numComplete = 0; + auto synComplete = new Object; + + void consumer() + { + while( true ) + { + semaphore.wait(); + + synchronized( synProduced ) + { + if( allProduced ) + break; + } + + synchronized( synConsumed ) + { + ++numConsumed; + } + } + + synchronized( synComplete ) + { + ++numComplete; + } + } + + void producer() + { + assert( !semaphore.tryWait() ); + + for( int i = 0; i < numToProduce; ++i ) + { + semaphore.notify(); + Thread.yield(); + } + Thread.sleep( 10_000_000 ); // 1s + synchronized( synProduced ) + { + allProduced = true; + } + + for( int i = 0; i < numConsumers; ++i ) + { + semaphore.notify(); + Thread.yield(); + } + + for( int i = numConsumers * 10000; i > 0; --i ) + { + synchronized( synComplete ) + { + if( numComplete == numConsumers ) + break; + } + Thread.yield(); + } + + synchronized( synComplete ) + { + assert( numComplete == numConsumers ); + } + + synchronized( synConsumed ) + { + assert( numConsumed == numToProduce ); + } + + assert( !semaphore.tryWait() ); + semaphore.notify(); + assert( semaphore.tryWait() ); + assert( !semaphore.tryWait() ); + } + + auto group = new ThreadGroup; + + for( int i = 0; i < numConsumers; ++i ) + group.create( &consumer ); + group.create( &producer ); + group.joinAll(); + } + + + void testWaitTimeout() + { + auto synReady = new Object; + auto semReady = new Semaphore; + bool waiting = false; + bool alertedOne = true; + bool alertedTwo = true; + + void waiter() + { + synchronized( synReady ) + { + waiting = true; + } + alertedOne = semReady.wait( 10_000_000 ); // 100ms + alertedTwo = semReady.wait( 10_000_000 ); // 100ms + } + + auto thread = new Thread( &waiter ); + thread.start(); + + while( true ) + { + synchronized( synReady ) + { + if( waiting ) + { + semReady.notify(); + break; + } + } + Thread.yield(); + } + thread.join(); + assert( waiting && alertedOne && !alertedTwo ); + } + + + unittest + { + testWait(); + testWaitTimeout(); + } +}