view tango/tango/core/sync/Condition.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
line wrap: on
line source

/**
 * The condition module provides a primitive for synchronized condition
 * checking.
 *
 * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
 * License:   BSD style: $(LICENSE)
 * Authors:   Sean Kelly
 */
module tango.core.sync.Condition;


public import tango.core.Exception : SyncException;
public import tango.core.sync.Mutex;

version( Win32 )
{
    private import tango.core.sync.Semaphore;
    private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
    private import tango.core.sync.Config;
    private import tango.stdc.errno;
    private import tango.stdc.posix.pthread;
    private import tango.stdc.posix.time;
}


////////////////////////////////////////////////////////////////////////////////
// Condition
//
// void wait();
// void notify();
// void notifyAll();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a condition variable as concieved by C.A.R. Hoare.  As
 * per Mesa type monitors however, "signal" has been replaced with "notify" to
 * indicate that control is not transferred to the waiter when a notification
 * is sent.
 */
class Condition
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a condition object which is associated with the supplied
     * mutex object.
     *
     * Params:
     *  m = The mutex with which this condition will be associated.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Mutex m )
    {
        version( Win32 )
        {
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if( m_blockLock == m_blockLock.init )
                throw new SyncException( "Unable to initialize condition" );
            scope(failure) CloseHandle( m_blockLock );

            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if( m_blockQueue == m_blockQueue.init )
                throw new SyncException( "Unable to initialize condition" );
            scope(failure) CloseHandle( m_blockQueue );

            InitializeCriticalSection( &m_unblockLock );
            m_assocMutex = m;
        }
        else version( Posix )
        {
            m_mutexAddr = m.handleAddr();

            int rc = pthread_cond_init( &m_hndl, null );
            if( rc )
                throw new SyncException( "Unable to initialize condition" );
        }
    }


    ~this()
    {
        version( Win32 )
        {
            BOOL rc = CloseHandle( m_blockLock );
            assert( rc, "Unable to destroy condition" );
            rc = CloseHandle( m_blockQueue );
            assert( rc, "Unable to destroy condition" );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version( Posix )
        {
            int rc = pthread_cond_destroy( &m_hndl );
            assert( !rc, "Unable to destroy condition" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until notified.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            timedWait( INFINITE );
        }
        else version( Posix )
        {
            int rc = pthread_cond_wait( &m_hndl, m_mutexAddr );
            if( rc )
                throw new SyncException( "Unable to wait for condition" );
        }
    }


    /**
     * Suspends the calling thread until a notification occurs or until the
     * supplied time period has elapsed.  The supplied period may be up to a
     * maximum of (uint.max - 1) milliseconds.
     *
     * Params:
     *  period = The time to wait, in seconds (fractional values are accepted).
     *
     * In:
     *  period must be less than (uint.max - 1) milliseconds.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool wait( double period )
    in
    {
        // NOTE: The fractional value added to period is to correct fp error.
        assert( period * 1000 + 0.1 < uint.max - 1 );
    }
    body
    {
        version( Win32 )
        {
            return timedWait( cast(uint)(period * 1000 + 0.1) );
        }
        else version( Posix )
        {
            timespec t;

            getTimespec( t );
            adjTimespec( t, period );
            int rc = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &t );
            if( !rc )
                return true;
            if( rc == ETIMEDOUT )
                return false;
            throw new SyncException( "Unable to wait for condition" );
        }
    }

    /**
     * Notifies one waiter.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            notify( false );
        }
        else version( Posix )
        {
            int rc = pthread_cond_signal( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify condition" );
        }
    }


    /**
     * Notifies all waiters.
     *
     * Throws:
     *  SyncException on error.
     */
    void notifyAll()
    {
        version( Win32 )
        {
            notify( true );
        }
        else version( Posix )
        {
            int rc = pthread_cond_broadcast( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify condition" );
        }
    }


private:
    version( Win32 )
    {
        bool timedWait( DWORD timeout )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            m_assocMutex.unlock();
            scope(failure) m_assocMutex.lock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if( --m_numWaitersToUnblock == 0 )
                {
                    if( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc == WAIT_OBJECT_0 );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            m_assocMutex.lock();
            return !timedOut;
        }


        void notify( bool all )
        {
            DWORD rc;

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if( m_numWaitersToUnblock != 0 )
            {
                if( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if( m_numWaitersBlocked > m_numWaitersGone )
            {
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        Mutex               m_assocMutex;   // external mutex/CS
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0;
        int                 m_numWaitersToUnblock   = 0;
    }
    else version( Posix )
    {
        pthread_cond_t      m_hndl;
        pthread_mutex_t*    m_mutexAddr;
    }
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////


debug( UnitTest )
{
    private import tango.core.Thread;
    private import tango.core.sync.Mutex;
    private import tango.core.sync.Semaphore;


    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;
        int  numPost    = 0;

        void waiter()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex )
                {
                    while( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        for( int i = 0; i < numTries; ++i )
        {
            for( int j = 0; j < numWaiters; ++j )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            while( true )
            {
                synchronized( synLoop )
                {
                    if( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            for( int j = 0; j < numWaiters; ++j )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        while( true )
        {
            synchronized( mutex )
            {
                if( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                alertedOne = condReady.wait( 1 );
                alertedTwo = condReady.wait( 1 );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while( true )
        {
            synchronized( mutex )
            {
                if( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting && alertedOne && !alertedTwo );
    }


    unittest
    {
        testNotify();
        testNotifyAll();
        testWaitTimeout();
    }
}