Mercurial > projects > dreactor
diff dreactor/core/Task.d @ 12:d6a3cfe7c3de
more stuff
author | rick@Macintosh.local |
---|---|
date | Wed, 27 Aug 2008 00:47:33 -0400 |
parents | 5836613d16ac |
children | 8c9b1276f623 |
line wrap: on
line diff
--- a/dreactor/core/Task.d Tue Aug 12 16:59:56 2008 -0400 +++ b/dreactor/core/Task.d Wed Aug 27 00:47:33 2008 -0400 @@ -1,26 +1,124 @@ module dreactor.core.Task; import tango.core.Thread; +import tango.util.container.HashMap; +import tango.util.container.CircularList; import dreactor.core.Vat; -import dreactor.protocol.Protocol; -import dreactor.protocol.Dispatcher; +import dreactor.protocol.IProvider; + +alias CircularList!(Message) Messages; + +class Mailbox +{ +public + + this () { box = new HashMap!(int, Messages); } + + Message popMessageOfType(int type) + { + Messages m; + if (box.get(type, m)) + { + Message msg = m.removeHead(); + if (msg) + msg_count.store(msg_count.load()-1); + + if (m.isEmpty()) + box.removeKey(type); + + return msg; + } + else + return null; + } + + //TODO this could be optimized to use set intersection logic instead of checking for + //multiple keys one at a time. + Message popMessageOfType(int[] types) + { + foreach(int i; types) + { + Message msg = popMessageOfType(i); + if (msg) + return msg; + } + return null; + } + + Message popMessage() + { + Messages m; + int key; + auto itor = box.iterator; -alias CircularSeq!(Message) Mailbox; + do + { + if (itor.valid && itor.next(key, m)) + { + if (!m.isEmpty()) + { + Message msg = m.removeHead(); + if (msg) + msg_count.store(msg_count.load()-1); + if (m.isEmpty()) + box.removeKey(key); + return msg; + } + else + { + iterator.remove(); + } + } + else + return null; + } + while (true) + } + void push(Message msg) + { + Messages m; + if (box.get(msg.type, m)) + m.append(msg); + else + { + m = new Messages; + m.append(msg); + box.add(msg.type, m); + } + msg_count.store(msg_count.load()+1); + } + + int count() + { + return msg_count.load(); + } +private + HashMap!(int, Messages) box; + Atomic!(int) msg_count; +} + +alias void delegate (Message) TaskDg; class Task { private Fiber fiber; Mailbox mailbox; + Mailbox lockedMailbox; int id; Vat vat; - dispatcher[Conduit] dispatchers; - + TaskDG taskdg; + IProvider provider; + public - this() + this(TaskDg tdg = null, IProvider provider = null) { fiber = new Fiber(&run); mailbox = new Mailbox; + lockedMailbox = new Mailbox; + taskdg = tdg; + if (!provider) + provider = new DefaultProvider; } void setId(int i) @@ -28,9 +126,14 @@ id = i; } - Mailbox getMailbox() + void appendMessage(Message m) { - return mailbox; + mailbox.push(m); + } + + synchronized void appendIVMessage(Message m) + { + lockedMailbox.push(m); } void setVat(Vat v) @@ -38,7 +141,47 @@ vat = v; } - abstract void run(); + IProvider getProvider() + { + return provider; + } + + void run() + in + { + assert(taskdg !is null); + } + body + { + while (msg = receive()) + { + taskdg(msg); + } + } + + /*************************************************************************** + sendTo + Basic message passing utility for inter-task communication. + It first checks the local Vat to see if the task is present, if not + it gets the task from the global registry and sends a message to its + thread-safe mailbox. + ****************************************************************************/ + + bool sendTo(int taskid, Message m) + { + Task t; + if (t = vat.getTask(taskid)) + { + t.appendMessage(m); + return true; + } + else if (t = Vat.getGlobalTask(taskid)) + { + t.appendIVMessage(m); + return true; + } + return false; + } protected @@ -46,39 +189,38 @@ receive User-called function to get the next pending message in the mailbox. If there are no pending messages, this will yield control back to - the scheduler/vat. + the vat's scheduler. ***************************************************************************/ - Message receive() + Message receive(int[] types) { - Message m = mailbox.head(); - mailbox.removeHead(); - return m; + while(true) + { + Message m = mailbox.popMessageOfType(types); + if (!m) + Fiber.yield(); + else if (SYSTEM_QUIT == m.type) + break; + else return m; + + } + return null; + } + + Message receive() + { + while(true) + { + Message m = mailbox.popMessage(); + if (!m) + Fiber.yield(); + else if (SYSTEM_QUIT == m.type) + break; + else return m; + } + return null; } int getId() { return id;} - /************************************************************************** - - send - User-called function to send data to the counterpart at the other - end of the connection. This sets up a dispatcher to send - data as the conduit becomes free. - - **************************************************************************/ - int send(char[] outbuf, Conduit c) - { - Dispatcher dis; - if ( ! (dis = (c in dispatchers))) - dis = new Dispatcher(c); - - if (dis.appendOutBuffer(outbuf)) - { - if (!vat.addConnection(dis)) - { - log.error("unable to register mgr"); - } - } - return 0; - } }