Mercurial > projects > ldc
comparison tango/tango/core/sync/Condition.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
131:5825d48b27d1 | 132:1700239cab2e |
---|---|
1 /** | |
2 * The condition module provides a primitive for synchronized condition | |
3 * checking. | |
4 * | |
5 * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved. | |
6 * License: BSD style: $(LICENSE) | |
7 * Authors: Sean Kelly | |
8 */ | |
9 module tango.core.sync.Condition; | |
10 | |
11 | |
12 public import tango.core.Exception : SyncException; | |
13 public import tango.core.sync.Mutex; | |
14 | |
15 version( Win32 ) | |
16 { | |
17 private import tango.core.sync.Semaphore; | |
18 private import tango.sys.win32.UserGdi; | |
19 } | |
20 else version( Posix ) | |
21 { | |
22 private import tango.core.sync.Config; | |
23 private import tango.stdc.errno; | |
24 private import tango.stdc.posix.pthread; | |
25 private import tango.stdc.posix.time; | |
26 } | |
27 | |
28 | |
29 //////////////////////////////////////////////////////////////////////////////// | |
30 // Condition | |
31 // | |
32 // void wait(); | |
33 // void notify(); | |
34 // void notifyAll(); | |
35 //////////////////////////////////////////////////////////////////////////////// | |
36 | |
37 | |
38 /** | |
39 * This class represents a condition variable as conceived by C.A.R. Hoare. As | |
40 * per Mesa type monitors however, "signal" has been replaced with "notify" to | |
41 * indicate that control is not transferred to the waiter when a notification | |
42 * is sent. | |
43 */ | |
44 class Condition | |
45 { | |
46 //////////////////////////////////////////////////////////////////////////// | |
47 // Initialization | |
48 //////////////////////////////////////////////////////////////////////////// | |
49 | |
50 | |
51 /** | |
52 * Initializes a condition object which is associated with the supplied | |
53 * mutex object. On Win32 this creates two semaphores and a critical section; on Posix it initializes a pthread condition variable. | |
54 * | |
55 * Params: | |
56 * m = The mutex with which this condition will be associated. | |
57 * | |
58 * Throws: | |
59 * SyncException if a semaphore (Win32) or the pthread condition (Posix) cannot be created. | |
60 */ | |
61 this( Mutex m ) | |
62 { | |
63 version( Win32 ) | |
64 { | |
65 m_blockLock = CreateSemaphoreA( null, 1, 1, null ); // "gate" semaphore: initial count 1, maximum 1 | |
66 if( m_blockLock == m_blockLock.init ) // handle still at its default value => creation failed | |
67 throw new SyncException( "Unable to initialize condition" ); | |
68 scope(failure) CloseHandle( m_blockLock ); // close the gate handle if a later step throws | |
69 | |
70 m_blockQueue = CreateSemaphoreA( null, 0, int.max, null ); // queue semaphore waiters block on: initial count 0 | |
71 if( m_blockQueue == m_blockQueue.init ) | |
72 throw new SyncException( "Unable to initialize condition" ); | |
73 scope(failure) CloseHandle( m_blockQueue ); | |
74 | |
75 InitializeCriticalSection( &m_unblockLock ); | |
76 m_assocMutex = m; // kept so timedWait can unlock/relock it around the blocking wait | |
77 } | |
78 else version( Posix ) | |
79 { | |
80 m_mutexAddr = m.handleAddr(); // presumably the address of m's underlying pthread mutex; it is passed to the pthread wait calls | |
81 | |
82 int rc = pthread_cond_init( &m_hndl, null ); // null attribute argument | |
83 if( rc ) | |
84 throw new SyncException( "Unable to initialize condition" ); | |
85 } | |
86 } | |
87 | |
88 /** Releases the OS objects created by the constructor; failures are only checked via assert. */ | |
89 ~this() | |
90 { | |
91 version( Win32 ) | |
92 { | |
93 BOOL rc = CloseHandle( m_blockLock ); // the call stays outside the assert, so it is not part of the asserted expression | |
94 assert( rc, "Unable to destroy condition" ); | |
95 rc = CloseHandle( m_blockQueue ); | |
96 assert( rc, "Unable to destroy condition" ); | |
97 DeleteCriticalSection( &m_unblockLock ); | |
98 } | |
99 else version( Posix ) | |
100 { | |
101 int rc = pthread_cond_destroy( &m_hndl ); | |
102 assert( !rc, "Unable to destroy condition" ); // a nonzero return code is treated as failure | |
103 } | |
104 } | |
105 | |
106 | |
107 //////////////////////////////////////////////////////////////////////////// | |
108 // General Actions | |
109 //////////////////////////////////////////////////////////////////////////// | |
110 | |
111 | |
112 /** | |
113 * Wait until notified. On Win32 this forwards to timedWait( INFINITE ); on Posix it calls pthread_cond_wait with the associated mutex handle. | |
114 * | |
115 * Throws: | |
116 * SyncException on error. | |
117 */ | |
118 void wait() | |
119 { | |
120 version( Win32 ) | |
121 { | |
122 timedWait( INFINITE ); // the boolean result is discarded: an infinite wait is not expected to time out | |
123 } | |
124 else version( Posix ) | |
125 { | |
126 int rc = pthread_cond_wait( &m_hndl, m_mutexAddr ); | |
127 if( rc ) // any nonzero return code is reported as an error | |
128 throw new SyncException( "Unable to wait for condition" ); | |
129 } | |
130 } | |
131 | |
132 | |
133 /** | |
134 * Suspends the calling thread until a notification occurs or until the | |
135 * supplied time period has elapsed. The period, converted to milliseconds, | |
136 * must be strictly less than (uint.max - 1); this is checked by the in contract. | |
137 * | |
138 * Params: | |
139 * period = The time to wait, in seconds (fractional values are accepted). | |
140 * | |
141 * In: | |
142 * period must be less than (uint.max - 1) milliseconds. | |
143 * | |
144 * Returns: | |
145 * true if notified before the timeout and false if not. | |
146 * | |
147 * Throws: | |
148 * SyncException on error. | |
149 */ | |
150 bool wait( double period ) | |
151 in | |
152 { | |
153 // NOTE: The fractional value added to period is to correct fp error. | |
154 assert( period * 1000 + 0.1 < uint.max - 1 ); | |
155 } | |
156 body | |
157 { | |
158 version( Win32 ) | |
159 { | |
160 return timedWait( cast(uint)(period * 1000 + 0.1) ); // seconds -> whole milliseconds, same fp correction as in the contract | |
161 } | |
162 else version( Posix ) | |
163 { | |
164 timespec t; | |
165 | |
166 getTimespec( t ); // presumably fills t with the current time - helper is defined outside this view, TODO confirm | |
167 adjTimespec( t, period ); // presumably advances t by period seconds to form the deadline - TODO confirm | |
168 int rc = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &t ); | |
169 if( !rc ) // 0: the wait returned without timing out | |
170 return true; | |
171 if( rc == ETIMEDOUT ) // a timeout is a normal outcome, not an error | |
172 return false; | |
173 throw new SyncException( "Unable to wait for condition" ); | |
174 } | |
175 } | |
176 | |
177 /** | |
178 * Notifies one waiter. On Win32 this forwards to the private notify( false ); on Posix it calls pthread_cond_signal. | |
179 * | |
180 * Throws: | |
181 * SyncException on error. | |
182 */ | |
183 void notify() | |
184 { | |
185 version( Win32 ) | |
186 { | |
187 notify( false ); // false: wake a single waiter | |
188 } | |
189 else version( Posix ) | |
190 { | |
191 int rc = pthread_cond_signal( &m_hndl ); | |
192 if( rc ) // any nonzero return code is reported as an error | |
193 throw new SyncException( "Unable to notify condition" ); | |
194 } | |
195 } | |
196 | |
197 | |
198 /** | |
199 * Notifies all waiters. On Win32 this forwards to the private notify( true ); on Posix it calls pthread_cond_broadcast. | |
200 * | |
201 * Throws: | |
202 * SyncException on error. | |
203 */ | |
204 void notifyAll() | |
205 { | |
206 version( Win32 ) | |
207 { | |
208 notify( true ); // true: wake every currently blocked waiter | |
209 } | |
210 else version( Posix ) | |
211 { | |
212 int rc = pthread_cond_broadcast( &m_hndl ); | |
213 if( rc ) // any nonzero return code is reported as an error | |
214 throw new SyncException( "Unable to notify condition" ); | |
215 } | |
216 } | |
217 | |
218 | |
219 private: | |
220 version( Win32 ) | |
221 { | |
222 bool timedWait( DWORD timeout ) // Win32 wait primitive: blocks on the queue semaphore for up to timeout ms; returns true if woken by a notification, false on timeout | |
223 { | |
224 int numSignalsLeft; // D default-initializes both locals to 0 | |
225 int numWaitersGone; | |
226 DWORD rc; | |
227 | |
228 rc = WaitForSingleObject( m_blockLock, INFINITE ); // pass through the gate; notify( bool ) holds it while a wake-up is in progress | |
229 assert( rc == WAIT_OBJECT_0 ); | |
230 | |
231 m_numWaitersBlocked++; // register this thread as a blocked waiter | |
232 | |
233 rc = ReleaseSemaphore( m_blockLock, 1, null ); | |
234 assert( rc ); | |
235 | |
236 m_assocMutex.unlock(); // the associated mutex is released for the duration of the blocking wait | |
237 scope(failure) m_assocMutex.lock(); // re-acquire it if anything below throws | |
238 | |
239 rc = WaitForSingleObject( m_blockQueue, timeout ); | |
240 assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); | |
241 bool timedOut = (rc == WAIT_TIMEOUT); | |
242 | |
243 EnterCriticalSection( &m_unblockLock ); // serialize the bookkeeping below with notify( bool ) | |
244 scope(failure) LeaveCriticalSection( &m_unblockLock ); | |
245 | |
246 if( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) // a notification is currently being delivered | |
247 { | |
248 if ( timedOut ) | |
249 { | |
250 // timeout (or canceled) | |
251 if( m_numWaitersBlocked != 0 ) | |
252 { | |
253 m_numWaitersBlocked--; | |
254 // do not unblock next waiter below (already unblocked) | |
255 numSignalsLeft = 0; | |
256 } | |
257 else | |
258 { | |
259 // spurious wakeup pending!! | |
260 m_numWaitersGone = 1; | |
261 } | |
262 } | |
263 if( --m_numWaitersToUnblock == 0 ) // this thread consumed the last pending wake-up | |
264 { | |
265 if( m_numWaitersBlocked != 0 ) | |
266 { | |
267 // open the gate | |
268 rc = ReleaseSemaphore( m_blockLock, 1, null ); | |
269 assert( rc ); | |
270 // do not open the gate below again | |
271 numSignalsLeft = 0; | |
272 } | |
273 else if( (numWaitersGone = m_numWaitersGone) != 0 ) | |
274 { | |
275 m_numWaitersGone = 0; | |
276 } | |
277 } | |
278 } | |
279 else if( ++m_numWaitersGone == int.max / 2 ) // no notification pending: count this waiter as gone, folding the count back before it can overflow | |
280 { | |
281 // timeout/canceled or spurious event :-) | |
282 rc = WaitForSingleObject( m_blockLock, INFINITE ); | |
283 assert( rc == WAIT_OBJECT_0 ); | |
284 // something is going on here - test of timeouts? | |
285 m_numWaitersBlocked -= m_numWaitersGone; | |
286 rc = ReleaseSemaphore( m_blockLock, 1, null ); | |
287 assert( rc ); // ReleaseSemaphore returns nonzero on success; WAIT_OBJECT_0 (0) is a wait result code, not a BOOL | |
288 m_numWaitersGone = 0; | |
289 } | |
290 | |
291 LeaveCriticalSection( &m_unblockLock ); | |
292 | |
293 if( numSignalsLeft == 1 ) // this thread was the last one to be woken by the notification | |
294 { | |
295 // better now than spurious later (same as ResetEvent) | |
296 for( ; numWaitersGone > 0; --numWaitersGone ) | |
297 { | |
298 rc = WaitForSingleObject( m_blockQueue, INFINITE ); | |
299 assert( rc == WAIT_OBJECT_0 ); | |
300 } | |
301 // open the gate | |
302 rc = ReleaseSemaphore( m_blockLock, 1, null ); | |
303 assert( rc ); | |
304 } | |
305 else if( numSignalsLeft != 0 ) | |
306 { | |
307 // unblock next waiter | |
308 rc = ReleaseSemaphore( m_blockQueue, 1, null ); | |
309 assert( rc ); | |
310 } | |
311 m_assocMutex.lock(); // re-acquire the associated mutex before returning to the caller | |
312 return !timedOut; | |
313 } | |
314 | |
315 /** Win32 notification primitive shared by notify() and notifyAll(): wakes every blocked waiter when all is true, otherwise one. Does nothing if no waiter is blocked. */ | |
316 void notify( bool all ) | |
317 { | |
318 DWORD rc; | |
319 | |
320 EnterCriticalSection( &m_unblockLock ); // serialize the counter updates with timedWait | |
321 scope(failure) LeaveCriticalSection( &m_unblockLock ); | |
322 | |
323 if( m_numWaitersToUnblock != 0 ) // an earlier notification is still being delivered | |
324 { | |
325 if( m_numWaitersBlocked == 0 ) // nobody else is blocked: nothing to do | |
326 { | |
327 LeaveCriticalSection( &m_unblockLock ); | |
328 return; | |
329 } | |
330 if( all ) | |
331 { | |
332 m_numWaitersToUnblock += m_numWaitersBlocked; | |
333 m_numWaitersBlocked = 0; | |
334 } | |
335 else | |
336 { | |
337 m_numWaitersToUnblock++; | |
338 m_numWaitersBlocked--; | |
339 } | |
340 LeaveCriticalSection( &m_unblockLock ); // no semaphore release here: woken waiters release the queue for the next one in timedWait | |
341 } | |
342 else if( m_numWaitersBlocked > m_numWaitersGone ) // at least one waiter is blocked beyond those already counted as gone | |
343 { | |
344 rc = WaitForSingleObject( m_blockLock, INFINITE ); // take the gate so no new waiter can register until timedWait reopens it | |
345 assert( rc == WAIT_OBJECT_0 ); | |
346 if( 0 != m_numWaitersGone ) // discount waiters that already left | |
347 { | |
348 m_numWaitersBlocked -= m_numWaitersGone; | |
349 m_numWaitersGone = 0; | |
350 } | |
351 if( all ) | |
352 { | |
353 m_numWaitersToUnblock = m_numWaitersBlocked; | |
354 m_numWaitersBlocked = 0; | |
355 } | |
356 else | |
357 { | |
358 m_numWaitersToUnblock = 1; | |
359 m_numWaitersBlocked--; | |
360 } | |
361 LeaveCriticalSection( &m_unblockLock ); | |
362 rc = ReleaseSemaphore( m_blockQueue, 1, null ); // wake the first waiter; timedWait's "unblock next waiter" step releases the rest | |
363 assert( rc ); | |
364 } | |
365 else // no waiter to wake | |
366 { | |
367 LeaveCriticalSection( &m_unblockLock ); | |
368 } | |
369 } | |
370 | |
371 | |
372 // NOTE: This implementation uses Algorithm 8c as described here: | |
373 // http://groups.google.com/group/comp.programming.threads/ | |
374 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a | |
375 HANDLE m_blockLock; // auto-reset event (now semaphore) | |
376 HANDLE m_blockQueue; // auto-reset event (now semaphore) | |
377 Mutex m_assocMutex; // external mutex/CS | |
378 CRITICAL_SECTION m_unblockLock; // internal mutex/CS | |
379 int m_numWaitersGone = 0; | |
380 int m_numWaitersBlocked = 0; | |
381 int m_numWaitersToUnblock = 0; | |
382 } | |
383 else version( Posix ) | |
384 { | |
385 pthread_cond_t m_hndl; | |
386 pthread_mutex_t* m_mutexAddr; | |
387 } | |
388 } | |
389 | |
390 | |
391 //////////////////////////////////////////////////////////////////////////////// | |
392 // Unit Tests | |
393 //////////////////////////////////////////////////////////////////////////////// | |
394 | |
395 | |
396 debug( UnitTest ) | |
397 { | |
398 private import tango.core.Thread; | |
399 private import tango.core.sync.Mutex; | |
400 private import tango.core.sync.Semaphore; | |
401 | |
402 /** Exercises notify(): numWaiters threads each consume numTries "ready" tokens that the main thread posts one notify() at a time; the test asserts that every token was consumed. */ | |
403 void testNotify() | |
404 { | |
405 auto mutex = new Mutex; | |
406 auto condReady = new Condition( mutex ); | |
407 auto semDone = new Semaphore; | |
408 auto synLoop = new Object; // monitor object guarding numDone | |
409 int numWaiters = 10; | |
410 int numTries = 10; | |
411 int numReady = 0; // tokens posted but not yet consumed; guarded by mutex | |
412 int numTotal = 0; // tokens consumed overall; guarded by mutex | |
413 int numDone = 0; | |
414 int numPost = 0; // not referenced anywhere else in this function | |
415 | |
416 void waiter() | |
417 { | |
418 for( int i = 0; i < numTries; ++i ) | |
419 { | |
420 synchronized( mutex ) | |
421 { | |
422 while( numReady < 1 ) // re-check the predicate after every wake-up | |
423 { | |
424 condReady.wait(); | |
425 } | |
426 --numReady; | |
427 ++numTotal; | |
428 } | |
429 | |
430 synchronized( synLoop ) | |
431 { | |
432 ++numDone; | |
433 } | |
434 semDone.wait(); // hold here until the main thread releases this round | |
435 } | |
436 } | |
437 | |
438 auto group = new ThreadGroup; | |
439 | |
440 for( int i = 0; i < numWaiters; ++i ) | |
441 group.create( &waiter ); | |
442 | |
443 for( int i = 0; i < numTries; ++i ) | |
444 { | |
445 for( int j = 0; j < numWaiters; ++j ) | |
446 { | |
447 synchronized( mutex ) | |
448 { | |
449 ++numReady; | |
450 condReady.notify(); | |
451 } | |
452 } | |
453 while( true ) // poll until numDone has reached numWaiters (numDone is never reset between rounds) | |
454 { | |
455 synchronized( synLoop ) | |
456 { | |
457 if( numDone >= numWaiters ) | |
458 break; | |
459 } | |
460 Thread.yield(); | |
461 } | |
462 for( int j = 0; j < numWaiters; ++j ) | |
463 { | |
464 semDone.notify(); | |
465 } | |
466 } | |
467 | |
468 group.joinAll(); | |
469 assert( numTotal == numWaiters * numTries ); | |
470 } | |
471 | |
472 /** Exercises notifyAll(): once all numWaiters threads have registered, the main thread sets alert and broadcasts; the test asserts that every waiter finished. */ | |
473 void testNotifyAll() | |
474 { | |
475 auto mutex = new Mutex; | |
476 auto condReady = new Condition( mutex ); | |
477 int numWaiters = 10; | |
478 int numReady = 0; // waiters that have entered the synchronized block | |
479 int numDone = 0; // waiters that have observed alert and left the wait loop | |
480 bool alert = false; | |
481 | |
482 void waiter() | |
483 { | |
484 synchronized( mutex ) | |
485 { | |
486 ++numReady; | |
487 while( !alert ) // re-check the predicate after every wake-up | |
488 condReady.wait(); | |
489 ++numDone; | |
490 } | |
491 } | |
492 | |
493 auto group = new ThreadGroup; | |
494 | |
495 for( int i = 0; i < numWaiters; ++i ) | |
496 group.create( &waiter ); | |
497 | |
498 while( true ) // poll until every waiter has registered, then broadcast once | |
499 { | |
500 synchronized( mutex ) | |
501 { | |
502 if( numReady >= numWaiters ) | |
503 { | |
504 alert = true; | |
505 condReady.notifyAll(); | |
506 break; | |
507 } | |
508 } | |
509 Thread.yield(); | |
510 } | |
511 group.joinAll(); | |
512 assert( numReady == numWaiters && numDone == numWaiters ); | |
513 } | |
514 | |
515 /** Exercises wait( double ): the waiter thread calls wait( 1 ) twice while the main thread notifies once; the test expects the first call to return true and the second to return false. */ | |
516 void testWaitTimeout() | |
517 { | |
518 auto mutex = new Mutex; | |
519 auto condReady = new Condition( mutex ); | |
520 bool waiting = false; | |
521 bool alertedOne = true; | |
522 bool alertedTwo = true; // starts true, so the final assert only passes if the second wait really returned false | |
523 | |
524 void waiter() | |
525 { | |
526 synchronized( mutex ) | |
527 { | |
528 waiting = true; | |
529 alertedOne = condReady.wait( 1 ); // expected to be woken by the single notify() below | |
530 alertedTwo = condReady.wait( 1 ); // no further notification is sent, so this one is expected to time out | |
531 } | |
532 } | |
533 | |
534 auto thread = new Thread( &waiter ); | |
535 thread.start(); | |
536 | |
537 while( true ) // poll until the waiter has set waiting, then notify exactly once | |
538 { | |
539 synchronized( mutex ) | |
540 { | |
541 if( waiting ) | |
542 { | |
543 condReady.notify(); | |
544 break; | |
545 } | |
546 } | |
547 Thread.yield(); | |
548 } | |
549 thread.join(); | |
550 assert( waiting && alertedOne && !alertedTwo ); | |
551 } | |
552 | |
553 // Runs the three Condition tests in sequence. | |
554 unittest | |
555 { | |
556 testNotify(); | |
557 testNotifyAll(); | |
558 testWaitTimeout(); | |
559 } | |
560 } |