Mercurial > projects > dreactor
comparison dreactor/core/Task.d @ 12:d6a3cfe7c3de
more stuff
author | rick@Macintosh.local |
---|---|
date | Wed, 27 Aug 2008 00:47:33 -0400 |
parents | 5836613d16ac |
children | 8c9b1276f623 |
comparison
equal
deleted
inserted
replaced
11:5836613d16ac | 12:d6a3cfe7c3de |
---|---|
1 module dreactor.core.Task; | 1 module dreactor.core.Task; |
2 | 2 |
3 import tango.core.Thread; | 3 import tango.core.Thread; |
4 import tango.util.container.HashMap; | |
5 import tango.util.container.CircularList; | |
4 import dreactor.core.Vat; | 6 import dreactor.core.Vat; |
5 import dreactor.protocol.Protocol; | 7 import dreactor.protocol.IProvider; |
6 import dreactor.protocol.Dispatcher; | 8 |
7 | 9 alias CircularList!(Message) Messages; |
8 alias CircularSeq!(Message) Mailbox; | 10 |
9 | 11 class Mailbox |
12 { | |
13 public | |
14 | |
15 this () { box = new HashMap!(int, Messages); } | |
16 | |
17 Message popMessageOfType(int type) | |
18 { | |
19 Messages m; | |
20 if (box.get(type, m)) | |
21 { | |
22 Message msg = m.removeHead(); | |
23 if (msg) | |
24 msg_count.store(msg_count.load()-1); | |
25 | |
26 if (m.isEmpty()) | |
27 box.removeKey(type); | |
28 | |
29 return msg; | |
30 } | |
31 else | |
32 return null; | |
33 } | |
34 | |
35 //TODO this could be optimized to use set intersection logic instead of checking for | |
36 //multiple keys one at a time. | |
37 Message popMessageOfType(int[] types) | |
38 { | |
39 foreach(int i; types) | |
40 { | |
41 Message msg = popMessageOfType(i); | |
42 if (msg) | |
43 return msg; | |
44 } | |
45 return null; | |
46 } | |
47 | |
48 Message popMessage() | |
49 { | |
50 Messages m; | |
51 int key; | |
52 auto itor = box.iterator; | |
53 | |
54 do | |
55 { | |
56 if (itor.valid && itor.next(key, m)) | |
57 { | |
58 if (!m.isEmpty()) | |
59 { | |
60 Message msg = m.removeHead(); | |
61 if (msg) | |
62 msg_count.store(msg_count.load()-1); | |
63 if (m.isEmpty()) | |
64 box.removeKey(key); | |
65 return msg; | |
66 } | |
67 else | |
68 { | |
69 iterator.remove(); | |
70 } | |
71 } | |
72 else | |
73 return null; | |
74 } | |
75 while (true) | |
76 } | |
77 | |
78 void push(Message msg) | |
79 { | |
80 Messages m; | |
81 if (box.get(msg.type, m)) | |
82 m.append(msg); | |
83 else | |
84 { | |
85 m = new Messages; | |
86 m.append(msg); | |
87 box.add(msg.type, m); | |
88 } | |
89 msg_count.store(msg_count.load()+1); | |
90 } | |
91 | |
92 int count() | |
93 { | |
94 return msg_count.load(); | |
95 } | |
96 private | |
97 HashMap!(int, Messages) box; | |
98 Atomic!(int) msg_count; | |
99 } | |
100 | |
101 alias void delegate (Message) TaskDg; | |
10 class Task | 102 class Task |
11 { | 103 { |
12 private | 104 private |
13 Fiber fiber; | 105 Fiber fiber; |
14 Mailbox mailbox; | 106 Mailbox mailbox; |
107 Mailbox lockedMailbox; | |
15 int id; | 108 int id; |
16 Vat vat; | 109 Vat vat; |
17 dispatcher[Conduit] dispatchers; | 110 TaskDG taskdg; |
18 | 111 IProvider provider; |
112 | |
19 public | 113 public |
20 this() | 114 this(TaskDg tdg = null, IProvider provider = null) |
21 { | 115 { |
22 fiber = new Fiber(&run); | 116 fiber = new Fiber(&run); |
23 mailbox = new Mailbox; | 117 mailbox = new Mailbox; |
118 lockedMailbox = new Mailbox; | |
119 taskdg = tdg; | |
120 if (!provider) | |
121 provider = new DefaultProvider; | |
24 } | 122 } |
25 | 123 |
26 void setId(int i) | 124 void setId(int i) |
27 { | 125 { |
28 id = i; | 126 id = i; |
29 } | 127 } |
30 | 128 |
31 Mailbox getMailbox() | 129 void appendMessage(Message m) |
32 { | 130 { |
33 return mailbox; | 131 mailbox.push(m); |
132 } | |
133 | |
134 synchronized void appendIVMessage(Message m) | |
135 { | |
136 lockedMailbox.push(m); | |
34 } | 137 } |
35 | 138 |
36 void setVat(Vat v) | 139 void setVat(Vat v) |
37 { | 140 { |
38 vat = v; | 141 vat = v; |
39 } | 142 } |
40 | 143 |
41 abstract void run(); | 144 IProvider getProvider() |
145 { | |
146 return provider; | |
147 } | |
148 | |
149 void run() | |
150 in | |
151 { | |
152 assert(taskdg !is null); | |
153 } | |
154 body | |
155 { | |
156 while (msg = receive()) | |
157 { | |
158 taskdg(msg); | |
159 } | |
160 } | |
161 | |
162 /*************************************************************************** | |
163 sendTo | |
164 Basic message passing utility for inter-task communication. | |
165 It first checks the local Vat to see if the task is present, if not | |
166 it gets the task from the global registry and sends a message to its | |
167 thread-safe mailbox. | |
168 ****************************************************************************/ | |
169 | |
170 bool sendTo(int taskid, Message m) | |
171 { | |
172 Task t; | |
173 if (t = vat.getTask(taskid)) | |
174 { | |
175 t.appendMessage(m); | |
176 return true; | |
177 } | |
178 else if (t = Vat.getGlobalTask(taskid)) | |
179 { | |
180 t.appendIVMessage(m); | |
181 return true; | |
182 } | |
183 return false; | |
184 } | |
42 | 185 |
43 protected | 186 protected |
44 | 187 |
45 /*************************************************************************** | 188 /*************************************************************************** |
46 receive | 189 receive |
47 User-called function to get the next pending message in the mailbox. | 190 User-called function to get the next pending message in the mailbox. |
48 If there are no pending messages, this will yield control back to | 191 If there are no pending messages, this will yield control back to |
49 the scheduler/vat. | 192 the vat's scheduler. |
50 ***************************************************************************/ | 193 ***************************************************************************/ |
51 | 194 |
52 Message receive() | 195 Message receive(int[] types) |
53 { | 196 { |
54 Message m = mailbox.head(); | 197 while(true) |
55 mailbox.removeHead(); | 198 { |
56 return m; | 199 Message m = mailbox.popMessageOfType(types); |
200 if (!m) | |
201 Fiber.yield(); | |
202 else if (SYSTEM_QUIT == m.type) | |
203 break; | |
204 else return m; | |
205 | |
206 } | |
207 return null; | |
208 } | |
209 | |
210 Message receive() | |
211 { | |
212 while(true) | |
213 { | |
214 Message m = mailbox.popMessage(); | |
215 if (!m) | |
216 Fiber.yield(); | |
217 else if (SYSTEM_QUIT == m.type) | |
218 break; | |
219 else return m; | |
220 } | |
221 return null; | |
57 } | 222 } |
58 | 223 |
59 int getId() { return id;} | 224 int getId() { return id;} |
60 | 225 |
61 /************************************************************************** | |
62 | |
63 send | |
64 User-called function to send data to the counterpart at the other | |
65 end of the connection. This sets up a dispatcher to send | |
66 data as the conduit becomes free. | |
67 | |
68 **************************************************************************/ | |
69 int send(char[] outbuf, Conduit c) | |
70 { | |
71 Dispatcher dis; | |
72 if ( ! (dis = (c in dispatchers))) | |
73 dis = new Dispatcher(c); | |
74 | |
75 if (dis.appendOutBuffer(outbuf)) | |
76 { | |
77 if (!vat.addConnection(dis)) | |
78 { | |
79 log.error("unable to register mgr"); | |
80 } | |
81 } | |
82 return 0; | |
83 } | |
84 } | 226 } |