changeset 9:5412a1ff2e49

adding chat client and more updates
author rick@minifunk
date Sat, 12 Jul 2008 10:42:41 -0400
parents 60cf25102fb2
children e75a2e506b1d
files dreactor/core/Dispatcher.d dreactor/core/Vat.d dreactor/protocol/RawTcp.d test/chatclient.d test/chatserver.d test/test
diffstat 6 files changed, 63 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/dreactor/core/Dispatcher.d	Wed Jul 09 00:56:21 2008 -0400
+++ b/dreactor/core/Dispatcher.d	Sat Jul 12 10:42:41 2008 -0400
@@ -49,7 +49,7 @@
         out_buffers = new CircularSeq!(char[]);
         log = Log.lookup("dreactor.core.Dispatcher");
     }
-      
+    
     /**********************************************************************
     
         Setters for the handlers. These are set by the Protocols as well 
--- a/dreactor/core/Vat.d	Wed Jul 09 00:56:21 2008 -0400
+++ b/dreactor/core/Vat.d	Sat Jul 12 10:42:41 2008 -0400
@@ -59,6 +59,11 @@
         running = false;
     }
 
+    void wait()
+    {
+        thread.join();
+    }
+
     bool addConnection(Dispatcher handler)
     {
         log.trace("adding handler");
@@ -126,7 +131,7 @@
             //add Conduits to listener
             freshList.processAll( (ref Dispatcher h)
             {
-                selector.reregister(h.transport, h.events(), h);
+                selector.register(h.transport, h.events(), h);
                 return 1; 
             });
             remList.processAll( (ref Dispatcher h)
@@ -145,8 +150,7 @@
         {
             case CLOSE:
                 s.unregister(h.transport);
-                (cast (AsyncSocketConduit) h.transport).shutdown();
-                (cast (AsyncSocketConduit) h.transport).detach();
+                h.transport.detach();
             break;
             case UNREGISTER:
                 s.unregister(h.transport);
@@ -158,7 +162,7 @@
                 s.register(h.transport, h.events(), h);
             break;
             case REREGISTER:
-                s.reregister(h.transport, h.events(), h);
+                s.register(h.transport, h.events(), h);
             break;
             default:
                 log.error("processReturn: unknown return value");
--- a/dreactor/protocol/RawTcp.d	Wed Jul 09 00:56:21 2008 -0400
+++ b/dreactor/protocol/RawTcp.d	Sat Jul 12 10:42:41 2008 -0400
@@ -30,7 +30,7 @@
         mgr.setErrorHandler(&onError);
         mgr.setDisconnectHandler(&onHangup);
         mgr.listen(addr);
-        
+         
         sel.addConnection(mgr);
         vat = sel;
         log = Log.lookup("dreactor.protocol.RawTcpServer");
@@ -40,10 +40,22 @@
     this(Vat sel, IPv4Address addr)
     {
         AsyncSocketConduit cond = new AsyncSocketConduit;
+        cond.socket().setAddressReuse(true);
         Dispatcher lh = new Dispatcher(cond, true);
         this(lh, sel, addr);
     }
- 
+
+    ~this()
+    {
+        foreach(Dispatcher d; children)
+        {
+            (cast(AsyncSocketConduit)d.transport).shutdown();
+            (cast(AsyncSocketConduit)d.transport).detach();
+        }
+        (cast(AsyncSocketConduit)manager.transport).shutdown();
+        (cast(AsyncSocketConduit)manager.transport).detach(); 
+    } 
+
     int accept(Conduit cond, RegisterD reg)
     {
         AsyncSocketConduit newcond = new AsyncSocketConduit;
@@ -56,7 +68,7 @@
         return 0;
     }
 
-    int broadcast(char[] outbuf, Dispatcher[] excluded)
+    int broadcast(char[] outbuf, Dispatcher[] excluded = null)
     {
         foreach(Dispatcher h; children)
         {
@@ -70,6 +82,28 @@
         }
         return 0;
     }
+    
+    /**************************************************************************
+    
+        send
+        User-called function to send data to the counterpart at the other
+        end of the connection. This sets up the connection manager to send
+        data as the socket becomes free. 
+
+    **************************************************************************/
+    int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
+    {
+        if (d.appendOutBuffer(outbuf))
+        {
+            d.addEvent(Event.Write);
+            d.setOutgoingHandler(&onSend);
+            if (!vat.addConnection(d))
+            {
+                log.error("unable to register mgr");
+            }
+        }
+        return 0;
+    }
 
     public int onReceive(Dispatcher h)
     {
@@ -89,6 +123,7 @@
             if (amt == 0)
             {
                 children.remove(h);
+                (cast(AsyncSocketConduit) h.transport).shutdown();
                 return CLOSE;
             }
             log.error("Received no data, err = {}", amt);
@@ -123,6 +158,7 @@
         vat = sel;
         log = Log.lookup("dreactor.protocol.RawTcpClient");
     }
+
     this(Vat sel, Event evts = Event.Read)
     {
         AsyncSocketConduit clcond = new AsyncSocketConduit;
@@ -130,6 +166,12 @@
         this(ch, sel, evts);
     }
 
+    ~this()
+    {
+        (cast(AsyncSocketConduit)manager.transport).shutdown();
+        (cast(AsyncSocketConduit)manager.transport).detach();
+    }
+
     int connect(IPv4Address addr)
     {
         (cast(AsyncSocketConduit) manager.transport()).connect(addr);
@@ -199,7 +241,6 @@
 {
     Logger log = Log.lookup("Handlers.onSend");
  
-    log.info("top of onSend");
     char[] outbuf = h.nextBuffer();
     if (outbuf !is null)
     {
--- a/test/chatclient.d	Wed Jul 09 00:56:21 2008 -0400
+++ b/test/chatclient.d	Sat Jul 12 10:42:41 2008 -0400
@@ -28,8 +28,12 @@
     while(true)
     {
         char buf[] = Cin.copyln(true);
+        if (buf == "quit\n")
+            break;
         client.send(buf);             
     }
+    c_vat.exit();
+    delete client;
     return 0;
 }
 
--- a/test/chatserver.d	Wed Jul 09 00:56:21 2008 -0400
+++ b/test/chatserver.d	Sat Jul 12 10:42:41 2008 -0400
@@ -3,19 +3,22 @@
 import tango.net.Socket;
 import tango.core.Thread;
 import tango.io.Stdout;
+import tango.util.log.Log;
 import dreactor.core.Vat; 
 import dreactor.core.Dispatcher;
 
 import dreactor.protocol.RawTcp;
 import dreactor.transport.AsyncSocketConduit;
 
-
+int count;
 int main()
 { 
     Vat l_vat = new Vat();
+    Logger log = Log.lookup("dreactor.chatserver");
+    Log.root.level(log.Level.Info, true); 
     RawTCPListener listener = new RawTCPListener(l_vat, new IPv4Address(5555));
     listener.setDataHandler( (char[] inbuf, Dispatcher d) {
-
+    
         listener.broadcast(inbuf, [d]);
 
     });
Binary file test/test has changed