132
|
1 /**
|
|
2 * The semaphore module provides a general use semaphore for synchronization.
|
|
3 *
|
|
4 * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
|
|
5 * License: BSD style: $(LICENSE)
|
|
6 * Authors: Sean Kelly
|
|
7 */
|
|
8 module tango.core.sync.Semaphore;
|
|
9
|
|
10
|
|
11 public import tango.core.Exception : SyncException;
|
|
12
|
|
13 version( Win32 )
|
|
14 {
|
|
15 private import tango.sys.win32.UserGdi;
|
|
16 }
|
|
17 else version( Posix )
|
|
18 {
|
|
19 private import tango.core.sync.Config;
|
|
20 private import tango.stdc.errno;
|
|
21 private import tango.stdc.posix.pthread;
|
|
22 private import tango.stdc.posix.semaphore;
|
|
23 }
|
|
24
|
|
25
|
|
26 ////////////////////////////////////////////////////////////////////////////////
|
|
27 // Semaphore
|
|
28 //
|
|
29 // void wait();
|
|
30 // void notify();
|
|
31 // bool tryWait();
|
|
32 ////////////////////////////////////////////////////////////////////////////////
|
|
33
|
|
34
|
|
/**
 * This class represents a general counting semaphore as conceived by Edsger
 * Dijkstra.  As per Mesa type monitors however, "signal" has been replaced
 * with "notify" to indicate that control is not transferred to the waiter when
 * a notification is sent.
 *
 * The implementation wraps a Win32 semaphore handle under version( Win32 )
 * and a POSIX sem_t under version( Posix ).
 */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a semaphore object with the specified initial count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncException on error.
     */
    this( uint count = 0 )
    {
        version( Win32 )
        {
            // Unnamed semaphore with default security attributes; the
            // maximum count passed to the OS is int.max.
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if( m_hndl == m_hndl.init )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version( Posix )
        {
            // Second argument (pshared) is 0.
            int rc = sem_init( &m_hndl, 0, count );
            if( rc )
                throw new SyncException( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the underlying OS semaphore.  A failure to do so is only
     * reported through an assert, never through an exception.
     */
    ~this()
    {
        version( Win32 )
        {
            BOOL rc = CloseHandle( m_hndl );
            assert( rc, "Unable to destroy semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_destroy( &m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until the current count is above zero, then atomically decrement
     * the count by one and return.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            if( rc != WAIT_OBJECT_0 )
                throw new SyncException( "Unable to wait for semaphore" );
        }
        else version( Posix )
        {
            // Retry when the wait is interrupted (EINTR); any other errno is
            // treated as a failure.
            while( true )
            {
                if( !sem_wait( &m_hndl ) )
                    return;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Suspends the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero in this interval, then atomically decrement the count by one and
     * return true.  Otherwise, return false.  The supplied period may be up to
     * a maximum of (uint.max - 1) milliseconds.
     *
     * Params:
     *  period = The number of seconds to wait.
     *
     * In:
     *  period must be non-negative and less than (uint.max - 1) milliseconds.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool wait( double period )
    in
    {
        // A negative period cannot be converted to the unsigned millisecond
        // count used on Win32, so reject it up front (this also rejects NaN).
        assert( period >= 0 );
        assert( period * 1000 + 0.1 < uint.max - 1);
    }
    body
    {
        version( Win32 )
        {
            // Seconds -> milliseconds.  NOTE(review): the 0.1 offset
            // presumably guards against truncation of values that land just
            // below a whole millisecond - confirm.
            DWORD t = cast(DWORD)(period * 1000 + 0.1);
            switch( WaitForSingleObject( m_hndl, t ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( Posix )
        {
            timespec t;

            // Build the deadline once, before the loop, so that retries after
            // EINTR reuse the same timespec and do not extend the wait.
            getTimespec( t );
            adjTimespec( t, period );

            while( true )
            {
                if( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if( errno == ETIMEDOUT )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        // NOTE(review): not reached on Win32 or Posix; presumably kept so a
        // build with warnings enabled sees a return on every path.
        return false;
    }


    /**
     * Atomically increment the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            if( !ReleaseSemaphore( m_hndl, 1, null ) )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_post( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify semaphore" );
        }
    }


    /**
     * If the current count is equal to zero, return false.  Otherwise,
     * atomically decrement the count by one and return true.  This call does
     * not block waiting for a notification.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool tryWait()
    {
        version( Win32 )
        {
            // A zero timeout turns the wait into a poll.
            switch( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( Posix )
        {
            // EAGAIN is reported as "count was zero"; EINTR is retried.
            while( true )
            {
                if( !sem_trywait( &m_hndl ) )
                    return true;
                if( errno == EAGAIN )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        // NOTE(review): not reached on Win32 or Posix; presumably kept so a
        // build with warnings enabled sees a return on every path.
        return false;
    }


private:
    // Platform-specific semaphore object owned by this instance.
    version( Win32 )
    {
        HANDLE  m_hndl;
    }
    else version( Posix )
    {
        sem_t   m_hndl;
    }
}
|
|
259
|
|
260
|
|
261 ////////////////////////////////////////////////////////////////////////////////
|
|
262 // Unit Tests
|
|
263 ////////////////////////////////////////////////////////////////////////////////
|
|
264
|
|
265
|
|
266 debug( UnitTest )
|
|
267 {
|
|
268 private import tango.core.Thread;
|
|
269
|
|
270
|
|
/**
 * Producer/consumer test for Semaphore.wait(), notify() and tryWait().
 *
 * numConsumers consumer threads block on one semaphore while a single
 * producer thread posts numToProduce work notifications, raises the
 * allProduced flag, and then posts one further notification per consumer so
 * that each of them can observe the flag and exit.
 */
void testWait()
{
    auto semaphore      = new Semaphore;
    int  numToProduce   = 10;
    bool allProduced    = false;
    auto synProduced    = new Object;   // guards allProduced
    int  numConsumed    = 0;
    auto synConsumed    = new Object;   // guards numConsumed
    int  numConsumers   = 10;
    int  numComplete    = 0;
    auto synComplete    = new Object;   // guards numComplete

    // Counts one consumed item per wake-up until the producer has flagged
    // that production is over, then records its own completion.
    void consumer()
    {
        while( true )
        {
            semaphore.wait();

            synchronized( synProduced )
            {
                if( allProduced )
                    break;
            }

            synchronized( synConsumed )
            {
                ++numConsumed;
            }
        }

        synchronized( synComplete )
        {
            ++numComplete;
        }
    }

    // Posts the notifications, waits for the consumers to finish and then
    // checks the final counters and the semaphore state.
    void producer()
    {
        assert( !semaphore.tryWait() );

        for( int i = 0; i < numToProduce; ++i )
        {
            semaphore.notify();
            Thread.yield();
        }

        // NOTE(review): a consumer that wakes for one of the notifications
        // above only after this flag is set exits without counting it, so
        // the numConsumed check below still depends on scheduling.
        synchronized( synProduced )
        {
            allProduced = true;
        }

        for( int i = 0; i < numConsumers; ++i )
        {
            semaphore.notify();
            Thread.yield();
        }

        // Bounded poll for consumer completion.  Yield on every pass so the
        // consumers actually get scheduled; a tight spin of a few iterations
        // can run out before any of them has had a chance to finish.
        for( int i = numConsumers * 10000; i > 0; --i )
        {
            synchronized( synComplete )
            {
                if( numComplete == numConsumers )
                    break;
            }
            Thread.yield();
        }

        synchronized( synComplete )
        {
            assert( numComplete == numConsumers );
        }

        // Read the counter under the same lock the consumers write it with.
        synchronized( synConsumed )
        {
            assert( numConsumed == numToProduce );
        }

        // Every notification has been consumed, so the count is back at
        // zero; a single notify/tryWait pair must then succeed exactly once.
        assert( !semaphore.tryWait() );
        semaphore.notify();
        assert( semaphore.tryWait() );
        assert( !semaphore.tryWait() );
    }

    auto group = new ThreadGroup;

    for( int i = 0; i < numConsumers; ++i )
        group.create( &consumer );
    group.create( &producer );
    group.joinAll();
}
|
|
356
|
|
357
|
|
/**
 * Checks the timed wait: a worker thread performs two consecutive 0.1 second
 * waits on a semaphore that receives exactly one notification, so the first
 * wait is expected to report success and the second to time out.
 */
void testWaitTimeout()
{
    auto readyLock   = new Object;      // guards isWaiting
    auto readySem    = new Semaphore;
    bool isWaiting   = false;
    bool firstAlert  = true;
    bool secondAlert = true;

    // Announces itself, then records the outcome of both timed waits.
    void timedWaiter()
    {
        synchronized( readyLock )
        {
            isWaiting = true;
        }
        firstAlert  = readySem.wait( 0.1 );
        secondAlert = readySem.wait( 0.1 );
    }

    auto worker = new Thread( &timedWaiter );
    worker.start();

    // Poll until the worker has announced itself, post the single
    // notification, and stop polling; yield only while still waiting.
    bool notified = false;
    while( !notified )
    {
        synchronized( readyLock )
        {
            if( isWaiting )
            {
                readySem.notify();
                notified = true;
            }
        }
        if( !notified )
            Thread.yield();
    }

    worker.join();
    assert( isWaiting && firstAlert && !secondAlert );
}
|
|
394
|
|
395
|
|
// Module unit test: runs the blocking-wait test first, then the timed-wait
// test.
unittest
{
    testWait();
    testWaitTimeout();
}
|
|
401 }
|