diff dreactor/core/Task.d @ 12:d6a3cfe7c3de

more stuff
author rick@Macintosh.local
date Wed, 27 Aug 2008 00:47:33 -0400
parents 5836613d16ac
children 8c9b1276f623
line wrap: on
line diff
--- a/dreactor/core/Task.d	Tue Aug 12 16:59:56 2008 -0400
+++ b/dreactor/core/Task.d	Wed Aug 27 00:47:33 2008 -0400
@@ -1,26 +1,124 @@
 module dreactor.core.Task;
 
 import tango.core.Thread;
+import tango.util.container.HashMap;
+import tango.util.container.CircularList;
 import dreactor.core.Vat;
-import dreactor.protocol.Protocol;
-import dreactor.protocol.Dispatcher;
+import dreactor.protocol.IProvider;
+
+alias CircularList!(Message) Messages;
+
+class Mailbox
+{
+public
+
+    this () { box = new HashMap!(int, Messages); }
+
+    Message popMessageOfType(int type)
+    {
+        Messages m;
+        if (box.get(type, m))
+        {
+            Message msg = m.removeHead();
+            if (msg)
+                msg_count.store(msg_count.load()-1);
+
+            if (m.isEmpty())
+                box.removeKey(type);
+
+            return msg;
+        }
+        else
+            return null;
+    }
+
+    //TODO this could be optimized to use set intersection logic instead of checking for 
+    //multiple keys one at a time. 
+    Message popMessageOfType(int[] types)
+    {
+        foreach(int i; types)
+        {
+            Message msg = popMessageOfType(i);
+            if (msg)
+                return msg;
+        }
+        return null;
+    }
+
+    Message popMessage()
+    {
+        Messages m;
+        int key;
+        auto itor = box.iterator;
 
-alias CircularSeq!(Message) Mailbox;
+        do
+        {
+            if (itor.valid && itor.next(key, m))
+            {
+                if (!m.isEmpty())
+                {
+                    Message msg = m.removeHead();
+                    if (msg)
+                        msg_count.store(msg_count.load()-1);
+                    if (m.isEmpty())
+                        box.removeKey(key);
+                    return msg;
+                }
+                else 
+                {
+                    iterator.remove();
+                }
+            }
+            else
+                return null;
+        }
+        while (true)
+    }
 
+    void push(Message msg)
+    {
+        Messages m;
+        if (box.get(msg.type, m))
+            m.append(msg);
+        else
+        {
+            m = new Messages;
+            m.append(msg);
+            box.add(msg.type, m);
+        }
+        msg_count.store(msg_count.load()+1);
+    }
+
+    int count()
+    {
+        return msg_count.load();
+    }
+private
+    HashMap!(int, Messages) box;
+    Atomic!(int) msg_count;
+}
+
+alias void delegate (Message) TaskDg;
 class Task
 {
 private
     Fiber fiber;
     Mailbox mailbox;
+    Mailbox lockedMailbox;
     int id;
     Vat vat;
-    dispatcher[Conduit] dispatchers;
-    
+    TaskDG taskdg;
+    IProvider provider;
+
 public
-    this() 
+    this(TaskDg tdg = null, IProvider provider = null) 
     {
         fiber = new Fiber(&run);
         mailbox = new Mailbox;
+        lockedMailbox = new Mailbox;
+        taskdg = tdg;
+        if (!provider)
+            provider = new DefaultProvider;
     }
 
     void setId(int i)
@@ -28,9 +126,14 @@
         id = i;
     }
 
-    Mailbox getMailbox() 
+    void  appendMessage(Message m) 
     { 
-        return mailbox; 
+        mailbox.push(m); 
+    }
+
+    synchronized void appendIVMessage(Message m)
+    {
+        lockedMailbox.push(m);
     }
 
     void setVat(Vat v)
@@ -38,7 +141,47 @@
         vat = v;
     }
 
-    abstract void run();
+    IProvider getProvider()
+    {
+        return provider;
+    }
+
+    void run()
+    in
+    {
+        assert(taskdg !is null);
+    }
+    body
+    {
+        while (msg = receive())
+        {
+            taskdg(msg);
+        }
+    }
+
+    /***************************************************************************
+        sendTo
+        Basic message passing utility for inter-task communication. 
+        It first checks the local Vat to see if the task is present, if not
+        it gets the task from the global registry and sends a message to its 
+        thread-safe mailbox. 
+    ****************************************************************************/
+ 
+    bool sendTo(int taskid, Message m)
+    {
+        Task t;
+        if (t = vat.getTask(taskid))
+        {
+            t.appendMessage(m);
+            return true;
+        }
+        else if (t = Vat.getGlobalTask(taskid))
+        {
+            t.appendIVMessage(m);
+            return true;
+        }
+        return false;
+    }
 
 protected
 
@@ -46,39 +189,38 @@
         receive
         User-called function to get the next pending message in the mailbox. 
         If there are no pending messages, this will yield control back to 
-        the scheduler/vat. 
+        the vat's scheduler. 
     ***************************************************************************/
 
-    Message receive() 
+    Message receive(int[] types) 
     {
-        Message m = mailbox.head();
-        mailbox.removeHead();
-        return m;
+        while(true)
+        {
+            Message m = mailbox.popMessageOfType(types);
+            if (!m)
+                Fiber.yield();
+            else if (SYSTEM_QUIT == m.type)
+                break;
+            else return m;
+            
+        }
+        return null;
+    }
+
+    Message receive()
+    {
+        while(true)
+        {
+            Message m = mailbox.popMessage();
+            if (!m)
+                Fiber.yield();
+            else if (SYSTEM_QUIT == m.type)
+                break;
+            else return m;
+        }
+        return null;
     }
 
     int getId() { return id;}
     
-    /**************************************************************************
-    
-        send
-        User-called function to send data to the counterpart at the other
-        end of the connection. This sets up a dispatcher to send
-        data as the conduit becomes free. 
-
-    **************************************************************************/
-    int send(char[] outbuf, Conduit c)
-    {
-        Dispatcher dis;
-        if ( ! (dis = (c in dispatchers)))
-            dis = new Dispatcher(c);
-
-        if (dis.appendOutBuffer(outbuf))
-        {
-            if (!vat.addConnection(dis))
-            {
-                log.error("unable to register mgr");
-            }
-        }
-        return 0;
-    } 
 }