comparison dreactor/core/Task.d @ 12:d6a3cfe7c3de

more stuff
author rick@Macintosh.local
date Wed, 27 Aug 2008 00:47:33 -0400
parents 5836613d16ac
children 8c9b1276f623
comparison
equal deleted inserted replaced
11:5836613d16ac 12:d6a3cfe7c3de
1 module dreactor.core.Task; 1 module dreactor.core.Task;
2 2
3 import tango.core.Thread; 3 import tango.core.Thread;
4 import tango.util.container.HashMap;
5 import tango.util.container.CircularList;
4 import dreactor.core.Vat; 6 import dreactor.core.Vat;
5 import dreactor.protocol.Protocol; 7 import dreactor.protocol.IProvider;
6 import dreactor.protocol.Dispatcher; 8
7 9 alias CircularList!(Message) Messages;
8 alias CircularSeq!(Message) Mailbox; 10
9 11 class Mailbox
12 {
13 public
14
15 this () { box = new HashMap!(int, Messages); }
16
17 Message popMessageOfType(int type)
18 {
19 Messages m;
20 if (box.get(type, m))
21 {
22 Message msg = m.removeHead();
23 if (msg)
24 msg_count.store(msg_count.load()-1);
25
26 if (m.isEmpty())
27 box.removeKey(type);
28
29 return msg;
30 }
31 else
32 return null;
33 }
34
35 //TODO this could be optimized to use set intersection logic instead of checking for
36 //multiple keys one at a time.
37 Message popMessageOfType(int[] types)
38 {
39 foreach(int i; types)
40 {
41 Message msg = popMessageOfType(i);
42 if (msg)
43 return msg;
44 }
45 return null;
46 }
47
48 Message popMessage()
49 {
50 Messages m;
51 int key;
52 auto itor = box.iterator;
53
54 do
55 {
56 if (itor.valid && itor.next(key, m))
57 {
58 if (!m.isEmpty())
59 {
60 Message msg = m.removeHead();
61 if (msg)
62 msg_count.store(msg_count.load()-1);
63 if (m.isEmpty())
64 box.removeKey(key);
65 return msg;
66 }
67 else
68 {
69 iterator.remove();
70 }
71 }
72 else
73 return null;
74 }
75 while (true)
76 }
77
78 void push(Message msg)
79 {
80 Messages m;
81 if (box.get(msg.type, m))
82 m.append(msg);
83 else
84 {
85 m = new Messages;
86 m.append(msg);
87 box.add(msg.type, m);
88 }
89 msg_count.store(msg_count.load()+1);
90 }
91
92 int count()
93 {
94 return msg_count.load();
95 }
96 private
97 HashMap!(int, Messages) box;
98 Atomic!(int) msg_count;
99 }
100
101 alias void delegate (Message) TaskDg;
10 class Task 102 class Task
11 { 103 {
12 private 104 private
13 Fiber fiber; 105 Fiber fiber;
14 Mailbox mailbox; 106 Mailbox mailbox;
107 Mailbox lockedMailbox;
15 int id; 108 int id;
16 Vat vat; 109 Vat vat;
17 dispatcher[Conduit] dispatchers; 110 TaskDG taskdg;
18 111 IProvider provider;
112
19 public 113 public
20 this() 114 this(TaskDg tdg = null, IProvider provider = null)
21 { 115 {
22 fiber = new Fiber(&run); 116 fiber = new Fiber(&run);
23 mailbox = new Mailbox; 117 mailbox = new Mailbox;
118 lockedMailbox = new Mailbox;
119 taskdg = tdg;
120 if (!provider)
121 provider = new DefaultProvider;
24 } 122 }
25 123
26 void setId(int i) 124 void setId(int i)
27 { 125 {
28 id = i; 126 id = i;
29 } 127 }
30 128
31 Mailbox getMailbox() 129 void appendMessage(Message m)
32 { 130 {
33 return mailbox; 131 mailbox.push(m);
132 }
133
134 synchronized void appendIVMessage(Message m)
135 {
136 lockedMailbox.push(m);
34 } 137 }
35 138
36 void setVat(Vat v) 139 void setVat(Vat v)
37 { 140 {
38 vat = v; 141 vat = v;
39 } 142 }
40 143
41 abstract void run(); 144 IProvider getProvider()
145 {
146 return provider;
147 }
148
149 void run()
150 in
151 {
152 assert(taskdg !is null);
153 }
154 body
155 {
156 while (msg = receive())
157 {
158 taskdg(msg);
159 }
160 }
161
162 /***************************************************************************
163 sendTo
164 Basic message passing utility for inter-task communication.
165 It first checks the local Vat to see if the task is present, if not
166 it gets the task from the global registry and sends a message to its
167 thread-safe mailbox.
168 ****************************************************************************/
169
170 bool sendTo(int taskid, Message m)
171 {
172 Task t;
173 if (t = vat.getTask(taskid))
174 {
175 t.appendMessage(m);
176 return true;
177 }
178 else if (t = Vat.getGlobalTask(taskid))
179 {
180 t.appendIVMessage(m);
181 return true;
182 }
183 return false;
184 }
42 185
43 protected 186 protected
44 187
45 /*************************************************************************** 188 /***************************************************************************
46 receive 189 receive
47 User-called function to get the next pending message in the mailbox. 190 User-called function to get the next pending message in the mailbox.
48 If there are no pending messages, this will yield control back to 191 If there are no pending messages, this will yield control back to
49 the scheduler/vat. 192 the vat's scheduler.
50 ***************************************************************************/ 193 ***************************************************************************/
51 194
52 Message receive() 195 Message receive(int[] types)
53 { 196 {
54 Message m = mailbox.head(); 197 while(true)
55 mailbox.removeHead(); 198 {
56 return m; 199 Message m = mailbox.popMessageOfType(types);
200 if (!m)
201 Fiber.yield();
202 else if (SYSTEM_QUIT == m.type)
203 break;
204 else return m;
205
206 }
207 return null;
208 }
209
210 Message receive()
211 {
212 while(true)
213 {
214 Message m = mailbox.popMessage();
215 if (!m)
216 Fiber.yield();
217 else if (SYSTEM_QUIT == m.type)
218 break;
219 else return m;
220 }
221 return null;
57 } 222 }
58 223
59 int getId() { return id;} 224 int getId() { return id;}
60 225
61 /**************************************************************************
62
63 send
64 User-called function to send data to the counterpart at the other
65 end of the connection. This sets up a dispatcher to send
66 data as the conduit becomes free.
67
68 **************************************************************************/
69 int send(char[] outbuf, Conduit c)
70 {
71 Dispatcher dis;
72 if ( ! (dis = (c in dispatchers)))
73 dis = new Dispatcher(c);
74
75 if (dis.appendOutBuffer(outbuf))
76 {
77 if (!vat.addConnection(dis))
78 {
79 log.error("unable to register mgr");
80 }
81 }
82 return 0;
83 }
84 } 226 }