Mercurial > projects > dreactor
view dreactor/core/Task.d @ 12:d6a3cfe7c3de
more stuff
author | rick@Macintosh.local |
---|---|
date | Wed, 27 Aug 2008 00:47:33 -0400 |
parents | 5836613d16ac |
children | 8c9b1276f623 |
line wrap: on
line source
module dreactor.core.Task;

import tango.core.Atomic;
import tango.core.Thread;
import tango.util.container.HashMap;
import tango.util.container.CircularList;
import dreactor.core.Vat;
import dreactor.protocol.IProvider;

alias CircularList!(Message) Messages;

/*******************************************************************************

    Mailbox

    Message store keyed by message type. Each type maps to a list of pending
    messages; a list is removed from the map as soon as it becomes empty.
    msg_count tracks the total number of stored messages.

*******************************************************************************/
class Mailbox
{
    public this ()
    {
        box = new HashMap!(int, Messages);
    }

    /***************************************************************************

        Removes and returns the head message of the given type, or null when
        no list is stored for that type.

    ***************************************************************************/
    Message popMessageOfType(int type)
    {
        Messages m;
        if (box.get(type, m))
        {
            Message msg = m.removeHead();
            if (msg)
                msg_count.store(msg_count.load()-1);
            // Drop the per-type list once drained so the map only holds
            // types that still have pending messages.
            if (m.isEmpty())
                box.removeKey(type);
            return msg;
        }
        else
            return null;
    }

    /***************************************************************************

        Tries each type in order and returns the first message found, or null
        when none of the types has a pending message.

    ***************************************************************************/
    //TODO this could be optimized to use set intersection logic instead of
    //checking for multiple keys one at a time.
    Message popMessageOfType(int[] types)
    {
        foreach(int i; types)
        {
            Message msg = popMessageOfType(i);
            if (msg)
                return msg;
        }
        return null;
    }

    /***************************************************************************

        Removes and returns a message of any type, or null when the iterator
        yields no further entries. Empty lists met while iterating are removed
        through the iterator.

    ***************************************************************************/
    Message popMessage()
    {
        Messages m;
        int key;
        auto itor = box.iterator;

        while (itor.valid && itor.next(key, m))
        {
            if (m.isEmpty())
            {
                // Stale empty list: discard it and keep looking.
                itor.remove();
                continue;
            }

            Message msg = m.removeHead();
            if (msg)
                msg_count.store(msg_count.load()-1);
            // The map is modified here, but iteration stops right away
            // because we return immediately afterwards.
            if (m.isEmpty())
                box.removeKey(key);
            return msg;
        }
        return null;
    }

    /***************************************************************************

        Appends msg to the list for msg.type, creating the list on first use,
        and increments the message counter.

    ***************************************************************************/
    void push(Message msg)
    {
        Messages m;
        if (box.get(msg.type, m))
            m.append(msg);
        else
        {
            m = new Messages;
            m.append(msg);
            box.add(msg.type, m);
        }
        msg_count.store(msg_count.load()+1);
    }

    /// Returns the current value of the message counter.
    int count()
    {
        return msg_count.load();
    }

    private HashMap!(int, Messages) box;
    // NOTE(review): updates are a separate load followed by a store, so the
    // increment/decrement as a whole is not a single atomic operation.
    Atomic!(int) msg_count;
}

alias void delegate (Message) TaskDg;

/*******************************************************************************

    Task

    A fiber that repeatedly receives messages from its mailbox and hands them
    to a user-supplied delegate.

    NOTE(review): receive() only reads from mailbox; nothing in this module
    moves messages out of lockedMailbox - presumably the Vat does, confirm.

*******************************************************************************/
class Task
{
    private Fiber fiber;
    Mailbox mailbox;
    Mailbox lockedMailbox;
    int id;
    Vat vat;
    TaskDg taskdg;
    IProvider provider;

    /***************************************************************************

        Params:
            tdg      = delegate invoked by run() for every received message
            provider = provider returned by getProvider(); a DefaultProvider
                       is created when null is passed

    ***************************************************************************/
    public this(TaskDg tdg = null, IProvider provider = null)
    {
        fiber = new Fiber(&run);
        mailbox = new Mailbox;
        lockedMailbox = new Mailbox;
        taskdg = tdg;
        // The parameter shadows the field, so the field must be assigned
        // explicitly through this.
        this.provider = provider ? provider : new DefaultProvider;
    }

    void setId(int i)
    {
        id = i;
    }

    /// Pushes m onto the unsynchronized mailbox.
    void appendMessage(Message m)
    {
        mailbox.push(m);
    }

    /// Pushes m onto lockedMailbox; the method is synchronized.
    synchronized void appendIVMessage(Message m)
    {
        lockedMailbox.push(m);
    }

    void setVat(Vat v)
    {
        vat = v;
    }

    IProvider getProvider()
    {
        return provider;
    }

    /***************************************************************************

        Fiber entry point: passes every received message to taskdg until
        receive() returns null.

    ***************************************************************************/
    void run()
    in
    {
        assert(taskdg !is null);
    }
    body
    {
        Message msg;
        while ((msg = receive()) !is null)
        {
            taskdg(msg);
        }
    }

    /***************************************************************************

        sendTo

        Basic message passing utility for inter-task communication. It first
        checks the local Vat to see if the task is present; if not, it gets
        the task from the global registry and sends the message through the
        synchronized appendIVMessage.

        Returns:
            true when a target task was found, false otherwise

    ***************************************************************************/
    bool sendTo(int taskid, Message m)
    {
        Task t = vat.getTask(taskid);
        if (t !is null)
        {
            t.appendMessage(m);
            return true;
        }

        t = Vat.getGlobalTask(taskid);
        if (t !is null)
        {
            t.appendIVMessage(m);
            return true;
        }
        return false;
    }

    /***************************************************************************

        receive

        User-called function to get the next pending message of one of the
        given types. While no such message is pending, this yields the fiber.
        Returns null when a SYSTEM_QUIT message is popped.

        NOTE(review): only this overload carries `protected`; the
        no-argument receive() and getId() are public - confirm that is
        intended.

    ***************************************************************************/
    protected Message receive(int[] types)
    {
        while (true)
        {
            Message m = mailbox.popMessageOfType(types);
            if (!m)
                Fiber.yield();
            else if (SYSTEM_QUIT == m.type)
                break;
            else
                return m;
        }
        return null;
    }

    /***************************************************************************

        receive

        Gets the next pending message of any type. While the mailbox yields
        no message, this yields the fiber. Returns null when a SYSTEM_QUIT
        message is popped.

    ***************************************************************************/
    Message receive()
    {
        while (true)
        {
            Message m = mailbox.popMessage();
            if (!m)
                Fiber.yield();
            else if (SYSTEM_QUIT == m.type)
                break;
            else
                return m;
        }
        return null;
    }

    int getId()
    {
        return id;
    }
}