diff dreactor/core/Vat.d @ 12:d6a3cfe7c3de

more stuff
author rick@Macintosh.local
date Wed, 27 Aug 2008 00:47:33 -0400
parents 5836613d16ac
children 8c9b1276f623
line wrap: on
line diff
--- a/dreactor/core/Vat.d	Tue Aug 12 16:59:56 2008 -0400
+++ b/dreactor/core/Vat.d	Wed Aug 27 00:47:33 2008 -0400
@@ -21,31 +21,23 @@
 import tango.util.log.Log;
 
 import dreactor.transport.AsyncSocketConduit;
+import dreactor.protocol.IProvider;
 import dreactor.core.Task;
 import dreactor.util.ThreadSafeQueue;
 
 static char[] version_string = "Vat.d 0.1 2008-05-31";
 
-Logger log;
 
 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
-alias Message delegate (Conduit c) HandlerDG;
-alias Message function (Conduit c) HandlerFN;
 
 class TaskAttachment
 {
 public
     Task task;
-    HandlerDG dg;
-    HandlerFN fn;
+    IProvider provider;
 
-    this(Task ta, HandlerDG d) 
-    { TaskAttachment t; t.task = ta; t.dg = d; return t; }
-    
-    this(Task ta, HandlerFN f) 
-    { TaskAttachment t; t.task = ta; t.fn = f; return t; }
-
-    public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); }
+    this (Task ta, IProvider p) 
+    { task = ta; provider = p; }
 }
 
 class Vat
@@ -53,18 +45,16 @@
 private
     Thread thread;
     bool running;
+    Logger log;
  
-    Task[int] tasks;
-    int taskCount;
+    TaskAttachment[int] tasks; //registry for local tasks
+
+    Selector selector;
+    static Atomic!(int) taskCount;
+    TaskAttachment[int] globalTasks; //global registry of tasks
 
 public 
 
-    this(Task t)
-    {
-        addTask(t);
-        this();
-    }
-
     this()
     {
         log = Log.lookup("dreactor.core.Vat");
@@ -73,13 +63,17 @@
         thread = new Thread(&eventLoop);
         thread.start();
     }
-    
-    void addTask(Task t)
+     
+    synchronized int addTask(Task t, IProvider p = null)
     {
         t.setVat(this);
         ++taskCount;
-        tasks[taskCount] = t;
+        auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider);
+        tasks[taskCount] = ta;
+        globalTask[taskCount] = ta;
+        selector.register(p.getConduit(), p.getEvents(), ta);
         t.setId(taskCount);
+        return taskCount;
     }
  
     void exit()
@@ -92,21 +86,43 @@
         thread.join();
     }
 
-    bool addConnection()
+    bool addConnection(int tid, Conduit c, Events evts)
     {
         log.trace("adding handler");
-        return selector.register(h.transport, h.events(), h);
+        TaskAttachment ta;
+        if (ta = (tid in tasks))
+            return selector.register(c, evts, ta);
+        else
+            return false;
     }
      
-    bool remConnection(Dispatcher handler)
+    bool remConnection(Conduit c)
+    {
+        return selector.unregister(c);
+    }
+
+    Task getTask(int tid)
     {
-        return selector.unregister(h.transport);
+        TaskAttachment ta;
+        if (ta = (tid in tasks))
+            return ta.task;
+        else
+            return null;
+    }
+
+    static synchronized Task getGlobalTask(int tid)
+    {
+        TaskAttachment ta;
+        if (ta = (tid in globaltasks))
+            return ta.task;
+        else
+            return null;
     }
 
 private
     void eventLoop()
     {
-        auto selector = new Selector();
+        selector = new Selector();
         selector.open();
         do
         {
@@ -122,30 +138,31 @@
                     {
                         // incoming data
                         log.trace("Read event fired");    
-                        auto conn = cast(Dispatcher) key.attachment;
-                        if ( Dispatcher.State.listening == conn.getState() )
-                            conn.handleConnection(conn.transport, &addConnection);
-                        else
-                            processReturn(conn.handleIncoming(), selector, conn);
+                        auto ta = cast(TaskAttachment) key.attachment;
+                        processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit);
+
                     }
                     else if (key.isWritable())
                     {
                         log.trace("Write event fired");    
-                        auto conn = cast(Dispatcher) key.attachment;
-                        processReturn(conn.handleOutgoing(), selector, conn);
+                        auto ta = cast(TaskAttachment) key.attachment;
+                        ta.appendMessage(ta.provider.handleWrite());
+                        processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit);
                     }
                     else if (key.isHangup())
                     {
                         log.trace("Hangup event fired");
-                        auto conn = cast(Dispatcher) key.attachment;
-                        processReturn(conn.handleDisconnect(), selector, conn);
+                        auto ta = cast(TaskAttachment) key.attachment;
+                        ta.appendMessage(ta.provider.handleDisconnect());
+                        processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit);
                     }
                     else if (key.isError() || key.isInvalidHandle())
                     {
-                        log.trace("Error event fired");    
+                        log.trace("Error event fired"); 
                         // error, close connection
-                        auto conn = cast(Dispatcher) key.attachment;
-                        conn.handleError(&remConnection);
+                        auto ta = cast(TaskAttachment) key.attachment;
+                        ta.appendMessage(ta.provider.handleError());
+                        processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit);
                     }
                 }
             }
@@ -173,25 +190,23 @@
         }        
     }
 
-    void processReturn(int result, Selector s, Dispatcher h)
+    void processReturn(int result, Conduit c)
     {
         switch(result)
         {
             case CLOSE:
-                s.unregister(h.transport);
-                h.transport.detach();
+                selector.unregister(c);
+                c.detach();
             break;
             case UNREGISTER:
-                s.unregister(h.transport);
+                selector.unregister(c);
             break;
             case REMAIN:
                 //this space intentially left blank
             break;
             case REGISTER:
-                s.register(h.transport, h.events(), h);
             break;
             case REREGISTER:
-                s.register(h.transport, h.events(), h);
             break;
             default:
                 log.error("processReturn: unknown return value");