comparison druntime/src/common/core/sync/rwmutex.d @ 1458:e0b2d67cfe7c

Added druntime (this should be removed once it works).
author Robert Clipsham <robert@octarineparrot.com>
date Tue, 02 Jun 2009 17:43:06 +0100
parents
children
comparison
equal deleted inserted replaced
1456:7b218ec1044f 1458:e0b2d67cfe7c
1 /**
2 * The read/write mutex module provides a primitive for maintaining shared read
3 * access and mutually exclusive write access.
4 *
5 * Copyright: Copyright Sean Kelly 2005 - 2009.
6 * License: <a href="http://www.boost.org/LICENSE_1_0.txt>Boost License 1.0</a>.
7 * Authors: Sean Kelly
8 *
9 * Copyright Sean Kelly 2005 - 2009.
10 * Distributed under the Boost Software License, Version 1.0.
11 * (See accompanying file LICENSE_1_0.txt or copy at
12 * http://www.boost.org/LICENSE_1_0.txt)
13 */
14 module core.sync.rwmutex;
15
16
17 public import core.sync.exception;
18 private import core.sync.condition;
19 private import core.sync.mutex;
20
21 version( Win32 )
22 {
23 private import core.sys.windows.windows;
24 }
25 else version( Posix )
26 {
27 private import core.sys.posix.pthread;
28 }
29
30
31 ////////////////////////////////////////////////////////////////////////////////
32 // ReadWriteMutex
33 //
34 // Reader reader();
35 // Writer writer();
36 ////////////////////////////////////////////////////////////////////////////////
37
38
39 /**
40 * This class represents a mutex that allows any number of readers to enter,
41 * but when a writer enters, all other readers and writers are blocked.
42 *
43 * Please note that this mutex is not recursive and is intended to guard access
44 * to data only. Also, no deadlock checking is in place because doing so would
45 * require dynamic memory allocation, which would reduce performance by an
46 * unacceptable amount. As a result, any attempt to recursively acquire this
47 * mutex may well deadlock the caller, particularly if a write lock is acquired
48 * while holding a read lock, or vice-versa. In practice, this should not be
49 * an issue however, because it is uncommon to call deeply into unknown code
50 * while holding a lock that simply protects data.
51 */
52 class ReadWriteMutex
53 {
54 /**
55 * Defines the policy used by this mutex. Currently, two policies are
56 * defined.
57 *
58 * The first will queue writers until no readers hold the mutex, then
59 * pass the writers through one at a time. If a reader acquires the mutex
60 * while there are still writers queued, the reader will take precedence.
61 *
62 * The second will queue readers if there are any writers queued. Writers
63 * are passed through one at a time, and once there are no writers present,
64 * all queued readers will be alerted.
65 *
66 * Future policies may offer a more even balance between reader and writer
67 * precedence.
68 */
69 enum Policy
70 {
71 PREFER_READERS, /// Readers get preference. This may starve writers.
72 PREFER_WRITERS /// Writers get preference. This may starve readers.
73 }
74
75
76 ////////////////////////////////////////////////////////////////////////////
77 // Initialization
78 ////////////////////////////////////////////////////////////////////////////
79
80
81 /**
82 * Initializes a read/write mutex object with the supplied policy.
83 *
84 * Params:
85 * policy = The policy to use.
86 *
87 * Throws:
88 * SyncException on error.
89 */
90 this( Policy policy = Policy.PREFER_WRITERS )
91 {
92 m_commonMutex = new Mutex;
93 if( !m_commonMutex )
94 throw new SyncException( "Unable to initialize mutex" );
95 scope(failure) delete m_commonMutex;
96
97 m_readerQueue = new Condition( m_commonMutex );
98 if( !m_readerQueue )
99 throw new SyncException( "Unable to initialize mutex" );
100 scope(failure) delete m_readerQueue;
101
102 m_writerQueue = new Condition( m_commonMutex );
103 if( !m_writerQueue )
104 throw new SyncException( "Unable to initialize mutex" );
105 scope(failure) delete m_writerQueue;
106
107 m_policy = policy;
108 m_reader = new Reader;
109 m_writer = new Writer;
110 }
111
112
113 ////////////////////////////////////////////////////////////////////////////
114 // General Properties
115 ////////////////////////////////////////////////////////////////////////////
116
117
118 /**
119 * Gets the policy for the associated mutex.
120 *
121 * Returns:
122 * The policy used by this mutex.
123 */
124 Policy policy()
125 {
126 return m_policy;
127 }
128
129
130 ////////////////////////////////////////////////////////////////////////////
131 // Reader/Writer Handles
132 ////////////////////////////////////////////////////////////////////////////
133
134
135 /**
136 * Gets an object representing the reader lock for the associated mutex.
137 *
138 * Returns:
139 * A reader sub-mutex.
140 */
141 Reader reader()
142 {
143 return m_reader;
144 }
145
146
147 /**
148 * Gets an object representing the writer lock for the associated mutex.
149 *
150 * Returns:
151 * A writer sub-mutex.
152 */
153 Writer writer()
154 {
155 return m_writer;
156 }
157
158
159 ////////////////////////////////////////////////////////////////////////////
160 // Reader
161 ////////////////////////////////////////////////////////////////////////////
162
163
164 /**
165 * This class can be considered a mutex in its own right, and is used to
166 * negotiate a read lock for the enclosing mutex.
167 */
168 class Reader :
169 Object.Monitor
170 {
171 /**
172 * Initializes a read/write mutex reader proxy object.
173 */
174 this()
175 {
176 m_proxy.link = this;
177 (cast(void**) this)[1] = &m_proxy;
178 }
179
180
181 /**
182 * Acquires a read lock on the enclosing mutex.
183 */
184 void lock()
185 {
186 synchronized( m_commonMutex )
187 {
188 ++m_numQueuedReaders;
189 scope(exit) --m_numQueuedReaders;
190
191 while( shouldQueueReader() )
192 m_readerQueue.wait();
193 ++m_numActiveReaders;
194 }
195 }
196
197
198 /**
199 * Releases a read lock on the enclosing mutex.
200 */
201 void unlock()
202 {
203 synchronized( m_commonMutex )
204 {
205 if( --m_numActiveReaders < 1 )
206 {
207 if( m_numQueuedWriters > 0 )
208 m_writerQueue.notify();
209 }
210 }
211 }
212
213
214 /**
215 * Attempts to acquire a read lock on the enclosing mutex. If one can
216 * be obtained without blocking, the lock is acquired and true is
217 * returned. If not, the lock is not acquired and false is returned.
218 *
219 * Returns:
220 * true if the lock was acquired and false if not.
221 */
222 bool tryLock()
223 {
224 synchronized( m_commonMutex )
225 {
226 if( shouldQueueReader() )
227 return false;
228 ++m_numActiveReaders;
229 return true;
230 }
231 }
232
233
234 private:
235 bool shouldQueueReader()
236 {
237 if( m_numActiveWriters > 0 )
238 return true;
239
240 switch( m_policy )
241 {
242 case Policy.PREFER_WRITERS:
243 return m_numQueuedWriters > 0;
244
245 case Policy.PREFER_READERS:
246 default:
247 break;
248 }
249
250 return false;
251 }
252
253 struct MonitorProxy
254 {
255 Object.Monitor link;
256 }
257
258 MonitorProxy m_proxy;
259 }
260
261
262 ////////////////////////////////////////////////////////////////////////////
263 // Writer
264 ////////////////////////////////////////////////////////////////////////////
265
266
267 /**
268 * This class can be considered a mutex in its own right, and is used to
269 * negotiate a write lock for the enclosing mutex.
270 */
271 class Writer :
272 Object.Monitor
273 {
274 /**
275 * Initializes a read/write mutex writer proxy object.
276 */
277 this()
278 {
279 m_proxy.link = this;
280 (cast(void**) this)[1] = &m_proxy;
281 }
282
283
284 /**
285 * Acquires a write lock on the enclosing mutex.
286 */
287 void lock()
288 {
289 synchronized( m_commonMutex )
290 {
291 ++m_numQueuedWriters;
292 scope(exit) --m_numQueuedWriters;
293
294 while( shouldQueueWriter() )
295 m_writerQueue.wait();
296 ++m_numActiveWriters;
297 }
298 }
299
300
301 /**
302 * Releases a write lock on the enclosing mutex.
303 */
304 void unlock()
305 {
306 synchronized( m_commonMutex )
307 {
308 if( --m_numActiveWriters < 1 )
309 {
310 switch( m_policy )
311 {
312 default:
313 case Policy.PREFER_READERS:
314 if( m_numQueuedReaders > 0 )
315 m_readerQueue.notifyAll();
316 else if( m_numQueuedWriters > 0 )
317 m_writerQueue.notify();
318 break;
319 case Policy.PREFER_WRITERS:
320 if( m_numQueuedWriters > 0 )
321 m_writerQueue.notify();
322 else if( m_numQueuedReaders > 0 )
323 m_readerQueue.notifyAll();
324 }
325 }
326 }
327 }
328
329
330 /**
331 * Attempts to acquire a write lock on the enclosing mutex. If one can
332 * be obtained without blocking, the lock is acquired and true is
333 * returned. If not, the lock is not acquired and false is returned.
334 *
335 * Returns:
336 * true if the lock was acquired and false if not.
337 */
338 bool tryLock()
339 {
340 synchronized( m_commonMutex )
341 {
342 if( shouldQueueWriter() )
343 return false;
344 ++m_numActiveWriters;
345 return true;
346 }
347 }
348
349
350 private:
351 bool shouldQueueWriter()
352 {
353 if( m_numActiveWriters > 0 ||
354 m_numActiveReaders > 0 )
355 return true;
356 switch( m_policy )
357 {
358 case Policy.PREFER_READERS:
359 return m_numQueuedReaders > 0;
360
361 case Policy.PREFER_WRITERS:
362 default:
363 break;
364 }
365
366 return false;
367 }
368
369 struct MonitorProxy
370 {
371 Object.Monitor link;
372 }
373
374 MonitorProxy m_proxy;
375 }
376
377
378 private:
379 Policy m_policy;
380 Reader m_reader;
381 Writer m_writer;
382
383 Mutex m_commonMutex;
384 Condition m_readerQueue;
385 Condition m_writerQueue;
386
387 int m_numQueuedReaders;
388 int m_numActiveReaders;
389 int m_numQueuedWriters;
390 int m_numActiveWriters;
391 }
392
393
394 ////////////////////////////////////////////////////////////////////////////////
395 // Unit Tests
396 ////////////////////////////////////////////////////////////////////////////////
397
398
399 version( unittest )
400 {
401 static if( !is( typeof( Thread ) ) )
402 private import core.thread;
403
404
405 void testRead( ReadWriteMutex.Policy policy )
406 {
407 auto mutex = new ReadWriteMutex( policy );
408 auto synInfo = new Object;
409 int numThreads = 10;
410 int numReaders = 0;
411 int maxReaders = 0;
412
413 void readerFn()
414 {
415 synchronized( mutex.reader() )
416 {
417 synchronized( synInfo )
418 {
419 if( ++numReaders > maxReaders )
420 maxReaders = numReaders;
421 }
422 Thread.sleep( 100_000 ); // 1ms
423 synchronized( synInfo )
424 {
425 --numReaders;
426 }
427 }
428 }
429
430 auto group = new ThreadGroup;
431
432 for( int i = 0; i < numThreads; ++i )
433 {
434 group.create( &readerFn );
435 }
436 group.joinAll();
437 assert( numReaders < 1 && maxReaders > 1 );
438 }
439
440
441 void testReadWrite( ReadWriteMutex.Policy policy )
442 {
443 auto mutex = new ReadWriteMutex( policy );
444 auto synInfo = new Object;
445 int numThreads = 10;
446 int numReaders = 0;
447 int numWriters = 0;
448 int maxReaders = 0;
449 int maxWriters = 0;
450 int numTries = 20;
451
452 void readerFn()
453 {
454 for( int i = 0; i < numTries; ++i )
455 {
456 synchronized( mutex.reader() )
457 {
458 synchronized( synInfo )
459 {
460 if( ++numReaders > maxReaders )
461 maxReaders = numReaders;
462 }
463 Thread.sleep( 100_000 ); // 1ms
464 synchronized( synInfo )
465 {
466 --numReaders;
467 }
468 }
469 }
470 }
471
472 void writerFn()
473 {
474 for( int i = 0; i < numTries; ++i )
475 {
476 synchronized( mutex.writer() )
477 {
478 synchronized( synInfo )
479 {
480 if( ++numWriters > maxWriters )
481 maxWriters = numWriters;
482 }
483 Thread.sleep( 100_000 ); // 1ms
484 synchronized( synInfo )
485 {
486 --numWriters;
487 }
488 }
489 }
490 }
491
492 auto group = new ThreadGroup;
493
494 for( int i = 0; i < numThreads; ++i )
495 {
496 group.create( &readerFn );
497 group.create( &writerFn );
498 }
499 group.joinAll();
500 assert( numReaders < 1 && maxReaders > 1 &&
501 numWriters < 1 && maxWriters < 2 );
502 }
503
504
505 unittest
506 {
507 testRead( ReadWriteMutex.Policy.PREFER_READERS );
508 testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
509 testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
510 testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
511 }
512 }