comparison tango/tango/core/sync/Condition.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
comparison
equal deleted inserted replaced
131:5825d48b27d1 132:1700239cab2e
1 /**
2 * The condition module provides a primitive for synchronized condition
3 * checking.
4 *
5 * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
6 * License: BSD style: $(LICENSE)
7 * Authors: Sean Kelly
8 */
9 module tango.core.sync.Condition;
10
11
12 public import tango.core.Exception : SyncException;
13 public import tango.core.sync.Mutex;
14
15 version( Win32 )
16 {
17 private import tango.core.sync.Semaphore;
18 private import tango.sys.win32.UserGdi;
19 }
20 else version( Posix )
21 {
22 private import tango.core.sync.Config;
23 private import tango.stdc.errno;
24 private import tango.stdc.posix.pthread;
25 private import tango.stdc.posix.time;
26 }
27
28
29 ////////////////////////////////////////////////////////////////////////////////
30 // Condition
31 //
32 // void wait();
33 // void notify();
34 // void notifyAll();
35 ////////////////////////////////////////////////////////////////////////////////
36
37
38 /**
39 * This class represents a condition variable as conceived by C.A.R. Hoare. As
40 * per Mesa type monitors however, "signal" has been replaced with "notify" to
41 * indicate that control is not transferred to the waiter when a notification
42 * is sent.
43 */
44 class Condition
45 {
46 ////////////////////////////////////////////////////////////////////////////
47 // Initialization
48 ////////////////////////////////////////////////////////////////////////////
49
50
/**
 * Creates a condition variable that is bound to the supplied mutex.
 *
 * Params:
 *  m = The mutex with which this condition will be associated.
 *
 * Throws:
 *  SyncException if one of the underlying primitives cannot be created.
 */
this( Mutex m )
{
    version( Win32 )
    {
        // Gate semaphore: starts with a count of one and can never exceed one.
        m_blockLock = CreateSemaphoreA( null, 1, 1, null );
        if( m_blockLock == m_blockLock.init )
        {
            throw new SyncException( "Unable to initialize condition" );
        }
        // Anything failing from here on must not leak the gate handle.
        scope(failure) CloseHandle( m_blockLock );

        // Queue semaphore: starts empty, waiters block on it.
        m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
        if( m_blockQueue == m_blockQueue.init )
        {
            throw new SyncException( "Unable to initialize condition" );
        }
        scope(failure) CloseHandle( m_blockQueue );

        InitializeCriticalSection( &m_unblockLock );
        m_assocMutex = m;
    }
    else version( Posix )
    {
        // Remember the native mutex so the wait calls can hand it to pthreads.
        m_mutexAddr = m.handleAddr();

        if( pthread_cond_init( &m_hndl, null ) != 0 )
        {
            throw new SyncException( "Unable to initialize condition" );
        }
    }
}
87
88
/**
 * Releases the operating system objects owned by this condition. Failures
 * are only reported through assertions.
 */
~this()
{
    version( Win32 )
    {
        // The calls stay outside the asserts so they still run when
        // assertions are compiled out.
        BOOL closed = CloseHandle( m_blockLock );
        assert( closed, "Unable to destroy condition" );

        closed = CloseHandle( m_blockQueue );
        assert( closed, "Unable to destroy condition" );

        DeleteCriticalSection( &m_unblockLock );
    }
    else version( Posix )
    {
        int status = pthread_cond_destroy( &m_hndl );
        assert( status == 0, "Unable to destroy condition" );
    }
}
105
106
107 ////////////////////////////////////////////////////////////////////////////
108 // General Actions
109 ////////////////////////////////////////////////////////////////////////////
110
111
/**
 * Blocks the calling thread until the condition is notified.
 *
 * Throws:
 *  SyncException if the Posix wait call reports an error.
 */
void wait()
{
    version( Win32 )
    {
        // An unbounded timed wait; the result is not needed here.
        timedWait( INFINITE );
    }
    else version( Posix )
    {
        if( pthread_cond_wait( &m_hndl, m_mutexAddr ) != 0 )
        {
            throw new SyncException( "Unable to wait for condition" );
        }
    }
}
131
132
/**
 * Blocks the calling thread until a notification arrives or the supplied
 * time period has elapsed, whichever happens first.
 *
 * Params:
 *  period = The time to wait, in seconds (fractional values are accepted).
 *
 * In:
 *  period, converted to milliseconds, must be less than (uint.max - 1).
 *
 * Returns:
 *  true if the wait ended before the timeout and false if it timed out.
 *
 * Throws:
 *  SyncException if the Posix wait call fails for any other reason.
 */
bool wait( double period )
in
{
    // NOTE: The fractional value added to period is to correct fp error.
    assert( period * 1000 + 0.1 < uint.max - 1 );
}
body
{
    version( Win32 )
    {
        // Seconds to milliseconds, with the same rounding as the contract.
        uint millis = cast(uint)(period * 1000 + 0.1);
        return timedWait( millis );
    }
    else version( Posix )
    {
        timespec deadline;

        // NOTE(review): getTimespec/adjTimespec are defined elsewhere;
        //               presumably they load the current time and then
        //               advance it by period seconds -- confirm.
        getTimespec( deadline );
        adjTimespec( deadline, period );

        int status = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &deadline );
        if( status == ETIMEDOUT )
        {
            return false;
        }
        if( status != 0 )
        {
            throw new SyncException( "Unable to wait for condition" );
        }
        return true;
    }
}
176
/**
 * Wakes a single waiter, if there is one.
 *
 * Throws:
 *  SyncException if the Posix signal call reports an error.
 */
void notify()
{
    version( Win32 )
    {
        // Delegate to the private overload; false selects "one waiter".
        notify( false );
    }
    else version( Posix )
    {
        if( pthread_cond_signal( &m_hndl ) != 0 )
        {
            throw new SyncException( "Unable to notify condition" );
        }
    }
}
196
197
/**
 * Wakes every thread currently waiting on this condition.
 *
 * Throws:
 *  SyncException if the Posix broadcast call reports an error.
 */
void notifyAll()
{
    version( Win32 )
    {
        // Delegate to the private overload; true selects "all waiters".
        notify( true );
    }
    else version( Posix )
    {
        if( pthread_cond_broadcast( &m_hndl ) != 0 )
        {
            throw new SyncException( "Unable to notify condition" );
        }
    }
}
217
218
219 private:
220 version( Win32 )
221 {
/**
 * Win32 wait primitive used by wait() and wait( double ). Registers the
 * caller as a blocked waiter, unlocks the associated mutex, blocks on the
 * queue semaphore for at most timeout milliseconds, updates the waiter
 * counters and finally locks the associated mutex again.
 *
 * Params:
 *  timeout = Maximum time to block, in milliseconds, or INFINITE.
 *
 * Returns:
 *  true if the queue semaphore was signalled and false if the wait timed
 *  out.
 */
bool timedWait( DWORD timeout )
{
    int   numSignalsLeft;
    int   numWaitersGone;
    DWORD rc;

    // Pass through the gate and register as a blocked waiter.
    rc = WaitForSingleObject( m_blockLock, INFINITE );
    assert( rc == WAIT_OBJECT_0 );

    m_numWaitersBlocked++;

    rc = ReleaseSemaphore( m_blockLock, 1, null );
    assert( rc );

    m_assocMutex.unlock();
    // If anything below fails, the caller still gets its mutex back.
    scope(failure) m_assocMutex.lock();

    rc = WaitForSingleObject( m_blockQueue, timeout );
    assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
    bool timedOut = (rc == WAIT_TIMEOUT);

    {
        // The critical section is confined to this scope so that it is
        // released exactly once, even if a later assertion fails after
        // the section has already been left.
        EnterCriticalSection( &m_unblockLock );
        scope(exit) LeaveCriticalSection( &m_unblockLock );

        if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
        {
            if( timedOut )
            {
                // timeout (or canceled)
                if( m_numWaitersBlocked != 0 )
                {
                    m_numWaitersBlocked--;
                    // do not unblock next waiter below (already unblocked)
                    numSignalsLeft = 0;
                }
                else
                {
                    // spurious wakeup pending!!
                    m_numWaitersGone = 1;
                }
            }
            if( --m_numWaitersToUnblock == 0 )
            {
                if( m_numWaitersBlocked != 0 )
                {
                    // open the gate
                    rc = ReleaseSemaphore( m_blockLock, 1, null );
                    assert( rc );
                    // do not open the gate below again
                    numSignalsLeft = 0;
                }
                else if( (numWaitersGone = m_numWaitersGone) != 0 )
                {
                    m_numWaitersGone = 0;
                }
            }
        }
        else if( ++m_numWaitersGone == int.max / 2 )
        {
            // timeout/canceled or spurious event :-)
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            // something is going on here - test of timeouts?
            m_numWaitersBlocked -= m_numWaitersGone;
            rc = ReleaseSemaphore( m_blockLock, 1, null );
            // ReleaseSemaphore returns non-zero on success, so the result
            // must not be compared against WAIT_OBJECT_0 (which is zero).
            assert( rc );
            m_numWaitersGone = 0;
        }
    }

    if( numSignalsLeft == 1 )
    {
        // better now than spurious later (same as ResetEvent)
        for( ; numWaitersGone > 0; --numWaitersGone )
        {
            rc = WaitForSingleObject( m_blockQueue, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
        }
        // open the gate
        rc = ReleaseSemaphore( m_blockLock, 1, null );
        assert( rc );
    }
    else if( numSignalsLeft != 0 )
    {
        // unblock next waiter
        rc = ReleaseSemaphore( m_blockQueue, 1, null );
        assert( rc );
    }
    m_assocMutex.lock();
    return !timedOut;
}
314
315
/**
 * Win32 implementation behind notify() and notifyAll(). Selects the
 * waiters to wake by moving them from the blocked count to the to-unblock
 * count and, when no earlier notification is still being processed,
 * closes the gate and releases the first waiter from the queue semaphore.
 *
 * Params:
 *  all = true to select every blocked waiter, false to select one.
 */
void notify( bool all )
{
    DWORD rc;
    // Set when this call closed the gate and must release the first waiter.
    bool  releaseQueue = false;

    {
        // The critical section is confined to this scope so that it is
        // released exactly once on every path, including a failing
        // assertion after the section has been left.
        EnterCriticalSection( &m_unblockLock );
        scope(exit) LeaveCriticalSection( &m_unblockLock );

        if( m_numWaitersToUnblock != 0 )
        {
            // An earlier notification is still being processed: only the
            // counters are adjusted, the queue semaphore is not released
            // from here.
            if( m_numWaitersBlocked == 0 )
            {
                return;
            }
            if( all )
            {
                m_numWaitersToUnblock += m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock++;
                m_numWaitersBlocked--;
            }
        }
        else if( m_numWaitersBlocked > m_numWaitersGone )
        {
            // close the gate
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            if( 0 != m_numWaitersGone )
            {
                // Waiters that already left no longer count as blocked.
                m_numWaitersBlocked -= m_numWaitersGone;
                m_numWaitersGone = 0;
            }
            if( all )
            {
                m_numWaitersToUnblock = m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock = 1;
                m_numWaitersBlocked--;
            }
            releaseQueue = true;
        }
        // Otherwise there is nobody to wake.
    }

    if( releaseQueue )
    {
        // Performed after the critical section has been left, as before.
        rc = ReleaseSemaphore( m_blockQueue, 1, null );
        assert( rc );
    }
}
370
371
372 // NOTE: This implementation uses Algorithm 8c as described here:
373 // http://groups.google.com/group/comp.programming.threads/
374 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
375 HANDLE m_blockLock; // gate semaphore (initial count 1, maximum 1) that waiters pass through before blocking
376 HANDLE m_blockQueue; // semaphore (initial count 0) on which waiters block until released
377 Mutex m_assocMutex; // mutex supplied to the constructor; unlocked while waiting, locked again afterwards
378 CRITICAL_SECTION m_unblockLock; // held while the counters are updated after a wakeup and during notify( bool )
379 int m_numWaitersGone = 0; // waiters that woke with no notification pending; later subtracted from m_numWaitersBlocked
380 int m_numWaitersBlocked = 0; // waiters registered in timedWait() and not yet selected for wakeup
381 int m_numWaitersToUnblock = 0; // waiters selected by notify( bool ) that have not yet finished waking
382 }
383 else version( Posix )
384 {
385 pthread_cond_t m_hndl; // native condition variable, initialised in the constructor and destroyed in the destructor
386 pthread_mutex_t* m_mutexAddr; // value returned by handleAddr() of the associated Mutex; passed to the pthread wait calls
387 }
388 }
389
390
391 ////////////////////////////////////////////////////////////////////////////////
392 // Unit Tests
393 ////////////////////////////////////////////////////////////////////////////////
394
395
396 debug( UnitTest )
397 {
398 private import tango.core.Thread;
399 private import tango.core.sync.Mutex;
400 private import tango.core.sync.Semaphore;
401
402
/**
 * Checks that single notifications wake waiters one at a time: the main
 * thread publishes numWaiters * numTries units of work, notifying once per
 * unit, and the waiter threads must consume exactly that many.
 */
void testNotify()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    auto semDone    = new Semaphore;
    auto synLoop    = new Object;
    int  numWaiters = 10;
    int  numTries   = 10;
    int  numReady   = 0;
    int  numTotal   = 0;
    int  numDone    = 0;

    void waiter()
    {
        for( int i = 0; i < numTries; ++i )
        {
            synchronized( mutex )
            {
                // Re-check the predicate after every wakeup.
                while( numReady < 1 )
                {
                    condReady.wait();
                }
                --numReady;
                ++numTotal;
            }

            synchronized( synLoop )
            {
                ++numDone;
            }
            // Block until the main thread hands out a token.
            semDone.wait();
        }
    }

    auto group = new ThreadGroup;

    for( int i = 0; i < numWaiters; ++i )
        group.create( &waiter );

    for( int i = 0; i < numTries; ++i )
    {
        // Publish one unit of work per waiter, notifying once for each.
        for( int j = 0; j < numWaiters; ++j )
        {
            synchronized( mutex )
            {
                ++numReady;
                condReady.notify();
            }
        }
        // NOTE: numDone is cumulative and never reset, so this loop only
        //       actually waits during the first round; later rounds fall
        //       straight through.
        while( true )
        {
            synchronized( synLoop )
            {
                if( numDone >= numWaiters )
                    break;
            }
            Thread.yield();
        }
        // Hand out one token per waiter.
        for( int j = 0; j < numWaiters; ++j )
        {
            semDone.notify();
        }
    }

    group.joinAll();
    assert( numTotal == numWaiters * numTries );
}
471
472
/**
 * Checks that notifyAll() wakes every waiter: once all threads have
 * registered, a single broadcast must let each of them finish.
 */
void testNotifyAll()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    int  numWaiters = 10;
    int  numReady   = 0;
    int  numDone    = 0;
    bool alert      = false;

    void waiter()
    {
        synchronized( mutex )
        {
            // Register, then sleep until the alert flag has been raised.
            ++numReady;
            while( !alert )
            {
                condReady.wait();
            }
            ++numDone;
        }
    }

    auto group = new ThreadGroup;

    for( int remaining = numWaiters; remaining > 0; --remaining )
    {
        group.create( &waiter );
    }

    // Poll until every waiter has registered, then broadcast exactly once.
    bool broadcast = false;
    while( !broadcast )
    {
        synchronized( mutex )
        {
            if( numReady >= numWaiters )
            {
                alert = true;
                condReady.notifyAll();
                broadcast = true;
            }
        }
        if( !broadcast )
        {
            Thread.yield();
        }
    }

    group.joinAll();
    assert( numReady == numWaiters && numDone == numWaiters );
}
514
515
/**
 * Checks the result of the timed wait: the first one-second wait is
 * notified by the main thread and is expected to return true, the second
 * one is never notified and is expected to return false.
 */
void testWaitTimeout()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    bool waiting    = false;
    bool alertedOne = true;
    bool alertedTwo = true;

    void waiter()
    {
        synchronized( mutex )
        {
            // The flag is set while the mutex is held, so the main thread
            // can only observe it once the first wait has released the
            // mutex.
            waiting    = true;
            alertedOne = condReady.wait( 1 );
            alertedTwo = condReady.wait( 1 );
        }
    }

    auto thread = new Thread( &waiter );
    thread.start();

    // Poll until the waiter has announced itself, then notify it once.
    bool notified = false;
    while( !notified )
    {
        synchronized( mutex )
        {
            if( waiting )
            {
                condReady.notify();
                notified = true;
            }
        }
        if( !notified )
        {
            Thread.yield();
        }
    }

    thread.join();
    assert( waiting && alertedOne && !alertedTwo );
}
552
553
554 unittest
555 {
556 testNotify(); // single notify() calls wake waiters one unit of work at a time
557 testNotifyAll(); // one notifyAll() call wakes every registered waiter
558 testWaitTimeout(); // wait( double ) returns true when notified and false on timeout
559 }
560 }