Mercurial > projects > ldc
comparison druntime/src/common/core/sync/semaphore.d @ 1458:e0b2d67cfe7c
Added druntime (this should be removed once it works).
author | Robert Clipsham <robert@octarineparrot.com> |
---|---|
date | Tue, 02 Jun 2009 17:43:06 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
1456:7b218ec1044f | 1458:e0b2d67cfe7c |
---|---|
1 /** | |
2 * The semaphore module provides a general use semaphore for synchronization. | |
3 * | |
4 * Copyright: Copyright Sean Kelly 2005 - 2009. | |
5 * License: <a href="http://www.boost.org/LICENSE_1_0.txt>Boost License 1.0</a>. | |
6 * Authors: Sean Kelly | |
7 * | |
8 * Copyright Sean Kelly 2005 - 2009. | |
9 * Distributed under the Boost Software License, Version 1.0. | |
10 * (See accompanying file LICENSE_1_0.txt or copy at | |
11 * http://www.boost.org/LICENSE_1_0.txt) | |
12 */ | |
13 module core.sync.semaphore; | |
14 | |
15 | |
16 public import core.sync.exception; | |
17 | |
18 version( Win32 ) | |
19 { | |
20 private import core.sys.windows.windows; | |
21 } | |
22 else version( OSX ) | |
23 { | |
24 private import core.sync.config; | |
25 private import core.stdc.errno; | |
26 private import core.sys.posix.time; | |
27 private import core.sys.osx.mach.semaphore; | |
28 } | |
29 else version( Posix ) | |
30 { | |
31 private import core.sync.config; | |
32 private import core.stdc.errno; | |
33 private import core.sys.posix.pthread; | |
34 private import core.sys.posix.semaphore; | |
35 } | |
36 | |
37 | |
38 //////////////////////////////////////////////////////////////////////////////// | |
39 // Semaphore | |
40 // | |
41 // void wait(); | |
42 // void notify(); | |
43 // bool tryWait(); | |
44 //////////////////////////////////////////////////////////////////////////////// | |
45 | |
46 | |
47 /** | |
48 * This class represents a general counting semaphore as concieved by Edsger | |
49 * Dijkstra. As per Mesa type monitors however, "signal" has been replaced | |
50 * with "notify" to indicate that control is not transferred to the waiter when | |
51 * a notification is sent. | |
52 */ | |
53 class Semaphore | |
54 { | |
55 //////////////////////////////////////////////////////////////////////////// | |
56 // Initialization | |
57 //////////////////////////////////////////////////////////////////////////// | |
58 | |
59 | |
60 /** | |
61 * Initializes a semaphore object with the specified initial count. | |
62 * | |
63 * Params: | |
64 * count = The initial count for the semaphore. | |
65 * | |
66 * Throws: | |
67 * SyncException on error. | |
68 */ | |
69 this( uint count = 0 ) | |
70 { | |
71 version( Win32 ) | |
72 { | |
73 m_hndl = CreateSemaphoreA( null, count, int.max, null ); | |
74 if( m_hndl == m_hndl.init ) | |
75 throw new SyncException( "Unable to create semaphore" ); | |
76 } | |
77 else version( OSX ) | |
78 { | |
79 auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count ); | |
80 if( rc ) | |
81 throw new SyncException( "Unable to create semaphore" ); | |
82 } | |
83 else version( Posix ) | |
84 { | |
85 int rc = sem_init( &m_hndl, 0, count ); | |
86 if( rc ) | |
87 throw new SyncException( "Unable to create semaphore" ); | |
88 } | |
89 } | |
90 | |
91 | |
92 ~this() | |
93 { | |
94 version( Win32 ) | |
95 { | |
96 BOOL rc = CloseHandle( m_hndl ); | |
97 assert( rc, "Unable to destroy semaphore" ); | |
98 } | |
99 else version( OSX ) | |
100 { | |
101 auto rc = semaphore_destroy( mach_task_self(), m_hndl ); | |
102 assert( !rc, "Unable to destroy semaphore" ); | |
103 } | |
104 else version( Posix ) | |
105 { | |
106 int rc = sem_destroy( &m_hndl ); | |
107 assert( !rc, "Unable to destroy semaphore" ); | |
108 } | |
109 } | |
110 | |
111 | |
112 //////////////////////////////////////////////////////////////////////////// | |
113 // General Actions | |
114 //////////////////////////////////////////////////////////////////////////// | |
115 | |
116 | |
117 /** | |
118 * Wait until the current count is above zero, then atomically decrement | |
119 * the count by one and return. | |
120 * | |
121 * Throws: | |
122 * SyncException on error. | |
123 */ | |
124 void wait() | |
125 { | |
126 version( Win32 ) | |
127 { | |
128 DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); | |
129 if( rc != WAIT_OBJECT_0 ) | |
130 throw new SyncException( "Unable to wait for semaphore" ); | |
131 } | |
132 else version( OSX ) | |
133 { | |
134 while( true ) | |
135 { | |
136 auto rc = semaphore_wait( m_hndl ); | |
137 if( !rc ) | |
138 return; | |
139 if( rc == KERN_ABORTED && errno == EINTR ) | |
140 continue; | |
141 throw new SyncException( "Unable to wait for semaphore" ); | |
142 } | |
143 } | |
144 else version( Posix ) | |
145 { | |
146 while( true ) | |
147 { | |
148 if( !sem_wait( &m_hndl ) ) | |
149 return; | |
150 if( errno != EINTR ) | |
151 throw new SyncException( "Unable to wait for semaphore" ); | |
152 } | |
153 } | |
154 } | |
155 | |
156 | |
157 /** | |
158 * Suspends the calling thread until the current count moves above zero or | |
159 * until the supplied time period has elapsed. If the count moves above | |
160 * zero in this interval, then atomically decrement the count by one and | |
161 * return true. Otherwise, return false. | |
162 * | |
163 * | |
164 * Params: | |
165 * period = The time to wait, in 100 nanosecond intervals. This value may | |
166 * be adjusted to equal to the maximum wait period supported by | |
167 * the target platform if it is too large. | |
168 * | |
169 * In: | |
170 * period must be non-negative. | |
171 * | |
172 * Throws: | |
173 * SyncException on error. | |
174 * | |
175 * Returns: | |
176 * true if notified before the timeout and false if not. | |
177 */ | |
178 bool wait( long period ) | |
179 in | |
180 { | |
181 assert( period >= 0 ); | |
182 } | |
183 body | |
184 { | |
185 version( Win32 ) | |
186 { | |
187 enum : uint | |
188 { | |
189 TICKS_PER_MILLI = 10_000, | |
190 MAX_WAIT_MILLIS = uint.max - 1 | |
191 } | |
192 | |
193 period /= TICKS_PER_MILLI; | |
194 if( period > MAX_WAIT_MILLIS ) | |
195 period = MAX_WAIT_MILLIS; | |
196 switch( WaitForSingleObject( m_hndl, cast(uint) period ) ) | |
197 { | |
198 case WAIT_OBJECT_0: | |
199 return true; | |
200 case WAIT_TIMEOUT: | |
201 return false; | |
202 default: | |
203 throw new SyncException( "Unable to wait for semaphore" ); | |
204 } | |
205 } | |
206 else version( OSX ) | |
207 { | |
208 mach_timespec_t t = void; | |
209 (cast(byte*) &t)[0 .. t.sizeof] = 0; | |
210 | |
211 if( period != 0 ) | |
212 { | |
213 enum : uint | |
214 { | |
215 NANOS_PER_TICK = 100, | |
216 TICKS_PER_SECOND = 10_000_000, | |
217 NANOS_PER_SECOND = NANOS_PER_TICK * TICKS_PER_SECOND, | |
218 } | |
219 | |
220 if( t.tv_sec.max - t.tv_sec < period / TICKS_PER_SECOND ) | |
221 { | |
222 t.tv_sec = t.tv_sec.max; | |
223 t.tv_nsec = 0; | |
224 } | |
225 else | |
226 { | |
227 t.tv_sec += cast(typeof(t.tv_sec)) (period / TICKS_PER_SECOND); | |
228 long ns = (period % TICKS_PER_SECOND) * NANOS_PER_TICK; | |
229 if( NANOS_PER_SECOND - t.tv_nsec > ns ) | |
230 t.tv_nsec = cast(typeof(t.tv_nsec)) ns; | |
231 else | |
232 { | |
233 t.tv_sec += 1; | |
234 t.tv_nsec += ns - NANOS_PER_SECOND; | |
235 } | |
236 } | |
237 } | |
238 while( true ) | |
239 { | |
240 auto rc = semaphore_timedwait( m_hndl, t ); | |
241 if( !rc ) | |
242 return true; | |
243 if( rc == KERN_OPERATION_TIMED_OUT ) | |
244 return false; | |
245 if( rc != KERN_ABORTED || errno != EINTR ) | |
246 throw new SyncException( "Unable to wait for semaphore" ); | |
247 } | |
248 // -w trip | |
249 return false; | |
250 } | |
251 else version( Posix ) | |
252 { | |
253 timespec t = void; | |
254 mktspec( t, period ); | |
255 | |
256 while( true ) | |
257 { | |
258 if( !sem_timedwait( &m_hndl, &t ) ) | |
259 return true; | |
260 if( errno == ETIMEDOUT ) | |
261 return false; | |
262 if( errno != EINTR ) | |
263 throw new SyncException( "Unable to wait for semaphore" ); | |
264 } | |
265 // -w trip | |
266 return false; | |
267 } | |
268 } | |
269 | |
270 | |
271 /** | |
272 * Atomically increment the current count by one. This will notify one | |
273 * waiter, if there are any in the queue. | |
274 * | |
275 * Throws: | |
276 * SyncException on error. | |
277 */ | |
278 void notify() | |
279 { | |
280 version( Win32 ) | |
281 { | |
282 if( !ReleaseSemaphore( m_hndl, 1, null ) ) | |
283 throw new SyncException( "Unable to notify semaphore" ); | |
284 } | |
285 else version( OSX ) | |
286 { | |
287 auto rc = semaphore_signal( m_hndl ); | |
288 if( rc ) | |
289 throw new SyncException( "Unable to notify semaphore" ); | |
290 } | |
291 else version( Posix ) | |
292 { | |
293 int rc = sem_post( &m_hndl ); | |
294 if( rc ) | |
295 throw new SyncException( "Unable to notify semaphore" ); | |
296 } | |
297 } | |
298 | |
299 | |
300 /** | |
301 * If the current count is equal to zero, return. Otherwise, atomically | |
302 * decrement the count by one and return true. | |
303 * | |
304 * Throws: | |
305 * SyncException on error. | |
306 * | |
307 * Returns: | |
308 * true if the count was above zero and false if not. | |
309 */ | |
310 bool tryWait() | |
311 { | |
312 version( Win32 ) | |
313 { | |
314 switch( WaitForSingleObject( m_hndl, 0 ) ) | |
315 { | |
316 case WAIT_OBJECT_0: | |
317 return true; | |
318 case WAIT_TIMEOUT: | |
319 return false; | |
320 default: | |
321 throw new SyncException( "Unable to wait for semaphore" ); | |
322 } | |
323 } | |
324 else version( OSX ) | |
325 { | |
326 return wait( 0 ); | |
327 } | |
328 else version( Posix ) | |
329 { | |
330 while( true ) | |
331 { | |
332 if( !sem_trywait( &m_hndl ) ) | |
333 return true; | |
334 if( errno == EAGAIN ) | |
335 return false; | |
336 if( errno != EINTR ) | |
337 throw new SyncException( "Unable to wait for semaphore" ); | |
338 } | |
339 // -w trip | |
340 return false; | |
341 } | |
342 } | |
343 | |
344 | |
345 private: | |
346 version( Win32 ) | |
347 { | |
348 HANDLE m_hndl; | |
349 } | |
350 else version( OSX ) | |
351 { | |
352 semaphore_t m_hndl; | |
353 } | |
354 else version( Posix ) | |
355 { | |
356 sem_t m_hndl; | |
357 } | |
358 } | |
359 | |
360 | |
361 //////////////////////////////////////////////////////////////////////////////// | |
362 // Unit Tests | |
363 //////////////////////////////////////////////////////////////////////////////// | |
364 | |
365 | |
366 version( unittest ) | |
367 { | |
368 private import core.thread; | |
369 | |
370 | |
371 void testWait() | |
372 { | |
373 auto semaphore = new Semaphore; | |
374 int numToProduce = 10; | |
375 bool allProduced = false; | |
376 auto synProduced = new Object; | |
377 int numConsumed = 0; | |
378 auto synConsumed = new Object; | |
379 int numConsumers = 10; | |
380 int numComplete = 0; | |
381 auto synComplete = new Object; | |
382 | |
383 void consumer() | |
384 { | |
385 while( true ) | |
386 { | |
387 semaphore.wait(); | |
388 | |
389 synchronized( synProduced ) | |
390 { | |
391 if( allProduced ) | |
392 break; | |
393 } | |
394 | |
395 synchronized( synConsumed ) | |
396 { | |
397 ++numConsumed; | |
398 } | |
399 } | |
400 | |
401 synchronized( synComplete ) | |
402 { | |
403 ++numComplete; | |
404 } | |
405 } | |
406 | |
407 void producer() | |
408 { | |
409 assert( !semaphore.tryWait() ); | |
410 | |
411 for( int i = 0; i < numToProduce; ++i ) | |
412 { | |
413 semaphore.notify(); | |
414 Thread.yield(); | |
415 } | |
416 Thread.sleep( 10_000_000 ); // 1s | |
417 synchronized( synProduced ) | |
418 { | |
419 allProduced = true; | |
420 } | |
421 | |
422 for( int i = 0; i < numConsumers; ++i ) | |
423 { | |
424 semaphore.notify(); | |
425 Thread.yield(); | |
426 } | |
427 | |
428 for( int i = numConsumers * 10000; i > 0; --i ) | |
429 { | |
430 synchronized( synComplete ) | |
431 { | |
432 if( numComplete == numConsumers ) | |
433 break; | |
434 } | |
435 Thread.yield(); | |
436 } | |
437 | |
438 synchronized( synComplete ) | |
439 { | |
440 assert( numComplete == numConsumers ); | |
441 } | |
442 | |
443 synchronized( synConsumed ) | |
444 { | |
445 assert( numConsumed == numToProduce ); | |
446 } | |
447 | |
448 assert( !semaphore.tryWait() ); | |
449 semaphore.notify(); | |
450 assert( semaphore.tryWait() ); | |
451 assert( !semaphore.tryWait() ); | |
452 } | |
453 | |
454 auto group = new ThreadGroup; | |
455 | |
456 for( int i = 0; i < numConsumers; ++i ) | |
457 group.create( &consumer ); | |
458 group.create( &producer ); | |
459 group.joinAll(); | |
460 } | |
461 | |
462 | |
463 void testWaitTimeout() | |
464 { | |
465 auto synReady = new Object; | |
466 auto semReady = new Semaphore; | |
467 bool waiting = false; | |
468 bool alertedOne = true; | |
469 bool alertedTwo = true; | |
470 | |
471 void waiter() | |
472 { | |
473 synchronized( synReady ) | |
474 { | |
475 waiting = true; | |
476 } | |
477 alertedOne = semReady.wait( 10_000_000 ); // 100ms | |
478 alertedTwo = semReady.wait( 10_000_000 ); // 100ms | |
479 } | |
480 | |
481 auto thread = new Thread( &waiter ); | |
482 thread.start(); | |
483 | |
484 while( true ) | |
485 { | |
486 synchronized( synReady ) | |
487 { | |
488 if( waiting ) | |
489 { | |
490 semReady.notify(); | |
491 break; | |
492 } | |
493 } | |
494 Thread.yield(); | |
495 } | |
496 thread.join(); | |
497 assert( waiting && alertedOne && !alertedTwo ); | |
498 } | |
499 | |
500 | |
501 unittest | |
502 { | |
503 testWait(); | |
504 testWaitTimeout(); | |
505 } | |
506 } |