changeset 4:f8b01c9f7114

adding basic protocols
author rick@minifunk
date Tue, 08 Jul 2008 11:22:39 -0400
parents e3dbc9208822
children f875a1f278b8
files .hgignore dreactor/protocol/RawTcp.d dreactor/protocol/RawUdp.d test/test test/test.d
diffstat 5 files changed, 374 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Tue Jul 08 11:21:09 2008 -0400
+++ b/.hgignore	Tue Jul 08 11:22:39 2008 -0400
@@ -4,5 +4,7 @@
 dsss_imports/*
 dsss_objects
 dsss_objects/*
+dsss_objs
+dsss_objs/*
 dsss.last
 *.swp
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/RawTcp.d	Tue Jul 08 11:22:39 2008 -0400
@@ -0,0 +1,209 @@
+module dreactor.protocol.RawTcp;
+
+import tango.io.Conduit;
+import tango.io.selector.model.ISelector;
+import tango.net.Socket;
+import tango.util.collection.CircularSeq;
+import tango.util.log.Log;
+import tango.util.log.Config;
+
+import dreactor.transport.AsyncSocketConduit;
+import dreactor.core.SelectLoop;
+import dreactor.core.ConnectionHandler;
+
+/******************************************************************************
+    
+    Basic TCP server or client routines for sending raw data.
+
+******************************************************************************/
+class RawTCPListener
+{
+public
+    Logger log; 
+    this(ConnectionHandler mgr, SelectLoop sel, IPv4Address addr)
+    {
+        manager = mgr;
+        mgr.events(Event.Read);
+        mgr.setOutgoingHandler(&Handlers.onSend);
+        mgr.setIncomingHandler(&Handlers.onReceive);
+        mgr.setConnectHandler(&accept);
+        mgr.listen(addr);
+        
+        sel.addConnection(mgr);
+        select = sel;
+        log = Log.lookup("dreactor.protocol.RawTcpServer");
+        log.info("log initialized");
+        children = new CircularSeq!(ConnectionHandler);
+    }
+ 
+    int accept(Conduit cond, RegisterD reg)
+    {
+        AsyncSocketConduit newcond = new AsyncSocketConduit;
+        (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);
+        ConnectionHandler h = ConnectionHandler.New(newcond, manager);
+        h.events(Event.Read);
+        select.addConnection(h);
+        children.append(h);
+        log.info("accepted new connection");
+        return 0;
+    }
+
+    int broadcast(char[] outbuf)
+    {
+        foreach(ConnectionHandler h; children)
+        {
+            if (h.appendOutBuffer(outbuf))
+            {
+                h.addEvent(Event.Write);
+                select.addConnection(h);
+            }
+        }
+        return 0;
+    }
+ 
+    void close()
+    {
+        
+    }
+
+private
+    ConnectionHandler manager;
+    SelectLoop select;
+    CircularSeq!(ConnectionHandler) children;
+}
+
+class RawTCPClient
+{
+public
+    Logger log;
+    this(ConnectionHandler mgr, SelectLoop sel, Event evts = Event.Read)
+    {
+        manager = mgr;
+        manager.events(evts);
+        connected = false;
+        mgr.setOutgoingHandler(&Handlers.onSend);
+        mgr.setIncomingHandler(&Handlers.onReceive);
+        select = sel;
+        log = Log.lookup("dreactor.protocol.RawTcpClient");
+    }
+
+    int connect(IPv4Address addr)
+    {
+        (cast(AsyncSocketConduit) manager.transport()).connect(addr);
+        select.addConnection(manager);
+        connected = true;
+        log.info("connected to {}", addr);
+        return 0;
+    } 
+
+    /**************************************************************************
+    
+        send
+        User-called function to send data to the counterpart at the other
+        end of the connection. This sets up the connection manager to send
+        data as the socket becomes free. 
+
+    **************************************************************************/
+    int send(char[] outbuf, IPv4Address addr = null)
+    {
+        log.info("sending buffer: {}", outbuf);
+        if (!connected)
+        {
+            log.info("not connected, connecting");
+            if (addr !is null)
+            {
+                if (0 > connect(addr))
+                {
+                    log.error("unable to connect");
+                    return -1;
+                }
+            }
+        }
+        if (manager.appendOutBuffer(outbuf))
+        {
+            manager.addEvent(Event.Write);
+            if (!select.addConnection(manager))
+            {
+                log.error("unable to register mgr");
+            }
+        }
+        return 0;
+    }
+    
+private
+    ConnectionHandler manager;
+    SelectLoop select;
+    bool connected;
+}
+
+
+/******************************************************************************
+
+    Default Event handlers common to both listener/clients
+
+******************************************************************************/
+class Handlers
+{
+/**************************************************************************
+
+    onSend
+    OutgoingHandlerD 
+    To be registered as the response to socket writable event. 
+    Sends data, returns amount sent. Unregisters Handler for sending
+    if there is no more data left to send. 
+
+***************************************************************************/
+public static int onSend(ConnectionHandler h)
+{
+    Logger log = Log.lookup("Handlers.onSend");
+ 
+    log.info("top of onSend");
+    char[] outbuf = h.nextBuffer();
+    if (outbuf !is null)
+    {
+        int sent = h.transport.write(outbuf);
+        if (sent > 0)
+        {
+            if (! h.addOffset(sent))
+            {
+                h.remEvent(Event.Write);
+                return REREGISTER;
+            }
+        }
+        else if (sent == AsyncSocketConduit.Eof)
+        {
+            log.error("Select said socket was writable, but sent 0 bytes");
+        }
+        else
+        {
+           log.error("Socket send return ERR");
+        }
+        return REMAIN;
+    }
+    else
+    {
+        h.remEvent(Event.Write);
+        return REREGISTER;
+    }
+}
+/**************************************************************************
+
+    receive
+    IncomingHandlerD
+    Default incoming data handler. Should be replaced with something useful.
+
+**************************************************************************/ 
+public static int onReceive(ConnectionHandler h)
+{
+    Logger log = Log.lookup("Handlers.onReceive");
+
+    char inbuf[8192];
+    int amt;
+    if((amt = h.transport.read(inbuf)) > 0)
+        log.info("Received Buffer: {}", inbuf[0 .. amt]);
+    else 
+        log.error("Received no data, err = {}", amt);
+
+    return REMAIN;
+}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/RawUdp.d	Tue Jul 08 11:22:39 2008 -0400
@@ -0,0 +1,133 @@
+module dreactor.protocol.Raw;
+
+import tango.io.Conduit;
+import tango.io.selector.model.ISelector;
+import dreactor.core.AsyncConduit;
+import dreactor.core.SelectLoop;
+import dreactor.core.ConnectionHandler;
+import tango.util.collection.CircularSeq;
+import tango.util.log.Log;
+import tango.util.log.Configurator;
+
+Logger log = Log.getLogger("dreactor.core.SelectLoop");
+
+/******************************************************************************
+    
+    Basic TCP server or client routines for sending raw data.
+
+******************************************************************************/
+class RawListener
+{
+public
+ 
+    this(ConnectionHandler mgr, SelectLoop sel)
+    {
+        manager = mgr;
+        mgr.events(Event.Read);
+        sel.addConnection(mgr);
+        select = sel;
+        children = CircularSeq!(ConnectionHandler);
+        Configurator();
+    }
+ 
+    int accept(Conduit cond)
+    {
+        AsyncConduit newcond = new AsyncConduit;
+        cond.socket().accept(newcond.socket);
+        ConnectionHandler h = ConnectionHandler.New(manager);
+        mgr.events(Event.Read);
+        select.addConnection(mgr);
+        children.append(mgr);
+    }
+
+    bool broadcast(char[] outbuf)
+    {
+        foreach(ConnectionHandler h; children)
+        {
+            if (h.appendBuffer(outbuf))
+            {
+                h.addEvent(Event.Write);
+                select.addConnection(h);
+            }
+        }
+    }
+ 
+    void close()
+    {
+        
+    }
+
+    /**************************************************************************
+
+        send
+        OutgoingHandlerD 
+        To be registered as the response to socket writable event. 
+        Sends data, returns amount sent. Unregisters Handler for sending
+        if there is no more data left to send. 
+
+    ***************************************************************************/
+    int send(ConnectionHandler h, RegisterD reg)
+    {
+        char[] outbuf = h.nextBuffer();
+        if (!outbuf is null)
+        {
+            int sent = h.transport.write(outbuf);
+            if (sent > 0)
+            {
+                if (! h.addOffset(sent))
+                {
+                    h.removeEvent(Event.write);
+                    reg(h);
+                }
+            }
+            else if (sent == EOF)
+            {
+                // EAGAIN ? probably shouldn't have happened. 
+            }
+            else
+            {
+               log.error("Socket send return ERR"); 
+            }
+            return sent;
+        }
+        return 0;
+    }
+
+    /**************************************************************************
+    
+        receive
+        IncomingHandlerD
+        Default incoming data handler. Should be replaced with something useful.
+
+    **************************************************************************/ 
+    int receive(ConnectionHandler h, RegisterD reg)
+    {
+        char inbuf[8192];
+        auto format = Log.format;
+        if(h.transport.read(inbuf) > 0)
+            log.info(format("Received Buffer: {}", inbuf));
+    }
+
+private
+    ConnectionHandler manager;
+    SelectLoop select;
+    CircularSeq!(ConnectionHandler) children;
+}
+
+class RawClient
+{
+public
+    this(ConnectionHandler mgr, SelectLoop sel)
+    {
+        manager = mgr;
+        mgr.events(Event.Read);
+        sel.addConnection(mgr);
+        select = sel;
+    }
+
+    
+
+private
+    ConnectionHandler manager;
+    SelectLoop select;
+}
Binary file test/test has changed
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/test.d	Tue Jul 08 11:22:39 2008 -0400
@@ -0,0 +1,30 @@
+module test;
+
+import tango.net.Socket;
+import tango.core.Thread;
+import tango.io.Stdout;
+import dreactor.core.SelectLoop; 
+import dreactor.core.ConnectionHandler;
+import dreactor.protocol.RawTcp;
+import dreactor.transport.AsyncSocketConduit;
+
+int main()
+{ 
+    AsyncSocketConduit cond = new AsyncSocketConduit;
+    ConnectionHandler lh = new ConnectionHandler(cond, true);
+    SelectLoop l_loop = new SelectLoop();
+    RawTCPListener listener = new RawTCPListener(lh, l_loop, new IPv4Address(5555)); 
+    l_loop.run();
+
+    AsyncSocketConduit clcond = new AsyncSocketConduit;
+    ConnectionHandler ch = new ConnectionHandler(clcond);
+    SelectLoop c_loop = new SelectLoop();
+    RawTCPClient client = new RawTCPClient(ch, c_loop);
+    c_loop.run();
+
+    client.connect(new IPv4Address("localhost", 5555));
+    //Thread.sleep(1); 
+    client.send("This is a test");
+    return 0;
+}
+