Mercurial > projects > ldc
view druntime/src/common/core/sync/semaphore.d @ 1458:e0b2d67cfe7c
Added druntime (this should be removed once it works).
author | Robert Clipsham <robert@octarineparrot.com> |
---|---|
date | Tue, 02 Jun 2009 17:43:06 +0100 |
parents | |
children |
line wrap: on
line source
/** * The semaphore module provides a general use semaphore for synchronization. * * Copyright: Copyright Sean Kelly 2005 - 2009. * License: <a href="http://www.boost.org/LICENSE_1_0.txt>Boost License 1.0</a>. * Authors: Sean Kelly * * Copyright Sean Kelly 2005 - 2009. * Distributed under the Boost Software License, Version 1.0. * (See accompanying file LICENSE_1_0.txt or copy at * http://www.boost.org/LICENSE_1_0.txt) */ module core.sync.semaphore; public import core.sync.exception; version( Win32 ) { private import core.sys.windows.windows; } else version( OSX ) { private import core.sync.config; private import core.stdc.errno; private import core.sys.posix.time; private import core.sys.osx.mach.semaphore; } else version( Posix ) { private import core.sync.config; private import core.stdc.errno; private import core.sys.posix.pthread; private import core.sys.posix.semaphore; } //////////////////////////////////////////////////////////////////////////////// // Semaphore // // void wait(); // void notify(); // bool tryWait(); //////////////////////////////////////////////////////////////////////////////// /** * This class represents a general counting semaphore as concieved by Edsger * Dijkstra. As per Mesa type monitors however, "signal" has been replaced * with "notify" to indicate that control is not transferred to the waiter when * a notification is sent. */ class Semaphore { //////////////////////////////////////////////////////////////////////////// // Initialization //////////////////////////////////////////////////////////////////////////// /** * Initializes a semaphore object with the specified initial count. * * Params: * count = The initial count for the semaphore. * * Throws: * SyncException on error. */ this( uint count = 0 ) { version( Win32 ) { m_hndl = CreateSemaphoreA( null, count, int.max, null ); if( m_hndl == m_hndl.init ) throw new SyncException( "Unable to create semaphore" ); } else version( OSX ) { auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count ); if( rc ) throw new SyncException( "Unable to create semaphore" ); } else version( Posix ) { int rc = sem_init( &m_hndl, 0, count ); if( rc ) throw new SyncException( "Unable to create semaphore" ); } } ~this() { version( Win32 ) { BOOL rc = CloseHandle( m_hndl ); assert( rc, "Unable to destroy semaphore" ); } else version( OSX ) { auto rc = semaphore_destroy( mach_task_self(), m_hndl ); assert( !rc, "Unable to destroy semaphore" ); } else version( Posix ) { int rc = sem_destroy( &m_hndl ); assert( !rc, "Unable to destroy semaphore" ); } } //////////////////////////////////////////////////////////////////////////// // General Actions //////////////////////////////////////////////////////////////////////////// /** * Wait until the current count is above zero, then atomically decrement * the count by one and return. * * Throws: * SyncException on error. */ void wait() { version( Win32 ) { DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); if( rc != WAIT_OBJECT_0 ) throw new SyncException( "Unable to wait for semaphore" ); } else version( OSX ) { while( true ) { auto rc = semaphore_wait( m_hndl ); if( !rc ) return; if( rc == KERN_ABORTED && errno == EINTR ) continue; throw new SyncException( "Unable to wait for semaphore" ); } } else version( Posix ) { while( true ) { if( !sem_wait( &m_hndl ) ) return; if( errno != EINTR ) throw new SyncException( "Unable to wait for semaphore" ); } } } /** * Suspends the calling thread until the current count moves above zero or * until the supplied time period has elapsed. If the count moves above * zero in this interval, then atomically decrement the count by one and * return true. Otherwise, return false. * * * Params: * period = The time to wait, in 100 nanosecond intervals. This value may * be adjusted to equal to the maximum wait period supported by * the target platform if it is too large. * * In: * period must be non-negative. * * Throws: * SyncException on error. * * Returns: * true if notified before the timeout and false if not. */ bool wait( long period ) in { assert( period >= 0 ); } body { version( Win32 ) { enum : uint { TICKS_PER_MILLI = 10_000, MAX_WAIT_MILLIS = uint.max - 1 } period /= TICKS_PER_MILLI; if( period > MAX_WAIT_MILLIS ) period = MAX_WAIT_MILLIS; switch( WaitForSingleObject( m_hndl, cast(uint) period ) ) { case WAIT_OBJECT_0: return true; case WAIT_TIMEOUT: return false; default: throw new SyncException( "Unable to wait for semaphore" ); } } else version( OSX ) { mach_timespec_t t = void; (cast(byte*) &t)[0 .. t.sizeof] = 0; if( period != 0 ) { enum : uint { NANOS_PER_TICK = 100, TICKS_PER_SECOND = 10_000_000, NANOS_PER_SECOND = NANOS_PER_TICK * TICKS_PER_SECOND, } if( t.tv_sec.max - t.tv_sec < period / TICKS_PER_SECOND ) { t.tv_sec = t.tv_sec.max; t.tv_nsec = 0; } else { t.tv_sec += cast(typeof(t.tv_sec)) (period / TICKS_PER_SECOND); long ns = (period % TICKS_PER_SECOND) * NANOS_PER_TICK; if( NANOS_PER_SECOND - t.tv_nsec > ns ) t.tv_nsec = cast(typeof(t.tv_nsec)) ns; else { t.tv_sec += 1; t.tv_nsec += ns - NANOS_PER_SECOND; } } } while( true ) { auto rc = semaphore_timedwait( m_hndl, t ); if( !rc ) return true; if( rc == KERN_OPERATION_TIMED_OUT ) return false; if( rc != KERN_ABORTED || errno != EINTR ) throw new SyncException( "Unable to wait for semaphore" ); } // -w trip return false; } else version( Posix ) { timespec t = void; mktspec( t, period ); while( true ) { if( !sem_timedwait( &m_hndl, &t ) ) return true; if( errno == ETIMEDOUT ) return false; if( errno != EINTR ) throw new SyncException( "Unable to wait for semaphore" ); } // -w trip return false; } } /** * Atomically increment the current count by one. This will notify one * waiter, if there are any in the queue. * * Throws: * SyncException on error. */ void notify() { version( Win32 ) { if( !ReleaseSemaphore( m_hndl, 1, null ) ) throw new SyncException( "Unable to notify semaphore" ); } else version( OSX ) { auto rc = semaphore_signal( m_hndl ); if( rc ) throw new SyncException( "Unable to notify semaphore" ); } else version( Posix ) { int rc = sem_post( &m_hndl ); if( rc ) throw new SyncException( "Unable to notify semaphore" ); } } /** * If the current count is equal to zero, return. Otherwise, atomically * decrement the count by one and return true. * * Throws: * SyncException on error. * * Returns: * true if the count was above zero and false if not. */ bool tryWait() { version( Win32 ) { switch( WaitForSingleObject( m_hndl, 0 ) ) { case WAIT_OBJECT_0: return true; case WAIT_TIMEOUT: return false; default: throw new SyncException( "Unable to wait for semaphore" ); } } else version( OSX ) { return wait( 0 ); } else version( Posix ) { while( true ) { if( !sem_trywait( &m_hndl ) ) return true; if( errno == EAGAIN ) return false; if( errno != EINTR ) throw new SyncException( "Unable to wait for semaphore" ); } // -w trip return false; } } private: version( Win32 ) { HANDLE m_hndl; } else version( OSX ) { semaphore_t m_hndl; } else version( Posix ) { sem_t m_hndl; } } //////////////////////////////////////////////////////////////////////////////// // Unit Tests //////////////////////////////////////////////////////////////////////////////// version( unittest ) { private import core.thread; void testWait() { auto semaphore = new Semaphore; int numToProduce = 10; bool allProduced = false; auto synProduced = new Object; int numConsumed = 0; auto synConsumed = new Object; int numConsumers = 10; int numComplete = 0; auto synComplete = new Object; void consumer() { while( true ) { semaphore.wait(); synchronized( synProduced ) { if( allProduced ) break; } synchronized( synConsumed ) { ++numConsumed; } } synchronized( synComplete ) { ++numComplete; } } void producer() { assert( !semaphore.tryWait() ); for( int i = 0; i < numToProduce; ++i ) { semaphore.notify(); Thread.yield(); } Thread.sleep( 10_000_000 ); // 1s synchronized( synProduced ) { allProduced = true; } for( int i = 0; i < numConsumers; ++i ) { semaphore.notify(); Thread.yield(); } for( int i = numConsumers * 10000; i > 0; --i ) { synchronized( synComplete ) { if( numComplete == numConsumers ) break; } Thread.yield(); } synchronized( synComplete ) { assert( numComplete == numConsumers ); } synchronized( synConsumed ) { assert( numConsumed == numToProduce ); } assert( !semaphore.tryWait() ); semaphore.notify(); assert( semaphore.tryWait() ); assert( !semaphore.tryWait() ); } auto group = new ThreadGroup; for( int i = 0; i < numConsumers; ++i ) group.create( &consumer ); group.create( &producer ); group.joinAll(); } void testWaitTimeout() { auto synReady = new Object; auto semReady = new Semaphore; bool waiting = false; bool alertedOne = true; bool alertedTwo = true; void waiter() { synchronized( synReady ) { waiting = true; } alertedOne = semReady.wait( 10_000_000 ); // 100ms alertedTwo = semReady.wait( 10_000_000 ); // 100ms } auto thread = new Thread( &waiter ); thread.start(); while( true ) { synchronized( synReady ) { if( waiting ) { semReady.notify(); break; } } Thread.yield(); } thread.join(); assert( waiting && alertedOne && !alertedTwo ); } unittest { testWait(); testWaitTimeout(); } }