comparison tango/tango/core/sync/Semaphore.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
comparison
equal deleted inserted replaced
131:5825d48b27d1 132:1700239cab2e
1 /**
2 * The semaphore module provides a general use semaphore for synchronization.
3 *
4 * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
5 * License: BSD style: $(LICENSE)
6 * Authors: Sean Kelly
7 */
8 module tango.core.sync.Semaphore;
9
10
11 public import tango.core.Exception : SyncException;
12
13 version( Win32 )
14 {
15 private import tango.sys.win32.UserGdi;
16 }
17 else version( Posix )
18 {
19 private import tango.core.sync.Config;
20 private import tango.stdc.errno;
21 private import tango.stdc.posix.pthread;
22 private import tango.stdc.posix.semaphore;
23 }
24
25
26 ////////////////////////////////////////////////////////////////////////////////
27 // Semaphore
28 //
29 // void wait();
30 // void notify();
31 // bool tryWait();
32 ////////////////////////////////////////////////////////////////////////////////
33
34
/**
 * This class represents a general counting semaphore as conceived by Edsger
 * Dijkstra.  As per Mesa type monitors however, "signal" has been replaced
 * with "notify" to indicate that control is not transferred to the waiter when
 * a notification is sent.
 *
 * The implementation wraps a Win32 semaphore handle on Win32 and a POSIX
 * unnamed semaphore (sem_t) on Posix.
 */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a semaphore object with the specified initial count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncException on error.
     */
    this( uint count = 0 )
    {
        version( Win32 )
        {
            // The maximum count is fixed at int.max; a failed creation leaves
            // the handle at its default (init) value.
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if( m_hndl == m_hndl.init )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version( Posix )
        {
            // Second argument 0: the pshared flag passed to sem_init.
            int rc = sem_init( &m_hndl, 0, count );
            if( rc )
                throw new SyncException( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the underlying OS semaphore.  A failure to do so is only
     * reported through an assertion.
     */
    ~this()
    {
        version( Win32 )
        {
            // The call is made outside the assert so that it is not tied to
            // the assert expression being evaluated.
            BOOL rc = CloseHandle( m_hndl );
            assert( rc, "Unable to destroy semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_destroy( &m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until the current count is above zero, then atomically decrement
     * the count by one and return.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            if( rc != WAIT_OBJECT_0 )
                throw new SyncException( "Unable to wait for semaphore" );
        }
        else version( Posix )
        {
            // Retry when the wait fails with EINTR; any other failure is
            // reported to the caller.
            while( true )
            {
                if( !sem_wait( &m_hndl ) )
                    return;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Suspends the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero in this interval, then atomically decrement the count by one and
     * return true.  Otherwise, return false.  The supplied period may be up to
     * a maximum of (uint.max - 1) milliseconds.
     *
     * Params:
     *  period = The number of seconds to wait.
     *
     * In:
     *  period must be non-negative and less than (uint.max - 1) milliseconds.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool wait( double period )
    in
    {
        // A negative period is not a meaningful timeout: on Win32 it would be
        // cast to an unsigned DWORD below, and on Posix it would be handed to
        // adjTimespec as a negative offset.  Reject it up front.
        assert( period >= 0, "period must be non-negative" );
        assert( period * 1000 + 0.1 < uint.max - 1);
    }
    body
    {
        version( Win32 )
        {
            // Seconds -> milliseconds; the 0.1 offset is applied before the
            // truncating cast.
            DWORD t = cast(DWORD)(period * 1000 + 0.1);
            switch( WaitForSingleObject( m_hndl, t ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( Posix )
        {
            timespec t;

            // Build the deadline once, before the loop, so that retries after
            // EINTR keep using the same timespec.
            getTimespec( t );
            adjTimespec( t, period );

            while( true )
            {
                if( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if( errno == ETIMEDOUT )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        return false;
    }


    /**
     * Atomically increment the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            if( !ReleaseSemaphore( m_hndl, 1, null ) )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_post( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify semaphore" );
        }
    }


    /**
     * If the current count is equal to zero, return false.  Otherwise,
     * atomically decrement the count by one and return true.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool tryWait()
    {
        version( Win32 )
        {
            // A zero timeout makes the wait return immediately.
            switch( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( Posix )
        {
            while( true )
            {
                if( !sem_trywait( &m_hndl ) )
                    return true;
                // EAGAIN: the count was zero, so nothing was acquired.
                if( errno == EAGAIN )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        return false;
    }


private:
    // Platform-specific handle of the underlying OS semaphore.
    version( Win32 )
    {
        HANDLE  m_hndl;
    }
    else version( Posix )
    {
        sem_t   m_hndl;
    }
}
259
260
261 ////////////////////////////////////////////////////////////////////////////////
262 // Unit Tests
263 ////////////////////////////////////////////////////////////////////////////////
264
265
266 debug( UnitTest )
267 {
268 private import tango.core.Thread;
269
270
/**
 * Producer/consumer test for Semaphore.wait, notify and tryWait.
 *
 * One producer thread posts numToProduce tokens, then flags shutdown and posts
 * one further token per consumer.  Each consumer counts the tokens it receives
 * until it observes the shutdown flag.  The producer finally checks that every
 * produced token was counted, that every consumer finished, and that the
 * semaphore is left empty.
 */
void testWait()
{
    auto semaphore    = new Semaphore;
    int  numToProduce = 10;
    bool allProduced  = false;
    auto synProduced  = new Object;   // guards allProduced
    int  numConsumed  = 0;
    auto synConsumed  = new Object;   // guards numConsumed
    int  numConsumers = 10;
    int  numComplete  = 0;
    auto synComplete  = new Object;   // guards numComplete

    void consumer()
    {
        while( true )
        {
            semaphore.wait();

            // A token received after shutdown was flagged is the signal to
            // stop and is not counted as consumed.
            synchronized( synProduced )
            {
                if( allProduced )
                    break;
            }

            synchronized( synConsumed )
            {
                ++numConsumed;
            }
        }

        synchronized( synComplete )
        {
            ++numComplete;
        }
    }

    void producer()
    {
        assert( !semaphore.tryWait() );

        for( int i = 0; i < numToProduce; ++i )
        {
            semaphore.notify();
            Thread.yield();
        }

        // Let the consumers drain every produced token before shutdown is
        // flagged.  Otherwise a consumer that wakes late would treat a
        // produced token as a shutdown token and exit without counting it.
        // The wait is bounded so a broken semaphore fails the assertions
        // below instead of hanging the test.
        for( int i = numToProduce * 10000; i > 0; --i )
        {
            synchronized( synConsumed )
            {
                if( numConsumed == numToProduce )
                    break;
            }
            Thread.yield();
        }

        synchronized( synProduced )
        {
            allProduced = true;
        }

        // One shutdown token per consumer.
        for( int i = 0; i < numConsumers; ++i )
        {
            semaphore.notify();
            Thread.yield();
        }

        // Bounded wait for all consumers to finish, yielding on every pass so
        // that they actually get a chance to run.
        for( int i = numConsumers * 10000; i > 0; --i )
        {
            synchronized( synComplete )
            {
                if( numComplete == numConsumers )
                    break;
            }
            Thread.yield();
        }

        synchronized( synComplete )
        {
            assert( numComplete == numConsumers );
        }
        // Read numConsumed under the same lock the consumers write it with.
        synchronized( synConsumed )
        {
            assert( numConsumed == numToProduce );
        }

        // The semaphore must be empty again, and a single notify must be
        // consumable exactly once.
        assert( !semaphore.tryWait() );
        semaphore.notify();
        assert( semaphore.tryWait() );
        assert( !semaphore.tryWait() );
    }

    auto group = new ThreadGroup;

    for( int i = 0; i < numConsumers; ++i )
        group.create( &consumer );
    group.create( &producer );
    group.joinAll();
}
356
357
/**
 * Timed-wait test: a helper thread performs two timed waits of 0.1 seconds
 * each while the calling thread posts exactly one notification.  The first
 * wait is expected to report a notification and the second a timeout.
 */
void testWaitTimeout()
{
    auto startedLock = new Object;      // guards hasStarted
    auto gate        = new Semaphore;
    bool hasStarted  = false;
    bool firstWait   = true;
    bool secondWait  = true;

    void timedWaiter()
    {
        // Announce that this thread is running before blocking on the gate.
        synchronized( startedLock )
        {
            hasStarted = true;
        }
        firstWait  = gate.wait( 0.1 );
        secondWait = gate.wait( 0.1 );
    }

    auto helper = new Thread( &timedWaiter );
    helper.start();

    // Poll until the helper has announced itself, then post the single
    // notification; yield between unsuccessful polls.
    bool notified = false;
    while( !notified )
    {
        synchronized( startedLock )
        {
            if( hasStarted )
            {
                gate.notify();
                notified = true;
            }
        }
        if( !notified )
            Thread.yield();
    }

    helper.join();
    assert( hasStarted && firstWait && !secondWait );
}
394
395
// Module self-test entry point: runs the blocking-wait scenario first and the
// timed-wait scenario second.
unittest
{
    testWait();
    testWaitTimeout();
}
401 }