comparison dreactor/util/ThreadSafeQueue.d @ 3:e3dbc9208822

basic tests working
author rick@minifunk
date Tue, 08 Jul 2008 11:21:09 -0400
parents 7a315154bf5e
children
comparison
equal deleted inserted replaced
2:d3374d553986 3:e3dbc9208822
1 module dreactor.util.ThreadSafeQueue; 1 module dreactor.util.ThreadSafeQueue;
2 2
3 import tango.util.collection.CircularSeq; 3 import tango.util.collection.CircularSeq;
4 import tango.core.Atomic;
4 5
6 import tango.util.log.Log;
7 import tango.util.log.Config;
5 /****************************************************************************** 8 /******************************************************************************
6 9
7 ThreadSafeQueue 10 ThreadSafeQueue
8 Queue that is probably thread safe. It acts as a job queue, in that 11 Queue that is probably thread safe. It acts as a job queue, in that
9 you can push or pop off of the queue. Or you can processAll, which will 12 you can push or pop off of the queue. Or you can processAll, which will
13 class ThreadSafeQueue(TYPE) 16 class ThreadSafeQueue(TYPE)
14 { 17 {
15 public 18 public
16 this(int maxsz = 1000) 19 this(int maxsz = 1000)
17 { 20 {
18 list = new CircularSeq!(TYPE); 21 list_ = new CircularSeq!(TYPE);
19 maxsize = maxsz; 22 maxsize_ = maxsz;
20 size = 0; 23 size_ = 0;
24 log = Log.lookup("dreactor.util.ThreadSafeQueue");
21 } 25 }
22 26
23 synchronized bool pop(ref TYPE t) 27 synchronized bool pop(ref TYPE t)
24 { 28 {
25 if (list.size()) 29 if (size_ > 0)
26 { 30 {
27 TYPE t = list.head(); 31 t = list_.head();
28 list.removeHead(); 32 list_.removeHead();
29 --size; 33 size_--;
30 return true; 34 return true;
31 } 35 }
32 else 36 else
33 return false; 37 return false;
34 } 38 }
35 39
36 synchronized bool push(TYPE t) 40 synchronized bool push(TYPE t)
37 { 41 {
38 if (list.size < maxsize) 42 if (size_ < maxsize_)
39 { 43 {
40 list.append(t); 44 list_.append(t);
41 ++size; 45 size_++;
42 return true; 46 return true;
43 } 47 }
44 else 48 else
45 return false; 49 return false;
46 } 50 }
47 51
48 synchronized int size() 52 synchronized int size()
49 { 53 {
50 return size(); 54 return size_;
51 } 55 }
52 56
53 synchronized int processAll(int delegate(ref T value) dg) 57 synchronized int processAll(int delegate(ref TYPE value) dg)
54 { 58 {
59 if (0 >= size_)
60 return 0;
61
55 int count = 0; 62 int count = 0;
56 foreach(T t; list) 63 foreach(TYPE t; list_)
57 { 64 {
58 if (dg(t) < 0) 65 if (dg(t) < 0)
59 break; 66 break;
60 ++count; 67 ++count;
61 } 68 }
62 if (count == size) 69 if (count == size_)
70 {
63 clear_(); 71 clear_();
72 size_ = 0;
73 }
64 else 74 else
65 list.removeRange(0, count); 75 {
76 list_.removeRange(0, count);
77 size_ -= count;
78 }
66 return count; 79 return count;
67 } 80 }
68 81
69 synchronized void clear() 82 synchronized void clear()
70 { 83 {
73 86
74 private 87 private
75 88
76 void clear_() 89 void clear_()
77 { 90 {
78 list.removeAll(); 91 list_.clear();
92 size_ = 0 ;
79 } 93 }
80 94
81 CircularSeq!(TYPE) list; 95 int maxsize_;
82 int maxsize; 96 int size_;
83 int size; 97 Logger log;
98 CircularSeq!(TYPE) list_;
84 } 99 }