comparison tango/tango/core/sync/ReadWriteMutex.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
comparison
equal deleted inserted replaced
131:5825d48b27d1 132:1700239cab2e
1 /**
2 * The read/write mutex module provides a primitive for maintaining shared read
3 * access and mutually exclusive write access.
4 *
5 * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
6 * License: BSD style: $(LICENSE)
7 * Authors: Sean Kelly
8 */
9 module tango.core.sync.ReadWriteMutex;
10
11
12 public import tango.core.Exception : SyncException;
13 private import tango.core.sync.Condition;
14 private import tango.core.sync.Mutex;
15
16 version( Win32 )
17 {
18 private import tango.sys.win32.UserGdi;
19 }
20 else version( Posix )
21 {
22 private import tango.stdc.posix.pthread;
23 }
24
25
26 ////////////////////////////////////////////////////////////////////////////////
27 // ReadWriteMutex
28 //
29 // Reader reader();
30 // Writer writer();
31 ////////////////////////////////////////////////////////////////////////////////
32
33
/**
 * A mutex that admits any number of concurrent readers but only a single
 * writer; while a writer is inside, every other reader and writer is blocked.
 *
 * The mutex is not recursive and is meant to guard access to data only.  No
 * deadlock checking is performed, since that would require dynamic memory
 * allocation and reduce performance by an unacceptable amount.  Any attempt
 * to acquire the mutex recursively may therefore deadlock the caller, most
 * notably when a write lock is requested while a read lock is held, or
 * vice-versa.  This is rarely a problem in practice, because it is uncommon
 * to call deeply into unknown code while holding a lock that merely protects
 * data.
 */
class ReadWriteMutex
{
    /**
     * Selects how competing readers and writers are scheduled.  Two policies
     * are currently available.
     *
     * PREFER_READERS queues writers until no reader holds the mutex and then
     * lets the writers through one at a time.  A reader that arrives while
     * writers are still queued takes precedence over them.
     *
     * PREFER_WRITERS queues readers whenever any writer is queued.  Writers
     * pass through one at a time, and once no writer remains all queued
     * readers are alerted.
     *
     * Future policies may offer a more even balance between reader and
     * writer precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
    }


    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Constructs a read/write mutex that schedules according to the given
     * policy.
     *
     * Params:
     *  policy = The scheduling policy to use.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS )
    {
        m_policy = policy;

        // One mutex guards all counters; both condition variables are bound
        // to it.  Each scope(failure) releases what was already created if a
        // later step throws.
        m_commonMutex = new Mutex;
        if( !m_commonMutex )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_commonMutex;

        m_readerQueue = new Condition( m_commonMutex );
        if( !m_readerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_readerQueue;

        m_writerQueue = new Condition( m_commonMutex );
        if( !m_writerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_writerQueue;

        version(LLVMDC)
        {
            // The handles are not created on this compiler, so reader() and
            // writer() return null there.
            pragma(msg, "ReadWriteMutex is broken on LLVMDC");
        }
        else
        {
            m_reader = new Reader;
            m_writer = new Writer;
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Reports the scheduling policy this mutex was constructed with.
     *
     * Returns:
     *  The policy used by this mutex.
     */
    Policy policy()
    {
        return m_policy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Provides the object through which read locks on this mutex are taken.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    Reader reader()
    {
        return m_reader;
    }


    /**
     * Provides the object through which write locks on this mutex are taken.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    Writer writer()
    {
        return m_writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////


    /**
     * A mutex in its own right that negotiates read locks for the enclosing
     * ReadWriteMutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Sets up the reader proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Stores the proxy's address in the second pointer-sized slot of
            // this object.  NOTE(review): presumably that slot is the
            // runtime's monitor pointer, which would make
            // `synchronized( reader )` call lock()/unlock() below -- confirm
            // against the runtime's object layout.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a read lock on the enclosing mutex, waiting for as long
         * as the policy requires readers to queue.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                // Count this thread as queued for the whole acquisition so
                // that writers can see a pending reader; the count is
                // dropped again when the synchronized block is left.
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                while( shouldQueueReader() )
                {
                    m_readerQueue.wait();
                }
                ++m_numActiveReaders;
            }
        }


        /**
         * Releases a read lock on the enclosing mutex.  When the last active
         * reader leaves and writers are queued, one writer is notified.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                // The decrement is always evaluated; the queued-writer test
                // only matters once no active reader remains.
                if( --m_numActiveReaders < 1 && m_numQueuedWriters > 0 )
                    m_writerQueue.notify();
            }
        }


        /**
         * Tries to take a read lock on the enclosing mutex without blocking.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( !shouldQueueReader() )
                {
                    ++m_numActiveReaders;
                    return true;
                }
                return false;
            }
        }


    private:
        /**
         * Decides whether a reader has to wait.  Callers hold m_commonMutex.
         *
         * Returns:
         *  true while a writer is active, or, under PREFER_WRITERS, while
         *  any writer is queued; false otherwise.
         */
        bool shouldQueueReader()
        {
            return m_numActiveWriters > 0 ||
                   ( m_policy == Policy.PREFER_WRITERS &&
                     m_numQueuedWriters > 0 );
        }

        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy m_proxy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////


    /**
     * A mutex in its own right that negotiates write locks for the enclosing
     * ReadWriteMutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Sets up the writer proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Same slot write as in Reader.  NOTE(review): presumably this
            // installs the proxy as the object's monitor -- confirm against
            // the runtime's object layout.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a write lock on the enclosing mutex, waiting until no
         * reader or writer is active and the policy lets a writer proceed.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                // Count this thread as queued for the whole acquisition so
                // that readers and unlockers can see a pending writer; the
                // count is dropped again when the synchronized block is left.
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                while( shouldQueueWriter() )
                {
                    m_writerQueue.wait();
                }
                ++m_numActiveWriters;
            }
        }


        /**
         * Releases a write lock on the enclosing mutex and wakes whichever
         * side the policy favours: all queued readers, or one queued writer.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveWriters >= 1 )
                    return;

                if( m_policy == Policy.PREFER_WRITERS )
                {
                    if( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                    else if( m_numQueuedReaders > 0 )
                        m_readerQueue.notifyAll();
                }
                else
                {
                    // PREFER_READERS, and any other policy value.
                    if( m_numQueuedReaders > 0 )
                        m_readerQueue.notifyAll();
                    else if( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }


        /**
         * Tries to take a write lock on the enclosing mutex without
         * blocking.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( !shouldQueueWriter() )
                {
                    ++m_numActiveWriters;
                    return true;
                }
                return false;
            }
        }


    private:
        /**
         * Decides whether a writer has to wait.  Callers hold m_commonMutex.
         *
         * Returns:
         *  true while any reader or writer is active, or, under
         *  PREFER_READERS, while any reader is queued; false otherwise.
         */
        bool shouldQueueWriter()
        {
            return m_numActiveWriters > 0 ||
                   m_numActiveReaders > 0 ||
                   ( m_policy == Policy.PREFER_READERS &&
                     m_numQueuedReaders > 0 );
        }

        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy m_proxy;
    }


private:
    Policy      m_policy;
    Reader      m_reader;
    Writer      m_writer;

    Mutex       m_commonMutex;  // guards every counter below
    Condition   m_readerQueue;  // readers wait here
    Condition   m_writerQueue;  // writers wait here

    int         m_numQueuedReaders;
    int         m_numActiveReaders;
    int         m_numQueuedWriters;
    int         m_numActiveWriters;
}
394
395
396 ////////////////////////////////////////////////////////////////////////////////
397 // Unit Tests
398 ////////////////////////////////////////////////////////////////////////////////
399
400
401 debug( UnitTest )
402 {
403 private import tango.core.Thread;
404
405
/**
 * Starts ten threads that each take the read lock once, and tracks how many
 * hold it at the same time.  Asserts that no reader is left inside afterwards
 * and that at least two readers were observed inside simultaneously.
 *
 * Params:
 *  policy = The policy the mutex under test is created with.
 */
void testRead( ReadWriteMutex.Policy policy )
{
    auto rwLock      = new ReadWriteMutex( policy );
    auto statsLock   = new Object;   // guards the two counters below
    int  threadCount = 10;
    int  inside      = 0;            // readers currently holding the lock
    int  peak        = 0;            // highest value `inside` ever reached

    void readerFn()
    {
        synchronized( rwLock.reader() )
        {
            synchronized( statsLock )
            {
                ++inside;
                if( inside > peak )
                    peak = inside;
            }
            // Give other readers a chance to enter while this one is inside.
            Thread.yield();
            synchronized( statsLock )
            {
                --inside;
            }
        }
    }

    auto workers = new ThreadGroup;

    int remaining = threadCount;
    while( remaining-- > 0 )
    {
        workers.create( &readerFn );
    }
    workers.joinAll();
    assert( inside < 1 && peak > 1 );
}
440
441
/**
 * Starts ten reader threads and ten writer threads that each take their lock
 * twenty times, and tracks how many of each kind are inside at once.  Asserts
 * that nobody is left inside afterwards, that at least two readers were
 * observed inside simultaneously, and that never more than one writer was.
 *
 * Params:
 *  policy = The policy the mutex under test is created with.
 */
void testReadWrite( ReadWriteMutex.Policy policy )
{
    auto rwLock        = new ReadWriteMutex( policy );
    auto statsLock     = new Object;   // guards the four counters below
    int  threadCount   = 10;
    int  readersInside = 0;
    int  writersInside = 0;
    int  peakReaders   = 0;
    int  peakWriters   = 0;
    int  rounds        = 20;

    void readerFn()
    {
        int left = rounds;
        while( left-- > 0 )
        {
            synchronized( rwLock.reader() )
            {
                synchronized( statsLock )
                {
                    ++readersInside;
                    if( readersInside > peakReaders )
                        peakReaders = readersInside;
                }
                // Let other threads run while the read lock is held.
                Thread.yield();
                synchronized( statsLock )
                {
                    --readersInside;
                }
            }
        }
    }

    void writerFn()
    {
        int left = rounds;
        while( left-- > 0 )
        {
            synchronized( rwLock.writer() )
            {
                synchronized( statsLock )
                {
                    ++writersInside;
                    if( writersInside > peakWriters )
                        peakWriters = writersInside;
                }
                // Let other threads run while the write lock is held.
                Thread.yield();
                synchronized( statsLock )
                {
                    --writersInside;
                }
            }
        }
    }

    auto workers = new ThreadGroup;

    // Threads are started in alternating reader/writer order.
    int pairsLeft = threadCount;
    while( pairsLeft-- > 0 )
    {
        workers.create( &readerFn );
        workers.create( &writerFn );
    }
    workers.joinAll();
    assert( readersInside < 1 && peakReaders > 1 &&
            writersInside < 1 && peakWriters < 2 );
}
504
505
unittest
{
    // Run the reader-only test and then the mixed reader/writer test, each
    // under both scheduling policies.
    alias ReadWriteMutex.Policy Policy;

    testRead( Policy.PREFER_READERS );
    testRead( Policy.PREFER_WRITERS );
    testReadWrite( Policy.PREFER_READERS );
    testReadWrite( Policy.PREFER_WRITERS );
}
513 }