11
|
1 module dreactor.core.Task;
|
|
2
|
|
3 import tango.core.Thread;
|
12
|
4 import tango.util.container.HashMap;
|
|
5 import tango.util.container.CircularList;
|
11
|
6 import dreactor.core.Vat;
|
12
|
7 import dreactor.protocol.IProvider;
|
|
8
|
|
9 alias CircularList!(Message) Messages;
|
|
10
|
|
11 class Mailbox
|
|
12 {
|
|
13 public
|
|
14
|
|
15 this () { box = new HashMap!(int, Messages); }
|
|
16
|
|
17 Message popMessageOfType(int type)
|
|
18 {
|
|
19 Messages m;
|
|
20 if (box.get(type, m))
|
|
21 {
|
|
22 Message msg = m.removeHead();
|
|
23 if (msg)
|
|
24 msg_count.store(msg_count.load()-1);
|
|
25
|
|
26 if (m.isEmpty())
|
|
27 box.removeKey(type);
|
|
28
|
|
29 return msg;
|
|
30 }
|
|
31 else
|
|
32 return null;
|
|
33 }
|
|
34
|
|
35 //TODO this could be optimized to use set intersection logic instead of checking for
|
|
36 //multiple keys one at a time.
|
|
37 Message popMessageOfType(int[] types)
|
|
38 {
|
|
39 foreach(int i; types)
|
|
40 {
|
|
41 Message msg = popMessageOfType(i);
|
|
42 if (msg)
|
|
43 return msg;
|
|
44 }
|
|
45 return null;
|
|
46 }
|
|
47
|
|
48 Message popMessage()
|
|
49 {
|
|
50 Messages m;
|
|
51 int key;
|
|
52 auto itor = box.iterator;
|
11
|
53
|
12
|
54 do
|
|
55 {
|
|
56 if (itor.valid && itor.next(key, m))
|
|
57 {
|
|
58 if (!m.isEmpty())
|
|
59 {
|
|
60 Message msg = m.removeHead();
|
|
61 if (msg)
|
|
62 msg_count.store(msg_count.load()-1);
|
|
63 if (m.isEmpty())
|
|
64 box.removeKey(key);
|
|
65 return msg;
|
|
66 }
|
|
67 else
|
|
68 {
|
|
69 iterator.remove();
|
|
70 }
|
|
71 }
|
|
72 else
|
|
73 return null;
|
|
74 }
|
|
75 while (true)
|
|
76 }
|
11
|
77
|
12
|
78 void push(Message msg)
|
|
79 {
|
|
80 Messages m;
|
|
81 if (box.get(msg.type, m))
|
|
82 m.append(msg);
|
|
83 else
|
|
84 {
|
|
85 m = new Messages;
|
|
86 m.append(msg);
|
|
87 box.add(msg.type, m);
|
|
88 }
|
|
89 msg_count.store(msg_count.load()+1);
|
|
90 }
|
|
91
|
|
92 int count()
|
|
93 {
|
|
94 return msg_count.load();
|
|
95 }
|
|
96 private
|
|
97 HashMap!(int, Messages) box;
|
|
98 Atomic!(int) msg_count;
|
|
99 }
|
|
100
|
|
101 alias void delegate (Message) TaskDg;
|
11
|
102 class Task
|
|
103 {
|
|
104 private
|
|
105 Fiber fiber;
|
|
106 Mailbox mailbox;
|
12
|
107 Mailbox lockedMailbox;
|
11
|
108 int id;
|
|
109 Vat vat;
|
12
|
110 TaskDG taskdg;
|
|
111 IProvider provider;
|
|
112
|
11
|
113 public
|
12
|
114 this(TaskDg tdg = null, IProvider provider = null)
|
11
|
115 {
|
|
116 fiber = new Fiber(&run);
|
|
117 mailbox = new Mailbox;
|
12
|
118 lockedMailbox = new Mailbox;
|
|
119 taskdg = tdg;
|
|
120 if (!provider)
|
|
121 provider = new DefaultProvider;
|
11
|
122 }
|
|
123
|
|
124 void setId(int i)
|
|
125 {
|
|
126 id = i;
|
|
127 }
|
|
128
|
12
|
129 void appendMessage(Message m)
|
11
|
130 {
|
12
|
131 mailbox.push(m);
|
|
132 }
|
|
133
|
|
134 synchronized void appendIVMessage(Message m)
|
|
135 {
|
|
136 lockedMailbox.push(m);
|
11
|
137 }
|
|
138
|
|
139 void setVat(Vat v)
|
|
140 {
|
|
141 vat = v;
|
|
142 }
|
|
143
|
12
|
144 IProvider getProvider()
|
|
145 {
|
|
146 return provider;
|
|
147 }
|
|
148
|
|
149 void run()
|
|
150 in
|
|
151 {
|
|
152 assert(taskdg !is null);
|
|
153 }
|
|
154 body
|
|
155 {
|
|
156 while (msg = receive())
|
|
157 {
|
|
158 taskdg(msg);
|
|
159 }
|
|
160 }
|
|
161
|
|
162 /***************************************************************************
|
|
163 sendTo
|
|
164 Basic message passing utility for inter-task communication.
|
|
165 It first checks the local Vat to see if the task is present, if not
|
|
166 it gets the task from the global registry and sends a message to its
|
|
167 thread-safe mailbox.
|
|
168 ****************************************************************************/
|
|
169
|
|
170 bool sendTo(int taskid, Message m)
|
|
171 {
|
|
172 Task t;
|
|
173 if (t = vat.getTask(taskid))
|
|
174 {
|
|
175 t.appendMessage(m);
|
|
176 return true;
|
|
177 }
|
|
178 else if (t = Vat.getGlobalTask(taskid))
|
|
179 {
|
|
180 t.appendIVMessage(m);
|
|
181 return true;
|
|
182 }
|
|
183 return false;
|
|
184 }
|
11
|
185
|
|
186 protected
|
|
187
|
|
188 /***************************************************************************
|
|
189 receive
|
|
190 User-called function to get the next pending message in the mailbox.
|
|
191 If there are no pending messages, this will yield control back to
|
12
|
192 the vat's scheduler.
|
11
|
193 ***************************************************************************/
|
|
194
|
12
|
195 Message receive(int[] types)
|
11
|
196 {
|
12
|
197 while(true)
|
|
198 {
|
|
199 Message m = mailbox.popMessageOfType(types);
|
|
200 if (!m)
|
|
201 Fiber.yield();
|
|
202 else if (SYSTEM_QUIT == m.type)
|
|
203 break;
|
|
204 else return m;
|
|
205
|
|
206 }
|
|
207 return null;
|
|
208 }
|
|
209
|
|
210 Message receive()
|
|
211 {
|
|
212 while(true)
|
|
213 {
|
|
214 Message m = mailbox.popMessage();
|
|
215 if (!m)
|
|
216 Fiber.yield();
|
|
217 else if (SYSTEM_QUIT == m.type)
|
|
218 break;
|
|
219 else return m;
|
|
220 }
|
|
221 return null;
|
11
|
222 }
|
|
223
|
|
224 int getId() { return id;}
|
|
225
|
|
226 }
|