0
|
1 module dreactor.util.ThreadSafeQueue;
|
|
2
|
|
3 import tango.util.collection.CircularSeq;
|
|
4
|
|
/******************************************************************************

    ThreadSafeQueue

    Bounded FIFO job queue backed by a Tango CircularSeq. pop, push, size,
    processAll and clear are all synchronized methods, so calls to them on
    one instance are serialized against each other.

    Items can be pushed and popped one at a time, or handled in bulk with
    processAll, which applies a delegate to the queued items in order and
    then removes the handled ones from the queue.

******************************************************************************/
class ThreadSafeQueue(TYPE)
{
    /**
     * Creates an empty queue.
     *
     * Params:
     *     maxsz = number of queued items at which push() starts refusing
     */
    public
    this(int maxsz = 1000)
    {
        list = new CircularSeq!(TYPE);
        maxsize = maxsz;
    }

    /**
     * Removes the item at the head of the queue.
     *
     * Params:
     *     t = receives the removed item; left untouched when the queue is
     *         empty
     *
     * Returns: true if an item was removed, false if the queue was empty.
     */
    synchronized bool pop(ref TYPE t)
    {
        if (list.size() == 0)
            return false;

        // Assign to the caller's reference; declaring a new local here would
        // shadow the parameter and the popped item would be lost.
        t = list.head();
        list.removeHead();
        return true;
    }

    /**
     * Appends an item to the tail of the queue.
     *
     * Returns: true if the item was queued, false if the queue already
     *          holds maxsize items.
     */
    synchronized bool push(TYPE t)
    {
        if (list.size() >= maxsize)
            return false;

        list.append(t);
        return true;
    }

    /**
     * Returns: the number of items currently queued.
     */
    synchronized int size()
    {
        // The backing sequence is the single source of truth for the length;
        // a separately maintained counter could drift after bulk removals.
        return list.size();
    }

    /**
     * Applies dg to the queued items in order, stopping at the first item for
     * which dg returns a negative value, then removes items from the front
     * of the queue.
     *
     * The delegate runs while the queue's lock is held.
     *
     * Params:
     *     dg = callback invoked once per item; a negative result stops the
     *          traversal
     *
     * Returns: the number of items for which dg returned a non-negative
     *          value.
     */
    synchronized int processAll(int delegate(ref TYPE value) dg)
    {
        int count = 0;
        foreach (TYPE item; list)
        {
            if (dg(item) < 0)
                break;
            ++count;
        }

        if (count == list.size())
        {
            // Everything was handled (or the queue was empty).
            clear_();
        }
        else
        {
            // NOTE(review): if removeRange treats toIndex as inclusive (as the
            // Tango Seq documentation appears to describe), this also drops
            // the item the delegate rejected -- confirm that is intended.
            list.removeRange(0, count);
        }
        return count;
    }

    /**
     * Removes every item from the queue.
     */
    synchronized void clear()
    {
        clear_();
    }

    // Unsynchronized worker shared by clear() and processAll(), both of which
    // are synchronized methods themselves.
    private
    void clear_()
    {
        list.clear();
    }

    CircularSeq!(TYPE) list;    // backing storage; head is the next item to pop
    int maxsize;                // capacity limit enforced by push()
}
|