comparison druntime/src/common/core/sync/semaphore.d @ 1458:e0b2d67cfe7c

Added druntime (this should be removed once it works).
author Robert Clipsham <robert@octarineparrot.com>
date Tue, 02 Jun 2009 17:43:06 +0100
parents
children
comparison
equal deleted inserted replaced
1456:7b218ec1044f 1458:e0b2d67cfe7c
1 /**
2 * The semaphore module provides a general use semaphore for synchronization.
3 *
4 * Copyright: Copyright Sean Kelly 2005 - 2009.
5 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
6 * Authors: Sean Kelly
7 *
8 * Copyright Sean Kelly 2005 - 2009.
9 * Distributed under the Boost Software License, Version 1.0.
10 * (See accompanying file LICENSE_1_0.txt or copy at
11 * http://www.boost.org/LICENSE_1_0.txt)
12 */
13 module core.sync.semaphore;
14
15
16 public import core.sync.exception;
17
18 version( Win32 )
19 {
20 private import core.sys.windows.windows;
21 }
22 else version( OSX )
23 {
24 private import core.sync.config;
25 private import core.stdc.errno;
26 private import core.sys.posix.time;
27 private import core.sys.osx.mach.semaphore;
28 }
29 else version( Posix )
30 {
31 private import core.sync.config;
32 private import core.stdc.errno;
33 private import core.sys.posix.pthread;
34 private import core.sys.posix.semaphore;
35 }
36
37
38 ////////////////////////////////////////////////////////////////////////////////
39 // Semaphore
40 //
41 // void wait();
42 // void notify();
43 // bool tryWait();
44 ////////////////////////////////////////////////////////////////////////////////
45
46
47 /**
48 * This class represents a general counting semaphore as conceived by Edsger
49 * Dijkstra. As per Mesa type monitors however, "signal" has been replaced
50 * with "notify" to indicate that control is not transferred to the waiter when
51 * a notification is sent.
52 */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Creates the underlying OS semaphore with the given starting count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncException if the OS semaphore could not be created.
     */
    this( uint count = 0 )
    {
        version( Win32 )
        {
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            // A handle still equal to its default value means creation failed.
            if( m_hndl == HANDLE.init )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version( OSX )
        {
            if( semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count ) != 0 )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version( Posix )
        {
            if( sem_init( &m_hndl, 0, count ) != 0 )
                throw new SyncException( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the underlying OS semaphore.  A failure to do so is only
     * reported through an assertion, never through an exception.
     */
    ~this()
    {
        // The release call is kept outside of the assert expression so that
        // it still runs when assertions are compiled out.
        version( Win32 )
        {
            BOOL closed = CloseHandle( m_hndl );
            assert( closed != 0, "Unable to destroy semaphore" );
        }
        else version( OSX )
        {
            auto status = semaphore_destroy( mach_task_self(), m_hndl );
            assert( status == 0, "Unable to destroy semaphore" );
        }
        else version( Posix )
        {
            int status = sem_destroy( &m_hndl );
            assert( status == 0, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Blocks until the current count is above zero, then atomically
     * decrements the count by one and returns.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            if( WaitForSingleObject( m_hndl, INFINITE ) != WAIT_OBJECT_0 )
                throw new SyncException( "Unable to wait for semaphore" );
        }
        else version( OSX )
        {
            while( true )
            {
                auto status = semaphore_wait( m_hndl );
                if( status == 0 )
                    return;
                // Only an aborted wait accompanied by EINTR is retried.
                if( !( status == KERN_ABORTED && errno == EINTR ) )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( Posix )
        {
            // Retry for as long as the wait fails with EINTR.
            while( sem_wait( &m_hndl ) != 0 )
            {
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Blocks the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero within this interval, it is atomically decremented by one and true
     * is returned.  Otherwise false is returned.
     *
     * Params:
     *  period = The time to wait, in 100 nanosecond intervals.  A value that
     *           is too large is clamped to the maximum wait period supported
     *           by the target platform.
     *
     * In:
     *  period must be non-negative.
     *
     * Throws:
     *  SyncException on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( long period )
    in
    {
        assert( period >= 0 );
    }
    body
    {
        version( Win32 )
        {
            enum : uint
            {
                TICKS_PER_MILLI = 10_000,
                MAX_WAIT_MILLIS = uint.max - 1
            }

            // Convert to milliseconds and clamp to MAX_WAIT_MILLIS.
            long millis = period / TICKS_PER_MILLI;
            if( millis > MAX_WAIT_MILLIS )
                millis = MAX_WAIT_MILLIS;

            DWORD outcome = WaitForSingleObject( m_hndl, cast(uint) millis );
            if( outcome == WAIT_OBJECT_0 )
                return true;
            if( outcome == WAIT_TIMEOUT )
                return false;
            throw new SyncException( "Unable to wait for semaphore" );
        }
        else version( OSX )
        {
            // Start from an all-zero timespec; a zero period leaves it so.
            mach_timespec_t timeout = void;
            (cast(byte*) &timeout)[0 .. timeout.sizeof] = 0;

            if( period != 0 )
            {
                enum : uint
                {
                    NANOS_PER_TICK   = 100,
                    TICKS_PER_SECOND = 10_000_000,
                    NANOS_PER_SECOND = NANOS_PER_TICK * TICKS_PER_SECOND,
                }

                long wholeSeconds = period / TICKS_PER_SECOND;

                if( timeout.tv_sec.max - timeout.tv_sec < wholeSeconds )
                {
                    // The seconds field cannot hold the request: saturate.
                    timeout.tv_sec  = timeout.tv_sec.max;
                    timeout.tv_nsec = 0;
                }
                else
                {
                    timeout.tv_sec += cast(typeof(timeout.tv_sec)) wholeSeconds;

                    long nanos = (period % TICKS_PER_SECOND) * NANOS_PER_TICK;
                    if( NANOS_PER_SECOND - timeout.tv_nsec > nanos )
                    {
                        timeout.tv_nsec = cast(typeof(timeout.tv_nsec)) nanos;
                    }
                    else
                    {
                        // Carry the overflowing nanoseconds into the seconds.
                        timeout.tv_sec  += 1;
                        timeout.tv_nsec += nanos - NANOS_PER_SECOND;
                    }
                }
            }

            while( true )
            {
                auto status = semaphore_timedwait( m_hndl, timeout );
                if( status == 0 )
                    return true;
                if( status == KERN_OPERATION_TIMED_OUT )
                    return false;
                // Only an aborted wait accompanied by EINTR is retried.
                if( !( status == KERN_ABORTED && errno == EINTR ) )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
            // -w trip
            return false;
        }
        else version( Posix )
        {
            // mktspec fills in the timespec handed to sem_timedwait; it is
            // computed once and reused if the wait has to be retried.
            timespec deadline = void;
            mktspec( deadline, period );

            while( sem_timedwait( &m_hndl, &deadline ) != 0 )
            {
                if( errno == ETIMEDOUT )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
            return true;
        }
    }


    /**
     * Atomically increments the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            BOOL released = ReleaseSemaphore( m_hndl, 1, null );
            if( released == 0 )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version( OSX )
        {
            if( semaphore_signal( m_hndl ) != 0 )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version( Posix )
        {
            if( sem_post( &m_hndl ) != 0 )
                throw new SyncException( "Unable to notify semaphore" );
        }
    }


    /**
     * Non-blocking acquire.  If the current count is equal to zero, returns
     * false immediately.  Otherwise atomically decrements the count by one
     * and returns true.
     *
     * Throws:
     *  SyncException on error.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     */
    bool tryWait()
    {
        version( Win32 )
        {
            // A zero timeout makes the wait return immediately.
            DWORD outcome = WaitForSingleObject( m_hndl, 0 );
            if( outcome == WAIT_OBJECT_0 )
                return true;
            if( outcome == WAIT_TIMEOUT )
                return false;
            throw new SyncException( "Unable to wait for semaphore" );
        }
        else version( OSX )
        {
            // Implemented as a timed wait with a zero period.
            return wait( 0 );
        }
        else version( Posix )
        {
            while( sem_trywait( &m_hndl ) != 0 )
            {
                if( errno == EAGAIN )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
            return true;
        }
    }


private:
    // Platform-specific handle of the underlying OS semaphore.
    version( Win32 )
    {
        HANDLE      m_hndl;
    }
    else version( OSX )
    {
        semaphore_t m_hndl;
    }
    else version( Posix )
    {
        sem_t       m_hndl;
    }
}
359
360
361 ////////////////////////////////////////////////////////////////////////////////
362 // Unit Tests
363 ////////////////////////////////////////////////////////////////////////////////
364
365
366 version( unittest )
367 {
368 private import core.thread;
369
370
void testWait()
{
    // Producer/consumer exercise for wait(), notify() and tryWait().
    // Each piece of shared state below is guarded by its own monitor object.
    auto sem            = new Semaphore;
    auto producedLock   = new Object;
    auto consumedLock   = new Object;
    auto completeLock   = new Object;
    int  itemCount      = 10;
    int  workerCount    = 10;
    int  consumed       = 0;
    int  finished       = 0;
    bool productionDone = false;

    // Takes permits until it observes that production has finished, counting
    // every permit taken before that point as one consumed item.
    void consumer()
    {
        while( true )
        {
            sem.wait();

            bool done;
            synchronized( producedLock )
            {
                done = productionDone;
            }
            if( done )
                break;

            synchronized( consumedLock )
            {
                ++consumed;
            }
        }

        synchronized( completeLock )
        {
            ++finished;
        }
    }

    // Posts the items, then flags completion and posts one extra permit per
    // consumer so that each of them wakes up and exits.
    void producer()
    {
        assert( !sem.tryWait() );

        for( int sent = 0; sent != itemCount; ++sent )
        {
            sem.notify();
            Thread.yield();
        }
        Thread.sleep( 10_000_000 ); // 1s
        synchronized( producedLock )
        {
            productionDone = true;
        }

        for( int woken = 0; woken != workerCount; ++woken )
        {
            sem.notify();
            Thread.yield();
        }

        // Bounded polling for all consumers to report completion.
        for( int attempt = 0; attempt < workerCount * 10000; ++attempt )
        {
            bool everyoneDone;
            synchronized( completeLock )
            {
                everyoneDone = ( finished == workerCount );
            }
            if( everyoneDone )
                break;
            Thread.yield();
        }

        synchronized( completeLock )
        {
            assert( finished == workerCount );
        }

        synchronized( consumedLock )
        {
            assert( consumed == itemCount );
        }

        // The count must be back at zero; a single notify must satisfy
        // exactly one tryWait.
        assert( !sem.tryWait() );
        sem.notify();
        assert( sem.tryWait() );
        assert( !sem.tryWait() );
    }

    auto workers = new ThreadGroup;

    for( int started = 0; started != workerCount; ++started )
        workers.create( &consumer );
    workers.create( &producer );
    workers.joinAll();
}
461
462
void testWaitTimeout()
{
    // Checks the timed wait: the first wait must be satisfied by a notify,
    // the second must time out because nobody notifies again.
    //
    // Semaphore.wait( long ) takes its period in 100-nanosecond ticks.  The
    // timeout used here was previously written as 10_000_000 while being
    // documented as 100ms; that literal is one full second.
    enum : long
    {
        TICKS_PER_MILLI = 10_000,
        TIMEOUT         = 100 * TICKS_PER_MILLI, // 100ms
    }

    auto synReady   = new Object;
    auto semReady   = new Semaphore;
    bool waiting    = false;
    // Both start out as true so that the final assertion can only pass if
    // the second wait really wrote false.
    bool alertedOne = true;
    bool alertedTwo = true;

    void waiter()
    {
        // Announce that this thread is about to block before waiting.
        synchronized( synReady )
        {
            waiting = true;
        }
        alertedOne = semReady.wait( TIMEOUT ); // expected: notified
        alertedTwo = semReady.wait( TIMEOUT ); // expected: timed out
    }

    auto thread = new Thread( &waiter );
    thread.start();

    // Poll until the waiter has announced itself, then notify exactly once.
    while( true )
    {
        synchronized( synReady )
        {
            if( waiting )
            {
                semReady.notify();
                break;
            }
        }
        Thread.yield();
    }
    thread.join();
    assert( waiting && alertedOne && !alertedTwo );
}
499
500
unittest
{
    // Blocking wait / notify / tryWait with several consumer threads.
    testWait();
    // Timed wait: one notified wait followed by one expected timeout.
    testWaitTimeout();
}
506 }