comparison druntime/src/common/core/sync/condition.d @ 1458:e0b2d67cfe7c

Added druntime (this should be removed once it works).
author Robert Clipsham <robert@octarineparrot.com>
date Tue, 02 Jun 2009 17:43:06 +0100
parents
children
comparison
equal deleted inserted replaced
1456:7b218ec1044f 1458:e0b2d67cfe7c
1 /**
2 * The condition module provides a primitive for synchronized condition
3 * checking.
4 *
5 * Copyright: Copyright Sean Kelly 2005 - 2009.
6 * License: <a href="http://www.boost.org/LICENSE_1_0.txt>Boost License 1.0</a>.
7 * Authors: Sean Kelly
8 *
9 * Copyright Sean Kelly 2005 - 2009.
10 * Distributed under the Boost Software License, Version 1.0.
11 * (See accompanying file LICENSE_1_0.txt or copy at
12 * http://www.boost.org/LICENSE_1_0.txt)
13 */
14 module core.sync.condition;
15
16
17 public import core.sync.exception;
18 public import core.sync.mutex;
19
20 version( Win32 )
21 {
22 private import core.sync.semaphore;
23 private import core.sys.windows.windows;
24 }
25 else version( Posix )
26 {
27 private import core.sync.config;
28 private import core.stdc.errno;
29 private import core.sys.posix.pthread;
30 private import core.sys.posix.time;
31 }
32
33
34 ////////////////////////////////////////////////////////////////////////////////
35 // Condition
36 //
37 // void wait();
38 // void notify();
39 // void notifyAll();
40 ////////////////////////////////////////////////////////////////////////////////
41
42
43 /**
44 * This class represents a condition variable as concieved by C.A.R. Hoare. As
45 * per Mesa type monitors however, "signal" has been replaced with "notify" to
46 * indicate that control is not transferred to the waiter when a notification
47 * is sent.
48 */
49 class Condition
50 {
51 ////////////////////////////////////////////////////////////////////////////
52 // Initialization
53 ////////////////////////////////////////////////////////////////////////////
54
55
56 /**
57 * Initializes a condition object which is associated with the supplied
58 * mutex object.
59 *
60 * Params:
61 * m = The mutex with which this condition will be associated.
62 *
63 * Throws:
64 * SyncException on error.
65 */
66 this( Mutex m )
67 {
68 version( Win32 )
69 {
70 m_blockLock = CreateSemaphoreA( null, 1, 1, null );
71 if( m_blockLock == m_blockLock.init )
72 throw new SyncException( "Unable to initialize condition" );
73 scope(failure) CloseHandle( m_blockLock );
74
75 m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
76 if( m_blockQueue == m_blockQueue.init )
77 throw new SyncException( "Unable to initialize condition" );
78 scope(failure) CloseHandle( m_blockQueue );
79
80 InitializeCriticalSection( &m_unblockLock );
81 m_assocMutex = m;
82 }
83 else version( Posix )
84 {
85 m_mutexAddr = m.handleAddr();
86
87 int rc = pthread_cond_init( &m_hndl, null );
88 if( rc )
89 throw new SyncException( "Unable to initialize condition" );
90 }
91 }
92
93
94 ~this()
95 {
96 version( Win32 )
97 {
98 BOOL rc = CloseHandle( m_blockLock );
99 assert( rc, "Unable to destroy condition" );
100 rc = CloseHandle( m_blockQueue );
101 assert( rc, "Unable to destroy condition" );
102 DeleteCriticalSection( &m_unblockLock );
103 }
104 else version( Posix )
105 {
106 int rc = pthread_cond_destroy( &m_hndl );
107 assert( !rc, "Unable to destroy condition" );
108 }
109 }
110
111
112 ////////////////////////////////////////////////////////////////////////////
113 // General Actions
114 ////////////////////////////////////////////////////////////////////////////
115
116
117 /**
118 * Wait until notified.
119 *
120 * Throws:
121 * SyncException on error.
122 */
123 void wait()
124 {
125 version( Win32 )
126 {
127 timedWait( INFINITE );
128 }
129 else version( Posix )
130 {
131 int rc = pthread_cond_wait( &m_hndl, m_mutexAddr );
132 if( rc )
133 throw new SyncException( "Unable to wait for condition" );
134 }
135 }
136
137
138 /**
139 * Suspends the calling thread until a notification occurs or until the
140 * supplied time period has elapsed.
141 *
142 * Params:
143 * period = The time to wait, in 100 nanosecond intervals. This value may
144 * be adjusted to equal to the maximum wait period supported by
145 * the target platform if it is too large.
146 *
147 * In:
148 * period must be non-negative.
149 *
150 * Throws:
151 * SyncException on error.
152 *
153 * Returns:
154 * true if notified before the timeout and false if not.
155 */
156 bool wait( long period )
157 in
158 {
159 assert( period >= 0 );
160 }
161 body
162 {
163 version( Win32 )
164 {
165 enum : uint
166 {
167 TICKS_PER_MILLI = 10_000,
168 MAX_WAIT_MILLIS = uint.max - 1
169 }
170
171 period /= TICKS_PER_MILLI;
172 if( period > MAX_WAIT_MILLIS )
173 period = MAX_WAIT_MILLIS;
174 return timedWait( cast(uint) period );
175 }
176 else version( Posix )
177 {
178 timespec t = void;
179 mktspec( t, period );
180
181 int rc = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &t );
182 if( !rc )
183 return true;
184 if( rc == ETIMEDOUT )
185 return false;
186 throw new SyncException( "Unable to wait for condition" );
187 }
188 }
189
190 /**
191 * Notifies one waiter.
192 *
193 * Throws:
194 * SyncException on error.
195 */
196 void notify()
197 {
198 version( Win32 )
199 {
200 notify( false );
201 }
202 else version( Posix )
203 {
204 int rc = pthread_cond_signal( &m_hndl );
205 if( rc )
206 throw new SyncException( "Unable to notify condition" );
207 }
208 }
209
210
211 /**
212 * Notifies all waiters.
213 *
214 * Throws:
215 * SyncException on error.
216 */
217 void notifyAll()
218 {
219 version( Win32 )
220 {
221 notify( true );
222 }
223 else version( Posix )
224 {
225 int rc = pthread_cond_broadcast( &m_hndl );
226 if( rc )
227 throw new SyncException( "Unable to notify condition" );
228 }
229 }
230
231
232 private:
233 version( Win32 )
234 {
235 bool timedWait( DWORD timeout )
236 {
237 int numSignalsLeft;
238 int numWaitersGone;
239 DWORD rc;
240
241 rc = WaitForSingleObject( m_blockLock, INFINITE );
242 assert( rc == WAIT_OBJECT_0 );
243
244 m_numWaitersBlocked++;
245
246 rc = ReleaseSemaphore( m_blockLock, 1, null );
247 assert( rc );
248
249 m_assocMutex.unlock();
250 scope(failure) m_assocMutex.lock();
251
252 rc = WaitForSingleObject( m_blockQueue, timeout );
253 assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
254 bool timedOut = (rc == WAIT_TIMEOUT);
255
256 EnterCriticalSection( &m_unblockLock );
257 scope(failure) LeaveCriticalSection( &m_unblockLock );
258
259 if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
260 {
261 if ( timedOut )
262 {
263 // timeout (or canceled)
264 if( m_numWaitersBlocked != 0 )
265 {
266 m_numWaitersBlocked--;
267 // do not unblock next waiter below (already unblocked)
268 numSignalsLeft = 0;
269 }
270 else
271 {
272 // spurious wakeup pending!!
273 m_numWaitersGone = 1;
274 }
275 }
276 if( --m_numWaitersToUnblock == 0 )
277 {
278 if( m_numWaitersBlocked != 0 )
279 {
280 // open the gate
281 rc = ReleaseSemaphore( m_blockLock, 1, null );
282 assert( rc );
283 // do not open the gate below again
284 numSignalsLeft = 0;
285 }
286 else if( (numWaitersGone = m_numWaitersGone) != 0 )
287 {
288 m_numWaitersGone = 0;
289 }
290 }
291 }
292 else if( ++m_numWaitersGone == int.max / 2 )
293 {
294 // timeout/canceled or spurious event :-)
295 rc = WaitForSingleObject( m_blockLock, INFINITE );
296 assert( rc == WAIT_OBJECT_0 );
297 // something is going on here - test of timeouts?
298 m_numWaitersBlocked -= m_numWaitersGone;
299 rc = ReleaseSemaphore( m_blockLock, 1, null );
300 assert( rc == WAIT_OBJECT_0 );
301 m_numWaitersGone = 0;
302 }
303
304 LeaveCriticalSection( &m_unblockLock );
305
306 if( numSignalsLeft == 1 )
307 {
308 // better now than spurious later (same as ResetEvent)
309 for( ; numWaitersGone > 0; --numWaitersGone )
310 {
311 rc = WaitForSingleObject( m_blockQueue, INFINITE );
312 assert( rc == WAIT_OBJECT_0 );
313 }
314 // open the gate
315 rc = ReleaseSemaphore( m_blockLock, 1, null );
316 assert( rc );
317 }
318 else if( numSignalsLeft != 0 )
319 {
320 // unblock next waiter
321 rc = ReleaseSemaphore( m_blockQueue, 1, null );
322 assert( rc );
323 }
324 m_assocMutex.lock();
325 return !timedOut;
326 }
327
328
329 void notify( bool all )
330 {
331 DWORD rc;
332
333 EnterCriticalSection( &m_unblockLock );
334 scope(failure) LeaveCriticalSection( &m_unblockLock );
335
336 if( m_numWaitersToUnblock != 0 )
337 {
338 if( m_numWaitersBlocked == 0 )
339 {
340 LeaveCriticalSection( &m_unblockLock );
341 return;
342 }
343 if( all )
344 {
345 m_numWaitersToUnblock += m_numWaitersBlocked;
346 m_numWaitersBlocked = 0;
347 }
348 else
349 {
350 m_numWaitersToUnblock++;
351 m_numWaitersBlocked--;
352 }
353 LeaveCriticalSection( &m_unblockLock );
354 }
355 else if( m_numWaitersBlocked > m_numWaitersGone )
356 {
357 rc = WaitForSingleObject( m_blockLock, INFINITE );
358 assert( rc == WAIT_OBJECT_0 );
359 if( 0 != m_numWaitersGone )
360 {
361 m_numWaitersBlocked -= m_numWaitersGone;
362 m_numWaitersGone = 0;
363 }
364 if( all )
365 {
366 m_numWaitersToUnblock = m_numWaitersBlocked;
367 m_numWaitersBlocked = 0;
368 }
369 else
370 {
371 m_numWaitersToUnblock = 1;
372 m_numWaitersBlocked--;
373 }
374 LeaveCriticalSection( &m_unblockLock );
375 rc = ReleaseSemaphore( m_blockQueue, 1, null );
376 assert( rc );
377 }
378 else
379 {
380 LeaveCriticalSection( &m_unblockLock );
381 }
382 }
383
384
385 // NOTE: This implementation uses Algorithm 8c as described here:
386 // http://groups.google.com/group/comp.programming.threads/
387 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
388 HANDLE m_blockLock; // auto-reset event (now semaphore)
389 HANDLE m_blockQueue; // auto-reset event (now semaphore)
390 Mutex m_assocMutex; // external mutex/CS
391 CRITICAL_SECTION m_unblockLock; // internal mutex/CS
392 int m_numWaitersGone = 0;
393 int m_numWaitersBlocked = 0;
394 int m_numWaitersToUnblock = 0;
395 }
396 else version( Posix )
397 {
398 pthread_cond_t m_hndl;
399 pthread_mutex_t* m_mutexAddr;
400 }
401 }
402
403
404 ////////////////////////////////////////////////////////////////////////////////
405 // Unit Tests
406 ////////////////////////////////////////////////////////////////////////////////
407
408
409 version( unittest )
410 {
411 private import core.thread;
412 private import core.sync.mutex;
413 private import core.sync.semaphore;
414
415
416 void testNotify()
417 {
418 auto mutex = new Mutex;
419 auto condReady = new Condition( mutex );
420 auto semDone = new Semaphore;
421 auto synLoop = new Object;
422 int numWaiters = 10;
423 int numTries = 10;
424 int numReady = 0;
425 int numTotal = 0;
426 int numDone = 0;
427 int numPost = 0;
428
429 void waiter()
430 {
431 for( int i = 0; i < numTries; ++i )
432 {
433 synchronized( mutex )
434 {
435 while( numReady < 1 )
436 {
437 condReady.wait();
438 }
439 --numReady;
440 ++numTotal;
441 }
442
443 synchronized( synLoop )
444 {
445 ++numDone;
446 }
447 semDone.wait();
448 }
449 }
450
451 auto group = new ThreadGroup;
452
453 for( int i = 0; i < numWaiters; ++i )
454 group.create( &waiter );
455
456 for( int i = 0; i < numTries; ++i )
457 {
458 for( int j = 0; j < numWaiters; ++j )
459 {
460 synchronized( mutex )
461 {
462 ++numReady;
463 condReady.notify();
464 }
465 }
466 while( true )
467 {
468 synchronized( synLoop )
469 {
470 if( numDone >= numWaiters )
471 break;
472 }
473 Thread.yield();
474 }
475 for( int j = 0; j < numWaiters; ++j )
476 {
477 semDone.notify();
478 }
479 }
480
481 group.joinAll();
482 assert( numTotal == numWaiters * numTries );
483 }
484
485
486 void testNotifyAll()
487 {
488 auto mutex = new Mutex;
489 auto condReady = new Condition( mutex );
490 int numWaiters = 10;
491 int numReady = 0;
492 int numDone = 0;
493 bool alert = false;
494
495 void waiter()
496 {
497 synchronized( mutex )
498 {
499 ++numReady;
500 while( !alert )
501 condReady.wait();
502 ++numDone;
503 }
504 }
505
506 auto group = new ThreadGroup;
507
508 for( int i = 0; i < numWaiters; ++i )
509 group.create( &waiter );
510
511 while( true )
512 {
513 synchronized( mutex )
514 {
515 if( numReady >= numWaiters )
516 {
517 alert = true;
518 condReady.notifyAll();
519 break;
520 }
521 }
522 Thread.yield();
523 }
524 group.joinAll();
525 assert( numReady == numWaiters && numDone == numWaiters );
526 }
527
528
529 void testWaitTimeout()
530 {
531 auto mutex = new Mutex;
532 auto condReady = new Condition( mutex );
533 bool waiting = false;
534 bool alertedOne = true;
535 bool alertedTwo = true;
536
537 void waiter()
538 {
539 synchronized( mutex )
540 {
541 waiting = true;
542 alertedOne = condReady.wait( 10_000_000 ); // 1s
543 alertedTwo = condReady.wait( 10_000_000 ); // 1s
544 }
545 }
546
547 auto thread = new Thread( &waiter );
548 thread.start();
549
550 while( true )
551 {
552 synchronized( mutex )
553 {
554 if( waiting )
555 {
556 condReady.notify();
557 break;
558 }
559 }
560 Thread.yield();
561 }
562 thread.join();
563 assert( waiting && alertedOne && !alertedTwo );
564 }
565
566
567 unittest
568 {
569 testNotify();
570 testNotifyAll();
571 testWaitTimeout();
572 }
573 }