changeset 13:8c9b1276f623 default tip

bug fixes
author rick@minifunk
date Sat, 20 Sep 2008 18:33:11 -0400
parents d6a3cfe7c3de
children
files dreactor/core/Dispatcher.d dreactor/core/Task.d dreactor/core/Vat.d dreactor/protocol/DefaultProvider.d dreactor/protocol/Emitter.d dreactor/protocol/IProvider.d dreactor/protocol/TcpProvider.d dreactor/transport/AsyncSocketConduit.d dreactor/util/Emitter.d dsss.conf dsss.last test/chatclient test/chatclient.d test/chatserver test/chatserver.d test/testtuple.d
diffstat 16 files changed, 474 insertions(+), 453 deletions(-) [+]
line wrap: on
line diff
--- a/dreactor/core/Dispatcher.d	Wed Aug 27 00:47:33 2008 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,136 +0,0 @@
-module dreactor.protocol.Dispatcher;
-
-import tango.io.selector.model.ISelector;
-import tango.util.collection.CircularSeq;
-import tango.net.Socket;
-public  import  tango.core.Exception;
-import dreactor.transport.AsyncSocketConduit;
-
-import tango.util.log.Log;
-import tango.util.log.Config;
-
-class Dispatcher
-{
-public
-    this (Conduit trans)
-    {
-        cond = trans;
-        ibuf_len = 0;
-        o_offset = 0;
-        out_buffers = new CircularSeq!(char[]);
-        log = Log.lookup("dreactor.core.Dispatcher");
-    }
-    
-    /**************************************************************************
-
-        onSend  -- Send method
-        Called by the vat in response to a FD writeable event. 
-        Sends data, returns amount sent. Unregisters Handler for sending
-        if there is no more data left to send. 
-
-    ***************************************************************************/
-    public int onSend()
-    {
-        Logger log = Log.lookup("Handlers.onSend");
-     
-        char[] outbuf = nextBuffer();
-        if (outbuf !is null)
-        {
-            int sent = cond.write(outbuf);
-            if (sent > 0)
-            {
-                if (! addOffset(sent))
-                {
-                    return UNREGISTER;
-                }
-            }
-            else if (sent == 0)
-            {
-                log.error("Select said socket was writable, but sent 0 bytes");
-            }
-            else
-            {
-               log.error("Socket send return ERR {}", sent);
-            }
-            return REMAIN;
-        }
-        else
-        {
-            return UNREGISTER;
-        }
-    }
-
-    /**************************************************************************
-    
-        appendOutBuffer
-
-        Adds an outgoing buffer to the list. This returns true if the list
-        was empty, indicating that the handler should be registered with the
-        SelectLoop. If it returns false, it was probably already registered.
-        
-    **************************************************************************/
-    bool appendOutBuffer(char[] outbuf)
-    {
-        out_buffers.append(outbuf);
-        out_buffers_len++;
-        if (out_buffers_len == 1)
-            return true;
-        else
-            return false;
-    }
-    
-    /**************************************************************************
-
-        addOffset 
-        Use this function to update the offset position after a successful data
-        send. This not only manages the current offset, but will update the 
-        out buffer chain if necessary. 
-
-        Returns: false if there is nothing left to send, true if there is.
-
-    **************************************************************************/ 
-    bool addOffset(int off)
-    in
-    {
-        assert(out_buffers_len > 0);
-    }
-    body
-    {
-        char[] hd = out_buffers.head();
-        if ((off + o_offset) >= hd.length)
-        {
-            out_buffers.removeHead();
-            o_offset = 0;
-            out_buffers_len--;
-            return (out_buffers_len > 0);
-        }
-        else
-            o_offset += off;
-        return true;
-    }
-    
-    /**************************************************************************
-
-        char[] nextBuffer
-
-        Returns a slice of the current outbound buffer, returns a char[] pointing
-        to null if there is no current outbound buffer
-
-    **************************************************************************/
-    synchronized char[] nextBuffer()
-    {
-        if (out_buffers_len < 1)
-        {
-            return null; 
-        }
-
-        return out_buffers.head()[o_offset .. $];
-    }
-
-    Conduit cond; 
-    CircularSeq!(char[]) out_buffers;
-    int out_buffers_len;
-    int ibuf_len;
-    int o_offset;
-    Logger log; 
-}
--- a/dreactor/core/Task.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/dreactor/core/Task.d	Sat Sep 20 18:33:11 2008 -0400
@@ -3,8 +3,11 @@
 import tango.core.Thread;
 import tango.util.container.HashMap;
 import tango.util.container.CircularList;
+import tango.core.Atomic;
+
 import dreactor.core.Vat;
 import dreactor.protocol.IProvider;
+import dreactor.protocol.DefaultProvider;
 
 alias CircularList!(Message) Messages;
 
@@ -19,17 +22,20 @@
         Messages m;
         if (box.get(type, m))
         {
-            Message msg = m.removeHead();
-            if (msg)
+            if (!m.isEmpty)
+            {
+                Message msg = m.removeHead();
                 msg_count.store(msg_count.load()-1);
 
-            if (m.isEmpty())
+                if (m.isEmpty())
+                    box.removeKey(type);
+                return msg;
+            }
+            else
                 box.removeKey(type);
-
-            return msg;
         }
-        else
-            return null;
+        Message msg;
+        return msg;
     }
 
     //TODO this could be optimized to use set intersection logic instead of checking for 
@@ -39,10 +45,11 @@
         foreach(int i; types)
         {
             Message msg = popMessageOfType(i);
-            if (msg)
+            if (msg.valid)
                 return msg;
         }
-        return null;
+        Message msg;
+        return msg;
     }
 
     Message popMessage()
@@ -51,14 +58,14 @@
         int key;
         auto itor = box.iterator;
 
-        do
+        while (true)
         {
             if (itor.valid && itor.next(key, m))
             {
                 if (!m.isEmpty())
                 {
                     Message msg = m.removeHead();
-                    if (msg)
+                    if (msg.valid)
                         msg_count.store(msg_count.load()-1);
                     if (m.isEmpty())
                         box.removeKey(key);
@@ -66,13 +73,15 @@
                 }
                 else 
                 {
-                    iterator.remove();
+                    itor.remove();
                 }
             }
             else
-                return null;
+            {
+                Message msg; 
+                return msg;
+            }
         }
-        while (true)
     }
 
     void push(Message msg)
@@ -107,18 +116,18 @@
     Mailbox lockedMailbox;
     int id;
     Vat vat;
-    TaskDG taskdg;
+    TaskDg taskdg;
     IProvider provider;
 
 public
-    this(TaskDg tdg = null, IProvider provider = null) 
+    this(IProvider prov = null) 
     {
-        fiber = new Fiber(&run);
+        fiber = new Fiber(&run, 4096 * 4);
         mailbox = new Mailbox;
         lockedMailbox = new Mailbox;
-        taskdg = tdg;
-        if (!provider)
-            provider = new DefaultProvider;
+        provider = prov ? prov : new DefaultProvider;
+         
+        Vat.LocalVat.addTask(this);
     }
 
     void setId(int i)
@@ -153,7 +162,8 @@
     }
     body
     {
-        while (msg = receive())
+        Message msg;
+        while ((msg = receive()).valid)
         {
             taskdg(msg);
         }
@@ -170,12 +180,12 @@
     bool sendTo(int taskid, Message m)
     {
         Task t;
-        if (t = vat.getTask(taskid))
+        if ((t = vat.getTask(taskid)) !is null)
         {
             t.appendMessage(m);
             return true;
         }
-        else if (t = Vat.getGlobalTask(taskid))
+        else if ((t = Vat.getGlobalTask(taskid)) !is null)
         {
             t.appendIVMessage(m);
             return true;
@@ -183,6 +193,19 @@
         return false;
     }
 
+    char[] getString(Message msg)
+    {
+        return (cast(char*) msg.payload)[0 .. msg.info];
+    }
+
+    Fiber.State state()
+    {
+        return fiber.state();
+    }
+    void call()
+    {
+        fiber.call();
+    }
 protected
 
     /***************************************************************************
@@ -197,14 +220,12 @@
         while(true)
         {
             Message m = mailbox.popMessageOfType(types);
-            if (!m)
+            if (!m.valid)
                 Fiber.yield();
-            else if (SYSTEM_QUIT == m.type)
-                break;
-            else return m;
+            else 
+                return m;
             
         }
-        return null;
     }
 
     Message receive()
@@ -212,13 +233,11 @@
         while(true)
         {
             Message m = mailbox.popMessage();
-            if (!m)
+            if (!m.valid)
                 Fiber.yield();
-            else if (SYSTEM_QUIT == m.type)
-                break;
-            else return m;
+            else 
+                return m;
         }
-        return null;
     }
 
     int getId() { return id;}
--- a/dreactor/core/Vat.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/dreactor/core/Vat.d	Sat Sep 20 18:33:11 2008 -0400
@@ -17,18 +17,17 @@
 import tango.core.Exception;
 import tango.core.Thread;
 import tango.core.Atomic;
-import tango.util.collection.CircularSeq;
 import tango.util.log.Log;
 
 import dreactor.transport.AsyncSocketConduit;
 import dreactor.protocol.IProvider;
+import dreactor.protocol.DefaultProvider;
 import dreactor.core.Task;
 import dreactor.util.ThreadSafeQueue;
 
 static char[] version_string = "Vat.d 0.1 2008-05-31";
 
-
-enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
+alias bool delegate (Event) RegDg;
 
 class TaskAttachment
 {
@@ -42,6 +41,8 @@
 
 class Vat
 {
+    static Vat LocalVat;
+
 private
     Thread thread;
     bool running;
@@ -51,7 +52,7 @@
 
     Selector selector;
     static Atomic!(int) taskCount;
-    TaskAttachment[int] globalTasks; //global registry of tasks
+    static TaskAttachment[int] globalTasks; //global registry of tasks
 
 public 
 
@@ -63,17 +64,28 @@
         thread = new Thread(&eventLoop);
         thread.start();
     }
-     
-    synchronized int addTask(Task t, IProvider p = null)
+
+    static this()
+    {
+        LocalVat = new Vat;
+    }
+
+    int addTask(Task t)
     {
         t.setVat(this);
-        ++taskCount;
-        auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider);
-        tasks[taskCount] = ta;
-        globalTask[taskCount] = ta;
+        int taskid = taskCount.load() + 1;
+        taskCount.store(taskid);
+        auto p = t.getProvider();
+        if (p is null)
+            p = new DefaultProvider;
+        p.setRegisterFunc(createRegFunc(taskid));  //default the task id as a param in the delegate
+        auto ta = new TaskAttachment(t, p);
+        tasks[taskid] = ta;
+        globalTasks[taskid] = ta;
         selector.register(p.getConduit(), p.getEvents(), ta);
-        t.setId(taskCount);
-        return taskCount;
+        t.setId(taskid);
+
+        return taskid;
     }
  
     void exit()
@@ -86,34 +98,58 @@
         thread.join();
     }
 
-    bool addConnection(int tid, Conduit c, Events evts)
+    bool register(int tid, Event evts)
     {
         log.trace("adding handler");
-        TaskAttachment ta;
-        if (ta = (tid in tasks))
-            return selector.register(c, evts, ta);
+        TaskAttachment* ta;
+        if ((ta = (tid in tasks)) !is null)
+        {
+            selector.register((*ta).provider.getConduit(), evts, *ta);
+            return true;
+        }
         else
+        {
             return false;
+        }
     }
-     
+    
+    RegDg createRegFunc(int taskid)
+    {
+        class Functor
+        {   
+            int taskid;
+            this (int tid)
+            {
+                taskid = tid;
+            }
+            bool call(Event evts)
+            {
+                return register(taskid, evts);    
+            }
+        }
+        auto ftor = new Functor(taskid);
+        return &ftor.call;
+    }   
+
     bool remConnection(Conduit c)
     {
-        return selector.unregister(c);
+        selector.unregister(c);
+        return true;
     }
 
     Task getTask(int tid)
     {
-        TaskAttachment ta;
-        if (ta = (tid in tasks))
+        TaskAttachment* ta;
+        if ((ta = (tid in tasks)) !is null)
             return ta.task;
         else
             return null;
     }
 
-    static synchronized Task getGlobalTask(int tid)
+    static Task getGlobalTask(int tid)
     {
-        TaskAttachment ta;
-        if (ta = (tid in globaltasks))
+        TaskAttachment* ta;
+        if ((ta = (tid in globalTasks)) !is null)
             return ta.task;
         else
             return null;
@@ -139,30 +175,27 @@
                         // incoming data
                         log.trace("Read event fired");    
                         auto ta = cast(TaskAttachment) key.attachment;
-                        processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit);
+                        ta.task.appendMessage(ta.provider.handleRead());
 
                     }
                     else if (key.isWritable())
                     {
                         log.trace("Write event fired");    
                         auto ta = cast(TaskAttachment) key.attachment;
-                        ta.appendMessage(ta.provider.handleWrite());
-                        processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit);
+                        ta.task.appendMessage(ta.provider.handleWrite());
                     }
                     else if (key.isHangup())
                     {
                         log.trace("Hangup event fired");
                         auto ta = cast(TaskAttachment) key.attachment;
-                        ta.appendMessage(ta.provider.handleDisconnect());
-                        processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit);
+                        ta.task.appendMessage(ta.provider.handleDisconnect());
                     }
                     else if (key.isError() || key.isInvalidHandle())
                     {
                         log.trace("Error event fired"); 
                         // error, close connection
                         auto ta = cast(TaskAttachment) key.attachment;
-                        ta.appendMessage(ta.provider.handleError());
-                        processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit);
+                        ta.task.appendMessage(ta.provider.handleError());
                     }
                 }
             }
@@ -183,33 +216,11 @@
     {
         foreach(int k; tasks.keys)
         {
-            if (tasks[k].state() == Fiber.State.HOLD)
-                tasks[k].call();
-            if (tasks[k].state() == Fiber.State.TERM)
+            if (tasks[k].task.state() == Fiber.State.HOLD)
+                tasks[k].task.call();
+            if (tasks[k].task.state() == Fiber.State.TERM)
                 tasks.remove(k);
         }        
     }
 
-    void processReturn(int result, Conduit c)
-    {
-        switch(result)
-        {
-            case CLOSE:
-                selector.unregister(c);
-                c.detach();
-            break;
-            case UNREGISTER:
-                selector.unregister(c);
-            break;
-            case REMAIN:
-                //this space intentially left blank
-            break;
-            case REGISTER:
-            break;
-            case REREGISTER:
-            break;
-            default:
-                log.error("processReturn: unknown return value");
-        }
-    }
 }
--- a/dreactor/protocol/DefaultProvider.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/dreactor/protocol/DefaultProvider.d	Sat Sep 20 18:33:11 2008 -0400
@@ -1,6 +1,7 @@
 module dreactor.protocol.DefaultProvider;
 
-import tango.io.Selector;
+import tango.io.selector.model.ISelector;
+import tango.io.device.Conduit;
 
 import dreactor.protocol.IProvider;
 
@@ -10,27 +11,41 @@
 {
 private
     Conduit cond;
-    Events evts;
-
+    Event evts;
+    bool delegate (Event) regFn;
 public
-    Message handleRead(Conduit c)
-    {
+
+    enum {
+        Read = 1000,
+        Write,
+        Error,
+        Connect,
+        Disconnect
     }
 
-    Message handleWrite(Conduit c)
+    Message handleRead()
     {
+        return Message(cast(void*)cond, Read, 0); 
     }
 
-    Message handleError(Conduit c)
+    Message handleWrite()
     {
+        return Message(cast(void*)cond, Write, 0); 
     }
 
-    Message handleConnect(Conduit c)
+    Message handleError()
     {
+        return Message(cast(void*)cond, Error, 0); 
     }
 
-    Message handleDisconnect(Conduit c)
+    Message handleConnect()
     {
+        return Message(cast(void*)cond, Connect, 0); 
+    }
+
+    Message handleDisconnect()
+    {
+        return Message(cast(void*)cond, Disconnect, 0); 
     }
 
 
@@ -39,13 +54,22 @@
         return cond;
     }
 
-    int getEvents()
+    void send(char[] buf)
+    {
+    }
+
+    Event getEvents()
     {
         return evts;
     }
 
     void setEvents(Event e)
     {
-        evts e;
+        evts = e;
     }
+    
+    void setRegisterFunc( bool delegate (Event) fn)
+    {
+        regFn = fn;
+    } 
 }
--- a/dreactor/protocol/Emitter.d	Wed Aug 27 00:47:33 2008 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,48 +0,0 @@
-module Emitter
-
-
-
-
-import tango.core.Thread;
-
-import dreactor.core.Task;
-
-alias Message delegate(void) EmitterDg;
-
-class Emitter
-{
-public
-    this(Task t, EmitterDg cb)
-    {
-        task = t;
-        callback = cb;
-        thread = new Thread(&run);
-        thread.start();
-    }
-
-    void stop()
-    {
-        running = false;
-    }
-    
-    void stopNow()
-    {
-        thread.isDaemon(true);
-        running = false;
-    }
-private
-
-    void run()
-    {
-        while(running)
-        {
-            Message msg = callback();
-            task.appendIVMessage(msg);
-        }
-    }
-    Task task;
-    Thread thread;
-    bool running;
-    EmitterCb callback;
-}
-
--- a/dreactor/protocol/IProvider.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/dreactor/protocol/IProvider.d	Sat Sep 20 18:33:11 2008 -0400
@@ -1,29 +1,40 @@
 module dreactor.protocol.IProvider;
 
-class Message
+import tango.io.selector.model.ISelector;
+import tango.io.device.Conduit;
+
+struct Message
 {
 public
+
     int type;
     int info;
-    Object payload;
-    this (Object buf, int t, int e) 
+    void* payload;
+    int from;
+    bool valid; 
+
+    static Message opCall(void* buf, int t, int e, int f = 0) 
     {
-        type = t; 
-        info = e; 
-        payload = buf; 
+        Message m;
+        m.type = t; 
+        m.info = e;
+        m.from = f; 
+        m.payload = buf;
+        m.valid = true;
+        return m;
     }
 }
 
 interface IProvider
 {
-    Message handleRead(Conduit c);
-    Message handleWrite(Conduit c);
-    Message handleError(Conduit c);
-    Message handleConnect(Conduit c);
-    Message handleDisconnect(Conduit c);
-    abstract void send(char []);
-
+    Message handleRead();
+    Message handleWrite();
+    Message handleError();
+    Message handleConnect();
+    Message handleDisconnect();
+    void send(char []);
+    void setRegisterFunc(bool delegate (Event));
     Conduit getConduit();
-    int getEvents();
-    void setEvents();
+    Event getEvents();
+    void setEvents(Event e);
 }
--- a/dreactor/protocol/TcpProvider.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/dreactor/protocol/TcpProvider.d	Sat Sep 20 18:33:11 2008 -0400
@@ -1,34 +1,31 @@
-module dreactor.protocol.RawTcp;
+module dreactor.protocol.TcpProvider;
 
 import tango.io.device.Conduit;
 import tango.io.selector.model.ISelector;
 import tango.net.Socket;
-import tango.util.collection.CircularSeq;
+import tango.util.container.CircularList;
 import tango.util.log.Log;
 import tango.util.log.Config;
 
 import dreactor.transport.AsyncSocketConduit;
 import dreactor.core.Vat;
-import dreactor.core.Dispatcher;
-
+public import dreactor.protocol.IProvider;
 /******************************************************************************
     
     Basic TCP server or client routines for sending raw data.
 
 ******************************************************************************/
-class TCPProvider : IProvider
+class TcpProvider : IProvider
 {
 public
-
-    enum 
-    {
-        RECEIVE = 1000,
-        SEND_COMPLETE,
-        NEW_CONNECTION,
-        REMOTE_CLOSED,
-        SEND_ERROR,
-        RECEIVE_ERROR,
-        ERROR
+    enum {
+        SendComplete = 2000,
+        NewConnection,
+        Receive,
+        RemoteClosed,
+        SendError,
+        ReceiveError,
+        Error
     }
 
     this(AsyncSocketConduit c)
@@ -36,41 +33,53 @@
         log = Log.lookup("dreactor.protocol.RawTcpServer");
         log.info("log initialized");
         cond = c;
+        events = Event.Read;
     }
 
-    this(Vat v, IPv4Address addr)
+    this(IPv4Address addr, bool listen = false)
     {
-        AsyncSocketConduit cond = new AsyncSocketConduit;
-        cond.socket().setAddressReuse(true);
-        this(cond);
+        AsyncSocketConduit c = new AsyncSocketConduit;
+        c.socket().setAddressReuse(true);
+        if (listen)
+        {
+            c.bind(addr);
+            c.socket().listen(1000);
+            listener = listen;
+        }
+        else
+            c.connect(addr);
+        this(c);
     }
 
+    
     ~this()
     {
         close();
     } 
     
-    Message handleRead(Conduit c)
+    Message handleRead()
     {
         Logger log = Log.lookup("Handlers.onReceive");
 
+        if (listener)
+            return handleConnect();
+
         char inbuf[8192];
         int amt;
-        if((amt = h.transport.read(inbuf)) > 0)
+        if((amt = cond.read(inbuf)) > 0)
         {
-            return new Message(inbuf[0 .. amt].dup, RECEIVE, amt);
+            return Message(inbuf[0 .. amt].dup.ptr, Receive, amt);
         }
         else
         {
             if (amt == 0)
             {
-                children.remove(h);
-                (cast(AsyncSocketConduit) h.transport).shutdown();
-                return Message(null, REMOTE_CLOSED, amt);
+                cond.shutdown();
+                return Message(null, RemoteClosed, amt);
             }
             log.error("Received no data, err = {}", amt);
         }
-        return new Message(null, ERROR, amt);
+        return Message(null, Error, amt);
     }
     
     /**************************************************************************
@@ -81,7 +90,7 @@
         if there is no more data left to send. 
 
     ***************************************************************************/
-    Message handleWrite(Conduit c)
+    Message handleWrite()
     {
         Logger log = Log.lookup("Handlers.onSend");
      
@@ -95,42 +104,45 @@
                 {
                     //h.remEvent(Event.Write);
                     //TODO - How do we handle event re-registering
-                    return new Message(null, SEND_COMPLETE, sent);
+                    return Message(null, SendComplete, sent);
                 }
             }
             else if (sent == 0)
             {
                 log.error("Select said socket was writable, but sent 0 bytes");
-                return new Message(null, SEND_ERROR, 0);
+                return Message(null, Error, 0);
             }
             else
             {
                 log.error("Socket send return ERR");
-                return new Message(null, SEND_ERROR, sent);
+                return Message(null, Error, sent);
             }
         }
         else
         {
-            //h.remEvent(Event.Write);
-            //TODO - How do we handle event re-registering
-        
-            return new Message(null, SEND_COMPLETE, 0);
+            remEvent(Event.Write);
+            if (!regFn(events))
+            {
+                log.error("unable to register mgr");
+            }
+            return Message(null, SendComplete, 0);
         }
     }
 
-    Message handleDisconnect(Conduit c)
+    Message handleDisconnect()
     {
-        return new Message(c, REMOTE_CLOSED, 0);
+        return Message(cast(void*)cond, RemoteClosed, 0);
     }
 
-    Message handleError(Conduit c)
+    Message handleError()
     {
-        return new Messsage(null, ERROR, 0);
+        return Message(cast(void*)cond, Error, 0);
     } 
 
-    Message handleConnect(Conduit c)
+    Message handleConnect()
     {
-        return new Message(accept(), NEW_CONNECTION, 0);
+        log.trace("accepting new connection");
+        return Message(cast(void*)accept(), NewConnection, 0);
     }
 
     Conduit getConduit()
@@ -138,7 +150,7 @@
         return cond;
     }
 
-    int getEvents()
+    Event getEvents()
     {
         return events;
     }
@@ -152,22 +164,9 @@
     {
         AsyncSocketConduit newcond = new AsyncSocketConduit;
         cond.socket().accept(newcond.socket);
-        log.info("accepted new connection");
+        log.info("accepted new connection {} {}", cast(uint) newcond, newcond.fileHandle());
         return newcond;
     }
-
-    int broadcast(char[] outbuf, TCPProvider[] recips)
-    {
-        foreach(TCPProvider c; recips)
-        {
-            if (c.appendOutBuffer(outbuf))
-            {
-                h.addEvent(Event.Write);
-                vat.addConnection(h);
-            }
-        }
-        return 0;
-    }
     
     /**************************************************************************
     
@@ -177,18 +176,16 @@
         data as the socket becomes free. 
 
     **************************************************************************/
-    int send(char[] outbufl)
+    void send(char[] outbuf)
     {
         if (appendOutBuffer(outbuf))
         {
-            //TODO - should we always register for all events? or update it when needed?
-            //d.addEvent(Event.Write);
-            if (!vat.addConnection(d))
+            addEvent(Event.Write);
+            if (!regFn(events))
             {
                 log.error("unable to register mgr");
             }
         }
-        return 0;
     }
 
  
@@ -198,13 +195,6 @@
         cond.detach(); 
     }
     
-
-    ~this()
-    {
-        (cast(AsyncSocketConduit)manager.transport).shutdown();
-        (cast(AsyncSocketConduit)manager.transport).detach();
-    }
-
     int connect(IPv4Address addr)
     {
         cond = new AsyncSocketConduit;
@@ -218,35 +208,94 @@
 
     /**************************************************************************
     
-        send
-        User-called function to send data to the counterpart at the other
-        end of the connection. This sets up the connection manager to send
-        data as the socket becomes free. 
+        appendOutBuffer
+
+        Adds an outgoing buffer to the list. This returns true if the list
+        was empty, indicating that the handler should be registered with the
+        SelectLoop. If it returns false, it was probably already registered.
+        
+    **************************************************************************/
+    bool appendOutBuffer(char[] outbuf)
+    {
+        out_buffers.append(outbuf);
+        out_buffers_len++;
+        if (out_buffers_len == 1)
+            return true;
+        else
+            return false;
+    }
+
+    /**************************************************************************
+
+        addOffset 
+        Use this function to update the offset position after a successful data
+        send. This not only manages the current offset, but will update the 
+        out buffer chain if necessary. 
+
+        Returns: false if there is nothing left to send, true if there is.
+
+    **************************************************************************/ 
+    bool addOffset(int off)
+    in
+    {
+        assert(out_buffers_len > 0);
+    }
+    body
+    {
+        char[] hd = out_buffers.head();
+        if ((off + o_offset) >= hd.length)
+        {
+            out_buffers.removeHead();
+            o_offset = 0;
+            out_buffers_len--;
+            return (out_buffers_len > 0);
+        }
+        else
+            o_offset += off;
+        return true;
+    }
+
+    /**************************************************************************
+
+        char[] nextBuffer
+
+        Returns a slice of the current outbound buffer, returns a char[] pointing
+        to null if there is no current outbound buffer
 
     **************************************************************************/
-    int send(char[] outbuf, IPv4Address addr = null)
+    char[] nextBuffer()
     {
-        if (!connected)
+        if (out_buffers_len < 1)
         {
-            log.info("send: not connected, connecting");
-            return -1;
+            return null; 
         }
-        if (appendOutBuffer(outbuf))
-        {
-            addEvent(Event.Write);
-            if (!vat.addConnection(manager))
-            {
-                log.error("unable to register mgr");
-            }
-        }
-        return 0;
+
+        return out_buffers.head()[o_offset .. $];
     }
     
-    
+    void setRegisterFunc( bool delegate (Event) fn)
+    {
+        regFn = fn;
+    }
+
+    void addEvent(Event evt)
+    {
+        events |= evt;
+    }
+
+    void remEvent(Event evt)
+    {
+        events &= !evt;
+    }
+
 private
-    Vat vat;
-    Conduit cond;
+    AsyncSocketConduit cond;
     Logger log;
     bool listener;
     Event events;
+    bool connected;
+    CircularList!(char[]) out_buffers;
+    int out_buffers_len;
+    int o_offset;
+    bool delegate (Event) regFn;
 }
--- a/dreactor/transport/AsyncSocketConduit.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/dreactor/transport/AsyncSocketConduit.d	Sat Sep 20 18:33:11 2008 -0400
@@ -102,7 +102,7 @@
 
     ***********************************************************************/
 
-    override Handle fileHandle ()
+    Handle fileHandle ()
     {
        return cast(Handle) socket_.fileHandle;
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/util/Emitter.d	Sat Sep 20 18:33:11 2008 -0400
@@ -0,0 +1,49 @@
+module dreactor.util.Emitter;
+
+
+
+
+import tango.core.Thread;
+
+import dreactor.core.Task;
+import dreactor.protocol.IProvider;
+
+alias Message delegate() EmitterDg;
+
+class Emitter
+{
+public
+    this(Task t, EmitterDg cb)
+    {
+        task = t;
+        callback = cb;
+        thread = new Thread(&run);
+        thread.start();
+    }
+
+    void stop()
+    {
+        running = false;
+    }
+    
+    void stopNow()
+    {
+        thread.isDaemon(true);
+        running = false;
+    }
+private
+
+    void run()
+    {
+        while(running)
+        {
+            Message msg = callback();
+            task.appendIVMessage(msg);
+        }
+    }
+    Task task;
+    Thread thread;
+    bool running;
+    EmitterDg callback;
+}
+
--- a/dsss.conf	Wed Aug 27 00:47:33 2008 -0400
+++ b/dsss.conf	Sat Sep 20 18:33:11 2008 -0400
@@ -2,3 +2,4 @@
 #[test/longtest.d]
 #[test/chatserver.d]
 [test/chatclient.d]
+[test/chatserver.d]
--- a/dsss.last	Wed Aug 27 00:47:33 2008 -0400
+++ b/dsss.last	Sat Sep 20 18:33:11 2008 -0400
@@ -2,3 +2,4 @@
 #[test/longtest.d]
 #[test/chatserver.d]
 [test/chatclient.d]
+[test/chatserver.d]
Binary file test/chatclient has changed
--- a/test/chatclient.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/test/chatclient.d	Sat Sep 20 18:33:11 2008 -0400
@@ -1,18 +1,26 @@
 
 module chatclient;
 
+import tango.io.Stdout;
+import tango.io.Console;
+import tango.net.Socket;
+import tango.util.log.Log;
+
 import dreactor.core.Vat;
 import dreactor.core.Task;
 import dreactor.protocol.TcpProvider;
+import dreactor.util.Emitter;
 
-enum {EMITTER_CHAT_RECEIVE = 42};
+enum { StdinReceive = 42 }
+
+Logger log;
 
 class ChatTask : Task
 {
 
 private
     TcpProvider client;
-
+    bool running;
 public
     this(TcpProvider tcpclient) 
     {
@@ -22,40 +30,33 @@
     void run()
     {
         Message msg;
-
+        running = true;
         auto em = new Emitter(this, 
                 {
                      char buf[] = Cin.copyln(true);
-                     return new Message(buf, EMITTER_STDIN_RECEIVE, buf.size);
+                     return Message(cast(void*)buf.ptr, StdinReceive, buf.length);
                 });
         
-        while (msg = receive())
+        while (running)
         {
-            switch(msg.type)
+            msg = receive();
+            switch (msg.type)
             {
-                case EMITTER_CHAT_RECEIVE:
-                    char[] inbuf = msg.payload;
+                case StdinReceive:
+                {
+                    char[] inbuf = getString(msg);
                     if (inbuf == "quit")
                     {
-                        em.stopNow();
-                        return;
+                        running = false;
                     }
-                    client.send(msg.payload);
-                    break;
-
-                case TcpProvider.RECEIVE:
-                    Stdout(cast(char[]) msg.payload);
-                    break;
-
-                case TcpProvider.SEND_COMPLETE:
-                    break;
-
-                case TcpProvider.REMOTE_CLOSED:
-                    Stdout("--- Remote host closed connection \n");
-                    break;
-
+                    client.send(inbuf);
+                }
+                case TcpProvider.Receive:
+                {
+                    Stdout(getString(msg));
+                }
                 default:
-                    Stdout("Unknown message received\n");
+                    Stdout("unknown msg received: {}", msg.type);
             }
         }
         em.stopNow();
@@ -63,10 +64,10 @@
 }
 
 
-int main(int argc, char[][] argv)
+int main(char[][] args)
 {
-    auto vat = new Vat;
-    auto client = new TcpProvider(new IPv4Address("localhost", 5555), vat);
-    auto tsk = new ChatTask(client);
-    vat.addTask(task);
+    log = Log.lookup("dreactor.chatserver");
+    auto provider = new TcpProvider(new IPv4Address("localhost", 5555));
+    auto tsk = new ChatTask(provider);
+    return 0;
 }
Binary file test/chatserver has changed
--- a/test/chatserver.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/test/chatserver.d	Sat Sep 20 18:33:11 2008 -0400
@@ -1,66 +1,102 @@
 
 module chatserver;
 
+import tango.io.Stdout;
+import tango.io.Console;
+import tango.util.container.CircularList;
+import tango.util.log.Log;
+import tango.net.Socket;
+
 import dreactor.core.Vat;
 import dreactor.core.Task;
 import dreactor.protocol.TcpProvider;
+import dreactor.transport.AsyncSocketConduit;
 
+typedef Message ChildTCPRequest;
+Logger log;
 
 class ChatConnectionTask : Task
 {
 public
+    this(TcpProvider tcpclient) 
+    {
+        super(tcpclient);
+    }
 
+    enum {
+        StdIn = 100,
+        RemoteClosed
+        }
+
+    void run()
+    {
+        running = true;
+        Message msg;
+        while (running)
+        {
+            msg = receive();
+            switch(msg.type)
+            {
+                case TcpProvider.Receive:
+                    Stdout(cast(char*) msg.payload);
+                    break;
+                case TcpProvider.SendComplete:
+                    break;
+                case TcpProvider.RemoteClosed:
+                    log.trace("--- Remote host closed connection \n");
+                    break;
+                default:
+                    log.trace("Unknown message received\n");
+            }
+        }
+    }
+
+private
+    bool running;
+}
+
+class ListenerTask : Task
+{
+    this(TcpProvider tcpclient) 
+    {
+        super(tcpclient);
+    }
     void run()
     {
         Message msg;
-        while (msg = receive())
+        running = true;
+        while (running)
         {
+            msg = receive();
+            auto children = new CircularList!(ChatConnectionTask);
             switch(msg.type)
             {
-                case TCP_PROVIDER_RECEIVE:
-                    //Stdout(cast(char[]) msg.payload);
+                case TcpProvider.NewConnection:
+                    AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload;
+                    log.trace("new conduit : {}", cast(uint) cond);
+                    auto provider = new TcpProvider(cond);
+                    auto tsk = new ChatConnectionTask(new TcpProvider(cond));
+                    children.append(tsk);
+                    log.trace("accepted connection");
                     break;
-                case TCP_PROVIDER_SEND_COMPLETE:
+                case ChatConnectionTask.StdIn:
+                    char[] inbuf = (cast(char*) msg.payload)[0 .. msg.info];
                     break;
-                case TCP_PROVIDER_REMOTE_CLOSED:
-                    Stdout("--- Remote host closed connection \n");
+                case ChatConnectionTask.RemoteClosed:
                     break;
                 default:
-                    Stdout("Unknown message received\n");
+                    log.trace("Unknown message received");
             }
         }
-        em.stopNow();
     }
-
-    void send(char[] buf)
-    {
-        tcp.send(buf);
-    }
-
-    static CircularList!(ChatConnectionTask!(
+private
+    bool running;
 }
 
-
-int main(int argc, char[][] argv)
+int main(char[][] args)
 {
-    auto vat = new Vat;
- 
-    void listentask(Message msg)
-    {
-        switch(msg.type)
-        {
-            case TCP_PROVIDER_CONNECT:
-                AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload;
-                auto tsk = ChatConnectionTask(new TcpProvider(cond));
-                vat.addTask(tsk);
-                break;
-            default:
-                Stdout("Unknown message received\n");
-        }
-    }
-
-
-    auto provider = new TcpProvider(new IPv4Address("localhost", 5555), vat);
-    auto srvtsk = new Task(&listentask, provider);
-    vat.addTask(task, client);
+    log = Log.lookup("dreactor.chatserver");
+    auto provider = new TcpProvider(new IPv4Address("localhost", 5555), true);
+    auto srvtsk = new ListenerTask(provider);
+    return 0;
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/testtuple.d	Sat Sep 20 18:33:11 2008 -0400
@@ -0,0 +1,3 @@
+
+
+