132
|
1 /**
|
|
2 * The read/write mutex module provides a primitive for maintaining shared read
|
|
3 * access and mutually exclusive write access.
|
|
4 *
|
|
5 * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
|
|
6 * License: BSD style: $(LICENSE)
|
|
7 * Authors: Sean Kelly
|
|
8 */
|
|
9 module tango.core.sync.ReadWriteMutex;
|
|
10
|
|
11
|
|
12 public import tango.core.Exception : SyncException;
|
|
13 private import tango.core.sync.Condition;
|
|
14 private import tango.core.sync.Mutex;
|
|
15
|
|
16 version( Win32 )
|
|
17 {
|
|
18 private import tango.sys.win32.UserGdi;
|
|
19 }
|
|
20 else version( Posix )
|
|
21 {
|
|
22 private import tango.stdc.posix.pthread;
|
|
23 }
|
|
24
|
|
25
|
|
26 ////////////////////////////////////////////////////////////////////////////////
|
|
27 // ReadWriteMutex
|
|
28 //
|
|
29 // Reader reader();
|
|
30 // Writer writer();
|
|
31 ////////////////////////////////////////////////////////////////////////////////
|
|
32
|
|
33
|
|
34 /**
|
|
35 * This class represents a mutex that allows any number of readers to enter,
|
|
36 * but when a writer enters, all other readers and writers are blocked.
|
|
37 *
|
|
38 * Please note that this mutex is not recursive and is intended to guard access
|
|
39 * to data only. Also, no deadlock checking is in place because doing so would
|
|
40 * require dynamic memory allocation, which would reduce performance by an
|
|
41 * unacceptable amount. As a result, any attempt to recursively acquire this
|
|
42 * mutex may well deadlock the caller, particularly if a write lock is acquired
|
|
43 * while holding a read lock, or vice-versa. In practice, this should not be
|
|
44 * an issue however, because it is uncommon to call deeply into unknown code
|
|
45 * while holding a lock that simply protects data.
|
|
46 */
|
|
class ReadWriteMutex
{
    /**
     * Defines the policy used by this mutex.  Currently, two policies are
     * defined.
     *
     * The first will queue writers until no readers hold the mutex, then
     * pass the writers through one at a time.  If a reader acquires the mutex
     * while there are still writers queued, the reader will take precedence.
     *
     * The second will queue readers if there are any writers queued.  Writers
     * are passed through one at a time, and once there are no writers present,
     * all queued readers will be alerted.
     *
     * Future policies may offer a more even balance between reader and writer
     * precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS /// Writers get preference.  This may starve readers.
    }


    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a read/write mutex object with the supplied policy.
     *
     * All three synchronization objects (the common mutex and the two
     * condition queues) share m_commonMutex, so every state transition in
     * this class happens under that single lock.
     *
     * Params:
     *  policy = The policy to use.  Defaults to Policy.PREFER_WRITERS.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS )
    {
        m_commonMutex = new Mutex;
        // NOTE(review): in D, 'new' throws on allocation failure rather than
        // returning null, so this check (and the two below) appear to be
        // dead code kept for defensive symmetry -- confirm against the
        // compiler/runtime this module targets.
        if( !m_commonMutex )
            throw new SyncException( "Unable to initialize mutex" );
        // Roll back partially-constructed state if a later step throws.
        scope(failure) delete m_commonMutex;

        m_readerQueue = new Condition( m_commonMutex );
        if( !m_readerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_readerQueue;

        m_writerQueue = new Condition( m_commonMutex );
        if( !m_writerQueue )
            throw new SyncException( "Unable to initialize mutex" );
        scope(failure) delete m_writerQueue;

        m_policy = policy;
        version(LLVMDC)
        {
            // Under LLVMDC the proxy objects are never created, so reader()
            // and writer() will return null -- the class is unusable there.
            pragma(msg, "ReadWriteMutex is broken on LLVMDC");
        }
        else
        {
            m_reader = new Reader;
            m_writer = new Writer;
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the policy for the associated mutex.
     *
     * Returns:
     *  The policy used by this mutex.
     */
    Policy policy()
    {
        return m_policy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets an object representing the reader lock for the associated mutex.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    Reader reader()
    {
        return m_reader;
    }


    /**
     * Gets an object representing the writer lock for the associated mutex.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    Writer writer()
    {
        return m_writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a read lock for the enclosing mutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex reader proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Install m_proxy as this object's monitor so that
            // 'synchronized( reader )' dispatches to this.lock()/unlock()
            // instead of taking the default per-object monitor.
            // NOTE(review): relies on the D object layout placing the
            // monitor in pointer slot [1] -- confirm for the target runtime.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a read lock on the enclosing mutex.  Blocks while
         * shouldQueueReader() says a reader must wait.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                // Bracket the wait so writers can see how many readers
                // are queued; scope(exit) keeps the count correct even if
                // wait() throws.
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                // Loop (not 'if') to re-check the predicate after every
                // wakeup, guarding against spurious wakeups and races.
                while( shouldQueueReader() )
                    m_readerQueue.wait();
                ++m_numActiveReaders;
            }
        }


        /**
         * Releases a read lock on the enclosing mutex.  The last reader out
         * wakes one queued writer, if any.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveReaders < 1 )
                {
                    if( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }


        /**
         * Attempts to acquire a read lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( shouldQueueReader() )
                    return false;
                ++m_numActiveReaders;
                return true;
            }
        }


    private:
        // Must be called with m_commonMutex held.  A reader waits while a
        // writer is active, and -- under PREFER_WRITERS only -- also while
        // any writers are queued.
        bool shouldQueueReader()
        {
            if( m_numActiveWriters > 0 )
                return true;

            switch( m_policy )
            {
            case Policy.PREFER_WRITERS:
                 return m_numQueuedWriters > 0;

            case Policy.PREFER_READERS:
            default:
                 break;
            }

            return false;
        }

        // Wrapper stored in the object's monitor slot; 'synchronized'
        // follows 'link' to reach this Reader's lock()/unlock().
        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a write lock for the enclosing mutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex writer proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Same monitor-slot substitution as in Reader: makes
            // 'synchronized( writer )' call this.lock()/unlock().
            // NOTE(review): depends on monitor living at pointer slot [1].
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Acquires a write lock on the enclosing mutex.  Blocks until no
         * readers or writers are active (and, under PREFER_READERS, until
         * no readers are queued).
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                // Queued count lets unlocking readers/writers know a writer
                // is waiting; scope(exit) undoes it even on exception.
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                // Re-test after every wakeup (spurious wakeups, lost races).
                while( shouldQueueWriter() )
                    m_writerQueue.wait();
                ++m_numActiveWriters;
            }
        }


        /**
         * Releases a write lock on the enclosing mutex.  The policy decides
         * who is woken next: the preferred side first, then the other.
         * Readers are woken en masse (notifyAll) since they may all proceed
         * together; writers one at a time (notify) since only one may run.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveWriters < 1 )
                {
                    switch( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }


        /**
         * Attempts to acquire a write lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( shouldQueueWriter() )
                    return false;
                ++m_numActiveWriters;
                return true;
            }
        }


    private:
        // Must be called with m_commonMutex held.  A writer waits while
        // anyone (reader or writer) is active, and -- under PREFER_READERS
        // only -- also while any readers are queued.
        bool shouldQueueWriter()
        {
            if( m_numActiveWriters > 0 ||
                m_numActiveReaders > 0 )
                return true;
            switch( m_policy )
            {
            case Policy.PREFER_READERS:
                return m_numQueuedReaders > 0;

            case Policy.PREFER_WRITERS:
            default:
                 break;
            }

            return false;
        }

        // Wrapper stored in the object's monitor slot; 'synchronized'
        // follows 'link' to reach this Writer's lock()/unlock().
        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


private:
    Policy      m_policy;       // reader/writer precedence, fixed at construction
    Reader      m_reader;       // shared read-lock proxy (null under LLVMDC)
    Writer      m_writer;       // exclusive write-lock proxy (null under LLVMDC)

    Mutex       m_commonMutex;  // guards all counters below
    Condition   m_readerQueue;  // readers wait here; notifyAll on release
    Condition   m_writerQueue;  // writers wait here; notify wakes exactly one

    // All four counters are only read/written while m_commonMutex is held.
    int         m_numQueuedReaders;
    int         m_numActiveReaders;
    int         m_numQueuedWriters;
    int         m_numActiveWriters;
}
|
|
394
|
|
395
|
|
396 ////////////////////////////////////////////////////////////////////////////////
|
|
397 // Unit Tests
|
|
398 ////////////////////////////////////////////////////////////////////////////////
|
|
399
|
|
400
|
|
debug( UnitTest )
{
    private import tango.core.Thread;


    // Spins up several reader threads and checks that read locks are shared:
    // at some point more than one reader must have been inside the lock at
    // once, and all must have released it by join time.
    // NOTE(review): 'maxReaders > 1' depends on the scheduler actually
    // overlapping readers around Thread.yield(); could be flaky on a
    // single-core or uncooperative scheduler -- confirm acceptable.
    void testRead( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;
        int  numThreads = 10;
        int  numReaders = 0;    // readers currently inside the read lock
        int  maxReaders = 0;    // high-water mark of concurrent readers

        void readerFn()
        {
            // Works because Reader installs itself as its own monitor, so
            // 'synchronized' here calls Reader.lock()/unlock().
            synchronized( mutex.reader() )
            {
                synchronized( synInfo )
                {
                    if( ++numReaders > maxReaders )
                        maxReaders = numReaders;
                }
                // Give other readers a chance to enter concurrently.
                Thread.yield();
                synchronized( synInfo )
                {
                    --numReaders;
                }
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numThreads; ++i )
        {
            group.create( &readerFn );
        }
        group.joinAll();
        assert( numReaders < 1 && maxReaders > 1 );
    }


    // Mixes reader and writer threads: readers must overlap (maxReaders > 1)
    // while writers must remain mutually exclusive (maxWriters < 2), and all
    // counts must return to zero once every thread has joined.
    void testReadWrite( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;
        int  numThreads = 10;
        int  numReaders = 0;    // readers currently holding the read lock
        int  numWriters = 0;    // writers currently holding the write lock
        int  maxReaders = 0;    // high-water mark of concurrent readers
        int  maxWriters = 0;    // high-water mark of concurrent writers
        int  numTries   = 20;   // lock/unlock iterations per thread

        void readerFn()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex.reader() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numReaders > maxReaders )
                            maxReaders = numReaders;
                    }
                    Thread.yield();
                    synchronized( synInfo )
                    {
                        --numReaders;
                    }
                }
            }
        }

        void writerFn()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex.writer() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numWriters > maxWriters )
                            maxWriters = numWriters;
                    }
                    Thread.yield();
                    synchronized( synInfo )
                    {
                        --numWriters;
                    }
                }
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numThreads; ++i )
        {
            group.create( &readerFn );
            group.create( &writerFn );
        }
        group.joinAll();
        assert( numReaders < 1 && maxReaders > 1 &&
                numWriters < 1 && maxWriters < 2 );
    }


    unittest
    {
        // Exercise both scenarios under both precedence policies.
        testRead( ReadWriteMutex.Policy.PREFER_READERS );
        testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
    }
}
|