diff dreactor/core/Vat.d @ 13:8c9b1276f623 default tip

bug fixes
author rick@minifunk
date Sat, 20 Sep 2008 18:33:11 -0400
parents d6a3cfe7c3de
children
line wrap: on
line diff
--- a/dreactor/core/Vat.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/dreactor/core/Vat.d	Sat Sep 20 18:33:11 2008 -0400
@@ -17,18 +17,17 @@
 import tango.core.Exception;
 import tango.core.Thread;
 import tango.core.Atomic;
-import tango.util.collection.CircularSeq;
 import tango.util.log.Log;
 
 import dreactor.transport.AsyncSocketConduit;
 import dreactor.protocol.IProvider;
+import dreactor.protocol.DefaultProvider;
 import dreactor.core.Task;
 import dreactor.util.ThreadSafeQueue;
 
 static char[] version_string = "Vat.d 0.1 2008-05-31";
 
-
-enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
+alias bool delegate (Event) RegDg;
 
 class TaskAttachment
 {
@@ -42,6 +41,8 @@
 
 class Vat
 {
+    static Vat LocalVat;
+
 private
     Thread thread;
     bool running;
@@ -51,7 +52,7 @@
 
     Selector selector;
     static Atomic!(int) taskCount;
-    TaskAttachment[int] globalTasks; //global registry of tasks
+    static TaskAttachment[int] globalTasks; //global registry of tasks
 
 public 
 
@@ -63,17 +64,28 @@
         thread = new Thread(&eventLoop);
         thread.start();
     }
-     
-    synchronized int addTask(Task t, IProvider p = null)
+
+    static this()
+    {
+        LocalVat = new Vat;
+    }
+
+    int addTask(Task t)
     {
         t.setVat(this);
-        ++taskCount;
-        auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider);
-        tasks[taskCount] = ta;
-        globalTask[taskCount] = ta;
+        int taskid = taskCount.load() + 1;
+        taskCount.store(taskid);
+        auto p = t.getProvider();
+        if (p is null)
+            p = new DefaultProvider;
+        p.setRegisterFunc(createRegFunc(taskid));  //default the task id as a param in the delegate
+        auto ta = new TaskAttachment(t, p);
+        tasks[taskid] = ta;
+        globalTasks[taskid] = ta;
         selector.register(p.getConduit(), p.getEvents(), ta);
-        t.setId(taskCount);
-        return taskCount;
+        t.setId(taskid);
+
+        return taskid;
     }
  
     void exit()
@@ -86,34 +98,58 @@
         thread.join();
     }
 
-    bool addConnection(int tid, Conduit c, Events evts)
+    bool register(int tid, Event evts)
     {
         log.trace("adding handler");
-        TaskAttachment ta;
-        if (ta = (tid in tasks))
-            return selector.register(c, evts, ta);
+        TaskAttachment* ta;
+        if ((ta = (tid in tasks)) !is null)
+        {
+            selector.register((*ta).provider.getConduit(), evts, *ta);
+            return true;
+        }
         else
+        {
             return false;
+        }
     }
-     
+    
+    RegDg createRegFunc(int taskid)
+    {
+        class Functor
+        {   
+            int taskid;
+            this (int tid)
+            {
+                taskid = tid;
+            }
+            bool call(Event evts)
+            {
+                return register(taskid, evts);    
+            }
+        }
+        auto ftor = new Functor(taskid);
+        return &ftor.call;
+    }   
+
     bool remConnection(Conduit c)
     {
-        return selector.unregister(c);
+        selector.unregister(c);
+        return true;
     }
 
     Task getTask(int tid)
     {
-        TaskAttachment ta;
-        if (ta = (tid in tasks))
+        TaskAttachment* ta;
+        if ((ta = (tid in tasks)) !is null)
             return ta.task;
         else
             return null;
     }
 
-    static synchronized Task getGlobalTask(int tid)
+    static Task getGlobalTask(int tid)
     {
-        TaskAttachment ta;
-        if (ta = (tid in globaltasks))
+        TaskAttachment* ta;
+        if ((ta = (tid in globalTasks)) !is null)
             return ta.task;
         else
             return null;
@@ -139,30 +175,27 @@
                         // incoming data
                         log.trace("Read event fired");    
                         auto ta = cast(TaskAttachment) key.attachment;
-                        processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit);
+                        ta.task.appendMessage(ta.provider.handleRead());
 
                     }
                     else if (key.isWritable())
                     {
                         log.trace("Write event fired");    
                         auto ta = cast(TaskAttachment) key.attachment;
-                        ta.appendMessage(ta.provider.handleWrite());
-                        processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit);
+                        ta.task.appendMessage(ta.provider.handleWrite());
                     }
                     else if (key.isHangup())
                     {
                         log.trace("Hangup event fired");
                         auto ta = cast(TaskAttachment) key.attachment;
-                        ta.appendMessage(ta.provider.handleDisconnect());
-                        processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit);
+                        ta.task.appendMessage(ta.provider.handleDisconnect());
                     }
                     else if (key.isError() || key.isInvalidHandle())
                     {
                         log.trace("Error event fired"); 
                         // error, close connection
                         auto ta = cast(TaskAttachment) key.attachment;
-                        ta.appendMessage(ta.provider.handleError());
-                        processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit);
+                        ta.task.appendMessage(ta.provider.handleError());
                     }
                 }
             }
@@ -183,33 +216,11 @@
     {
         foreach(int k; tasks.keys)
         {
-            if (tasks[k].state() == Fiber.State.HOLD)
-                tasks[k].call();
-            if (tasks[k].state() == Fiber.State.TERM)
+            if (tasks[k].task.state() == Fiber.State.HOLD)
+                tasks[k].task.call();
+            if (tasks[k].task.state() == Fiber.State.TERM)
                 tasks.remove(k);
         }        
     }
 
-    void processReturn(int result, Conduit c)
-    {
-        switch(result)
-        {
-            case CLOSE:
-                selector.unregister(c);
-                c.detach();
-            break;
-            case UNREGISTER:
-                selector.unregister(c);
-            break;
-            case REMAIN:
-                //this space intentially left blank
-            break;
-            case REGISTER:
-            break;
-            case REREGISTER:
-            break;
-            default:
-                log.error("processReturn: unknown return value");
-        }
-    }
 }