Mercurial > projects > ldc
comparison druntime/src/common/core/sync/condition.d @ 1458:e0b2d67cfe7c
Added druntime (this should be removed once it works).
author | Robert Clipsham <robert@octarineparrot.com> |
---|---|
date | Tue, 02 Jun 2009 17:43:06 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
1456:7b218ec1044f | 1458:e0b2d67cfe7c |
---|---|
1 /** | |
2 * The condition module provides a primitive for synchronized condition | |
3 * checking. | |
4 * | |
5 * Copyright: Copyright Sean Kelly 2005 - 2009. | |
6 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>. | |
7 * Authors: Sean Kelly | |
8 * | |
9 * Copyright Sean Kelly 2005 - 2009. | |
10 * Distributed under the Boost Software License, Version 1.0. | |
11 * (See accompanying file LICENSE_1_0.txt or copy at | |
12 * http://www.boost.org/LICENSE_1_0.txt) | |
13 */ | |
14 module core.sync.condition; | |
15 | |
16 | |
17 public import core.sync.exception; | |
18 public import core.sync.mutex; | |
19 | |
20 version( Win32 ) | |
21 { | |
22 private import core.sync.semaphore; | |
23 private import core.sys.windows.windows; | |
24 } | |
25 else version( Posix ) | |
26 { | |
27 private import core.sync.config; | |
28 private import core.stdc.errno; | |
29 private import core.sys.posix.pthread; | |
30 private import core.sys.posix.time; | |
31 } | |
32 | |
33 | |
34 //////////////////////////////////////////////////////////////////////////////// | |
35 // Condition | |
36 // | |
37 // void wait(); | |
38 // void notify(); | |
39 // void notifyAll(); | |
40 //////////////////////////////////////////////////////////////////////////////// | |
41 | |
42 | |
43 /** | |
44 * This class represents a condition variable as conceived by C.A.R. Hoare. As | |
45 * per Mesa type monitors however, "signal" has been replaced with "notify" to | |
46 * indicate that control is not transferred to the waiter when a notification | |
47 * is sent. | |
48 */ | |
49 class Condition | |
50 { | |
51 //////////////////////////////////////////////////////////////////////////// | |
52 // Initialization | |
53 //////////////////////////////////////////////////////////////////////////// | |
54 | |
55 | |
/**
 * Constructs a condition object and associates it with the supplied
 * mutex object.
 *
 * Params:
 *  m = The mutex with which this condition will be associated.
 *
 * Throws:
 *  SyncException if one of the underlying primitives cannot be created.
 */
this( Mutex m )
{
    version( Win32 )
    {
        // Gate semaphore: initial count 1, maximum count 1.
        m_blockLock = CreateSemaphoreA( null, 1, 1, null );
        if( m_blockLock == m_blockLock.init )
        {
            throw new SyncException( "Unable to initialize condition" );
        }
        // Release the gate again if anything below fails.
        scope(failure) CloseHandle( m_blockLock );

        // Queue semaphore: initial count 0, maximum count int.max.
        m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
        if( m_blockQueue == m_blockQueue.init )
        {
            throw new SyncException( "Unable to initialize condition" );
        }
        scope(failure) CloseHandle( m_blockQueue );

        InitializeCriticalSection( &m_unblockLock );
        m_assocMutex = m;
    }
    else version( Posix )
    {
        // Remember the native mutex handle; it is handed to the pthread
        // wait calls later on.
        m_mutexAddr = m.handleAddr();

        if( pthread_cond_init( &m_hndl, null ) != 0 )
        {
            throw new SyncException( "Unable to initialize condition" );
        }
    }
}
92 | |
93 | |
/**
 * Releases the operating system objects created by the constructor.
 * Failures are reported through assertions only.
 */
~this()
{
    version( Win32 )
    {
        BOOL closed = CloseHandle( m_blockLock );
        assert( closed, "Unable to destroy condition" );

        closed = CloseHandle( m_blockQueue );
        assert( closed, "Unable to destroy condition" );

        DeleteCriticalSection( &m_unblockLock );
    }
    else version( Posix )
    {
        int status = pthread_cond_destroy( &m_hndl );
        assert( status == 0, "Unable to destroy condition" );
    }
}
110 | |
111 | |
112 //////////////////////////////////////////////////////////////////////////// | |
113 // General Actions | |
114 //////////////////////////////////////////////////////////////////////////// | |
115 | |
116 | |
/**
 * Blocks the calling thread until a notification arrives.
 *
 * Throws:
 *  SyncException on error.
 */
void wait()
{
    version( Win32 )
    {
        // An unbounded wait is a timed wait with an infinite timeout.
        timedWait( INFINITE );
    }
    else version( Posix )
    {
        if( pthread_cond_wait( &m_hndl, m_mutexAddr ) != 0 )
        {
            throw new SyncException( "Unable to wait for condition" );
        }
    }
}
136 | |
137 | |
/**
 * Blocks the calling thread until a notification arrives or the supplied
 * time period runs out, whichever happens first.
 *
 * Params:
 *  period = The time to wait, in 100 nanosecond intervals.  On Win32 the
 *           value is converted to milliseconds and capped at
 *           uint.max - 1 milliseconds if it is too large.
 *
 * In:
 *  period must be non-negative.
 *
 * Throws:
 *  SyncException on error.
 *
 * Returns:
 *  true if notified before the timeout and false if not.
 */
bool wait( long period )
in
{
    assert( period >= 0 );
}
body
{
    version( Win32 )
    {
        enum : uint
        {
            TICKS_PER_MILLI = 10_000,
            MAX_WAIT_MILLIS = uint.max - 1
        }

        // Convert 100ns ticks to milliseconds, then clamp to the largest
        // wait accepted here.
        long millis = period / TICKS_PER_MILLI;
        if( millis > MAX_WAIT_MILLIS )
        {
            millis = MAX_WAIT_MILLIS;
        }
        return timedWait( cast(uint) millis );
    }
    else version( Posix )
    {
        timespec ts = void;
        mktspec( ts, period );

        int status = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &ts );
        if( status == ETIMEDOUT )
        {
            return false;
        }
        if( status != 0 )
        {
            throw new SyncException( "Unable to wait for condition" );
        }
        return true;
    }
}
189 | |
/**
 * Wakes a single waiter.
 *
 * Throws:
 *  SyncException on error.
 */
void notify()
{
    version( Win32 )
    {
        // Delegate to the shared Win32 helper, asking for one waiter only.
        notify( false );
    }
    else version( Posix )
    {
        if( pthread_cond_signal( &m_hndl ) != 0 )
        {
            throw new SyncException( "Unable to notify condition" );
        }
    }
}
209 | |
210 | |
/**
 * Wakes every waiter.
 *
 * Throws:
 *  SyncException on error.
 */
void notifyAll()
{
    version( Win32 )
    {
        // Delegate to the shared Win32 helper, asking for all waiters.
        notify( true );
    }
    else version( Posix )
    {
        if( pthread_cond_broadcast( &m_hndl ) != 0 )
        {
            throw new SyncException( "Unable to notify condition" );
        }
    }
}
230 | |
231 | |
232 private: | |
233 version( Win32 ) | |
234 { | |
/**
 * Win32 implementation shared by both wait overloads.  Registers the
 * caller as a blocked waiter, unlocks the associated mutex, waits on the
 * queue semaphore for at most the given timeout, updates the waiter
 * bookkeeping and finally re-locks the associated mutex.
 *
 * Params:
 *  timeout = The timeout handed to WaitForSingleObject for the wait on
 *            the queue semaphore (INFINITE for an unbounded wait).
 *
 * Returns:
 *  true if the wait on the queue semaphore was satisfied and false if
 *  it timed out.
 */
bool timedWait( DWORD timeout )
{
    int   numSignalsLeft;
    int   numWaitersGone;
    DWORD rc;

    // Count this thread as a blocked waiter while holding the gate.
    rc = WaitForSingleObject( m_blockLock, INFINITE );
    assert( rc == WAIT_OBJECT_0 );

    m_numWaitersBlocked++;

    rc = ReleaseSemaphore( m_blockLock, 1, null );
    assert( rc );

    // The associated mutex is released for the duration of the wait and
    // re-acquired both on failure and on the normal path at the end.
    // NOTE(review): this guard is still armed while the final lock() call
    // below runs, so a failure inside that call triggers a second lock()
    // attempt - confirm this is acceptable for Mutex.
    m_assocMutex.unlock();
    scope(failure) m_assocMutex.lock();

    rc = WaitForSingleObject( m_blockQueue, timeout );
    assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
    bool timedOut = (rc == WAIT_TIMEOUT);

    EnterCriticalSection( &m_unblockLock );
    {
        // The guard lives in this nested scope so the critical section is
        // left exactly once, whether the block completes or fails.  A
        // function-level scope(failure) would leave it a second time if
        // anything after this block failed.
        scope(exit) LeaveCriticalSection( &m_unblockLock );

        if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
        {
            if ( timedOut )
            {
                // timeout (or canceled)
                if( m_numWaitersBlocked != 0 )
                {
                    m_numWaitersBlocked--;
                    // do not unblock next waiter below (already unblocked)
                    numSignalsLeft = 0;
                }
                else
                {
                    // spurious wakeup pending!!
                    m_numWaitersGone = 1;
                }
            }
            if( --m_numWaitersToUnblock == 0 )
            {
                if( m_numWaitersBlocked != 0 )
                {
                    // open the gate
                    rc = ReleaseSemaphore( m_blockLock, 1, null );
                    assert( rc );
                    // do not open the gate below again
                    numSignalsLeft = 0;
                }
                else if( (numWaitersGone = m_numWaitersGone) != 0 )
                {
                    m_numWaitersGone = 0;
                }
            }
        }
        else if( ++m_numWaitersGone == int.max / 2 )
        {
            // timeout/canceled or spurious event :-)
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            // something is going on here - test of timeouts?
            m_numWaitersBlocked -= m_numWaitersGone;
            rc = ReleaseSemaphore( m_blockLock, 1, null );
            // ReleaseSemaphore signals success with a nonzero value, so it
            // must not be compared against WAIT_OBJECT_0.
            assert( rc );
            m_numWaitersGone = 0;
        }
    }

    if( numSignalsLeft == 1 )
    {
        // better now than spurious later (same as ResetEvent)
        for( ; numWaitersGone > 0; --numWaitersGone )
        {
            rc = WaitForSingleObject( m_blockQueue, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
        }
        // open the gate
        rc = ReleaseSemaphore( m_blockLock, 1, null );
        assert( rc );
    }
    else if( numSignalsLeft != 0 )
    {
        // unblock next waiter
        rc = ReleaseSemaphore( m_blockQueue, 1, null );
        assert( rc );
    }
    m_assocMutex.lock();
    return !timedOut;
}
327 | |
328 | |
/**
 * Win32 implementation shared by notify() and notifyAll().  Updates the
 * waiter bookkeeping under the internal critical section and, when the
 * gate had to be closed by this call, posts the queue semaphore once.
 *
 * Params:
 *  all = true to mark every blocked waiter for unblocking, false to mark
 *        a single one.
 */
void notify( bool all )
{
    DWORD rc;
    bool  postQueue = false;

    EnterCriticalSection( &m_unblockLock );
    {
        // The guard lives in this nested scope so the critical section is
        // left exactly once on every path out of the block.  A
        // function-level scope(failure) would leave it a second time if
        // the semaphore post after the block failed.
        scope(exit) LeaveCriticalSection( &m_unblockLock );

        if( m_numWaitersToUnblock != 0 )
        {
            // An unblock sequence is already in progress.
            if( m_numWaitersBlocked == 0 )
            {
                // Nobody left to add to it.
                return;
            }
            if( all )
            {
                m_numWaitersToUnblock += m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock++;
                m_numWaitersBlocked--;
            }
        }
        else if( m_numWaitersBlocked > m_numWaitersGone )
        {
            // Take the gate semaphore; it is released again by timedWait.
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            if( 0 != m_numWaitersGone )
            {
                // Discount waiters that already left on their own.
                m_numWaitersBlocked -= m_numWaitersGone;
                m_numWaitersGone = 0;
            }
            if( all )
            {
                m_numWaitersToUnblock = m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock = 1;
                m_numWaitersBlocked--;
            }
            postQueue = true;
        }
    }

    if( postQueue )
    {
        // Wake the first waiter; timedWait posts the queue semaphore again
        // for any further waiters that were marked above.
        rc = ReleaseSemaphore( m_blockQueue, 1, null );
        assert( rc );
    }
}
383 | |
384 | |
// NOTE: This implementation uses Algorithm 8c as described here:
//       http://groups.google.com/group/comp.programming.threads/
//              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a

// Gate semaphore, created with an initial and maximum count of 1.  It is
// held while a waiter registers itself and taken by notify( bool ).
HANDLE              m_blockLock;    // auto-reset event (now semaphore)
// Semaphore the waiters block on, created with an initial count of 0.
HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
// Mutex supplied to the constructor; unlocked while waiting and locked
// again before timedWait returns.
Mutex               m_assocMutex;   // external mutex/CS
// Critical section entered around the counter updates in timedWait and
// notify( bool ).
CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
// Waiter bookkeeping used by timedWait and notify( bool ).
int                 m_numWaitersGone        = 0;
int                 m_numWaitersBlocked     = 0;
int                 m_numWaitersToUnblock   = 0;
395 } | |
396 else version( Posix ) | |
397 { | |
// Native condition variable, set up by pthread_cond_init in the
// constructor and torn down by pthread_cond_destroy in the destructor.
pthread_cond_t      m_hndl;
// Address of the associated mutex's native handle, obtained from
// Mutex.handleAddr() and passed to the pthread wait calls.
pthread_mutex_t*    m_mutexAddr;
400 } | |
401 } | |
402 | |
403 | |
404 //////////////////////////////////////////////////////////////////////////////// | |
405 // Unit Tests | |
406 //////////////////////////////////////////////////////////////////////////////// | |
407 | |
408 | |
409 version( unittest ) | |
410 { | |
411 private import core.thread; | |
412 private import core.sync.mutex; | |
413 private import core.sync.semaphore; | |
414 | |
415 | |
/**
 * Exercises Condition.notify with several waiter threads over several
 * rounds.  In each round the main thread makes one unit of work available
 * per waiter, notifying once per unit, waits until that many units have
 * been consumed, and then releases the waiters through a semaphore.
 */
void testNotify()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    auto semDone    = new Semaphore;
    auto synLoop    = new Object;
    int  numWaiters = 10;
    int  numTries   = 10;
    int  numReady   = 0;
    int  numTotal   = 0;
    int  numDone    = 0;

    void waiter()
    {
        for( int i = 0; i < numTries; ++i )
        {
            synchronized( mutex )
            {
                // Loop on the predicate so that a wakeup without available
                // work simply goes back to waiting.
                while( numReady < 1 )
                {
                    condReady.wait();
                }
                --numReady;
                ++numTotal;
            }

            synchronized( synLoop )
            {
                ++numDone;
            }
            semDone.wait();
        }
    }

    auto group = new ThreadGroup;

    for( int i = 0; i < numWaiters; ++i )
        group.create( &waiter );

    for( int i = 0; i < numTries; ++i )
    {
        for( int j = 0; j < numWaiters; ++j )
        {
            synchronized( mutex )
            {
                ++numReady;
                condReady.notify();
            }
        }
        while( true )
        {
            synchronized( synLoop )
            {
                if( numDone >= numWaiters )
                {
                    // Start counting afresh for the next round; without
                    // the reset this check would pass immediately in every
                    // round after the first.
                    numDone = 0;
                    break;
                }
            }
            Thread.yield();
        }
        for( int j = 0; j < numWaiters; ++j )
        {
            semDone.notify();
        }
    }

    group.joinAll();
    assert( numTotal == numWaiters * numTries );
}
484 | |
485 | |
/**
 * Exercises Condition.notifyAll: starts several waiter threads, waits
 * until each of them has checked in, then raises the alert flag and
 * notifies them all at once.
 */
void testNotifyAll()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    int  numWaiters = 10;
    int  numReady   = 0;
    int  numDone    = 0;
    bool alert      = false;

    void waiter()
    {
        synchronized( mutex )
        {
            ++numReady;
            // Keep waiting until the main thread has raised the alert.
            while( !alert )
            {
                condReady.wait();
            }
            ++numDone;
        }
    }

    auto threads = new ThreadGroup;

    for( int started = 0; started < numWaiters; ++started )
    {
        threads.create( &waiter );
    }

    // Poll until every waiter has checked in, then wake them all.
    bool released = false;
    while( !released )
    {
        synchronized( mutex )
        {
            if( numReady >= numWaiters )
            {
                alert = true;
                condReady.notifyAll();
                released = true;
            }
        }
        if( !released )
        {
            Thread.yield();
        }
    }

    threads.joinAll();
    assert( numReady == numWaiters && numDone == numWaiters );
}
527 | |
528 | |
/**
 * Exercises the timed wait: the first wait is expected to be ended by a
 * notification and the second one, which nobody notifies, is expected to
 * time out.
 */
void testWaitTimeout()
{
    auto mutex      = new Mutex;
    auto condReady  = new Condition( mutex );
    bool waiting    = false;
    bool alertedOne = true;
    bool alertedTwo = true;

    void waiter()
    {
        synchronized( mutex )
        {
            waiting    = true;
            alertedOne = condReady.wait( 10_000_000 ); // 1s
            alertedTwo = condReady.wait( 10_000_000 ); // 1s
        }
    }

    auto worker = new Thread( &waiter );
    worker.start();

    // Poll until the worker has announced itself, then notify it once.
    bool notified = false;
    while( !notified )
    {
        synchronized( mutex )
        {
            if( waiting )
            {
                condReady.notify();
                notified = true;
            }
        }
        if( !notified )
        {
            Thread.yield();
        }
    }

    worker.join();
    assert( waiting && alertedOne && !alertedTwo );
}
565 | |
566 | |
// Runs the three condition tests defined above, one after another.
unittest
{
    testNotify();
    testNotifyAll();
    testWaitTimeout();
}
573 } |