Mercurial > projects > dreactor
view dreactor/core/Task.d @ 13:8c9b1276f623 default tip
bug fixes
author | rick@minifunk |
---|---|
date | Sat, 20 Sep 2008 18:33:11 -0400 |
parents | d6a3cfe7c3de |
children |
line wrap: on
line source
module dreactor.core.Task;

import tango.core.Thread;
import tango.util.container.HashMap;
import tango.util.container.CircularList;
import tango.core.Atomic;
import dreactor.core.Vat;
import dreactor.protocol.IProvider;
import dreactor.protocol.DefaultProvider;

/// Queue of messages that all share one message type.
alias CircularList!(Message) Messages;

/*******************************************************************************

    Mailbox

    Message store keyed by message type. Each type maps to its own queue of
    pending messages; msg_count tracks the number of queued messages.

    A default-initialized Message is returned whenever nothing could be
    popped; callers test the result with Message.valid.

    Only msg_count is updated atomically. The container itself is not
    protected here, so concurrent callers must synchronize externally.

*******************************************************************************/
class Mailbox
{
    public this ()
    {
        box = new HashMap!(int, Messages);
    }

    /***************************************************************************

        Removes and returns the oldest queued message of the given type.

        An emptied (or already empty) queue is dropped from the map.
        Returns a default-initialized Message when no queue exists for the
        type or the queue is empty.

    ***************************************************************************/
    Message popMessageOfType(int type)
    {
        Messages m;
        if (box.get(type, m))
        {
            if (!m.isEmpty)
            {
                Message msg = m.removeHead();
                // Atomic read-modify-write; store(load()-1) could lose updates.
                msg_count.decrement();
                if (m.isEmpty())
                    box.removeKey(type);
                return msg;
            }
            else
                box.removeKey(type);
        }
        Message msg;
        return msg;
    }

    //TODO this could be optimized to use set intersection logic instead of
    //checking for multiple keys one at a time.
    /***************************************************************************

        Tries each type in order and returns the first valid message found,
        or a default-initialized Message when none of the types has one.

    ***************************************************************************/
    Message popMessageOfType(int[] types)
    {
        foreach (int i; types)
        {
            Message msg = popMessageOfType(i);
            if (msg.valid)
                return msg;
        }
        Message msg;
        return msg;
    }

    /***************************************************************************

        Removes and returns a message from the first non-empty queue the map
        iterator yields. Empty queues met on the way are removed. Returns a
        default-initialized Message once the iterator is exhausted.

    ***************************************************************************/
    Message popMessage()
    {
        Messages m;
        int key;
        auto itor = box.iterator;
        while (true)
        {
            if (itor.valid && itor.next(key, m))
            {
                if (!m.isEmpty())
                {
                    Message msg = m.removeHead();
                    // push() and popMessageOfType() count every message
                    // regardless of validity, so count every removal here
                    // too; otherwise the counter drifts.
                    msg_count.decrement();
                    if (m.isEmpty())
                        box.removeKey(key);
                    return msg;
                }
                else
                {
                    itor.remove();
                }
            }
            else
            {
                Message msg;
                return msg;
            }
        }
    }

    /***************************************************************************

        Appends msg to the queue for msg.type, creating the queue on first
        use, and bumps the message counter.

    ***************************************************************************/
    void push(Message msg)
    {
        Messages m;
        if (box.get(msg.type, m))
            m.append(msg);
        else
        {
            m = new Messages;
            m.append(msg);
            box.add(msg.type, m);
        }
        // Atomic read-modify-write; store(load()+1) could lose updates.
        msg_count.increment();
    }

    /// Returns the current value of the message counter.
    int count()
    {
        return msg_count.load();
    }

    private HashMap!(int, Messages) box;
    Atomic!(int) msg_count;
}

/// Handler invoked by Task.run for every received message.
alias void delegate (Message) TaskDg;

/*******************************************************************************

    Task

    A fiber-backed unit of work that owns two mailboxes: `mailbox`, filled by
    appendMessage, and `lockedMailbox`, filled by the synchronized
    appendIVMessage. The constructor registers the task with Vat.LocalVat.

    NOTE(review): nothing in this file ever pops from lockedMailbox; receive()
    reads only `mailbox`. Presumably the Vat drains lockedMailbox elsewhere --
    confirm, otherwise messages delivered through appendIVMessage are never
    seen by the task.

*******************************************************************************/
class Task
{
    private Fiber fiber;
    Mailbox mailbox;
    Mailbox lockedMailbox;
    int id;
    Vat vat;
    TaskDg taskdg;
    IProvider provider;

    /***************************************************************************

        Creates the fiber (entry point run, stack size 4096 * 4), both
        mailboxes and the provider, then adds the task to Vat.LocalVat.

        Params:
            prov = provider to use; a new DefaultProvider when null

    ***************************************************************************/
    public this(IProvider prov = null)
    {
        fiber = new Fiber(&run, 4096 * 4);
        mailbox = new Mailbox;
        lockedMailbox = new Mailbox;
        provider = prov ? prov : new DefaultProvider;
        Vat.LocalVat.addTask(this);
    }

    void setId(int i)
    {
        id = i;
    }

    /// Queues m in the unsynchronized mailbox.
    void appendMessage(Message m)
    {
        mailbox.push(m);
    }

    /// Queues m in lockedMailbox while holding this object's monitor.
    synchronized void appendIVMessage(Message m)
    {
        lockedMailbox.push(m);
    }

    void setVat(Vat v)
    {
        vat = v;
    }

    IProvider getProvider()
    {
        return provider;
    }

    /***************************************************************************

        Fiber entry point: hands every message obtained from receive() to
        taskdg until receive() returns an invalid message. taskdg must have
        been assigned beforehand (checked by the in-contract).

    ***************************************************************************/
    void run()
    in
    {
        assert(taskdg !is null);
    }
    body
    {
        Message msg;
        while ((msg = receive()).valid)
        {
            taskdg(msg);
        }
    }

    /***************************************************************************

        sendTo

        Basic message passing utility for inter-task communication.
        It first checks the local Vat to see if the task is present; if not,
        it gets the task from the global registry and sends the message to
        its thread-safe mailbox.

        Returns:
            true when the message was queued, false when no task with the
            given id was found in either place

    ***************************************************************************/
    bool sendTo(int taskid, Message m)
    {
        Task t;
        if ((t = vat.getTask(taskid)) !is null)
        {
            t.appendMessage(m);
            return true;
        }
        else if ((t = Vat.getGlobalTask(taskid)) !is null)
        {
            t.appendIVMessage(m);
            return true;
        }
        return false;
    }

    /// Views msg.payload as character data and returns its first msg.info
    /// characters as a slice (no copy is made).
    char[] getString(Message msg)
    {
        return (cast(char*) msg.payload)[0 .. msg.info];
    }

    /// Returns the state of the underlying fiber.
    Fiber.State state()
    {
        return fiber.state();
    }

    /// Resumes the underlying fiber.
    void call()
    {
        fiber.call();
    }

    /***************************************************************************

        receive

        User-called function to get the next pending message in the mailbox.
        If there are no pending messages, this will yield control back to
        the vat's scheduler and retry once the fiber is resumed.

        This overload only returns messages whose type is listed in types.

    ***************************************************************************/
    protected Message receive(int[] types)
    {
        while (true)
        {
            Message m = mailbox.popMessageOfType(types);
            if (!m.valid)
                Fiber.yield();
            else
                return m;
        }
    }

    /// Same as receive(int[]) but accepts a message of any type.
    Message receive()
    {
        while (true)
        {
            Message m = mailbox.popMessage();
            if (!m.valid)
                Fiber.yield();
            else
                return m;
        }
    }

    int getId()
    {
        return id;
    }
}