view dreactor/protocol/RawTcp.d @ 9:5412a1ff2e49

adding chat client and more updates
author rick@minifunk
date Sat, 12 Jul 2008 10:42:41 -0400
parents 60cf25102fb2
children e75a2e506b1d
line wrap: on
line source

module dreactor.protocol.RawTcp;

import tango.io.Conduit;
import tango.io.selector.model.ISelector;
import tango.net.Socket;
import tango.util.collection.CircularSeq;
import tango.util.log.Log;
import tango.util.log.Config;

import dreactor.transport.AsyncSocketConduit;
import dreactor.core.Vat;
import dreactor.core.Dispatcher;

/******************************************************************************
    
    Basic TCP server or client routines for sending raw data.

******************************************************************************/
class RawTCPListener : RawTCPHandler
{
public
    Logger log; 

    /**************************************************************************

        Builds a listener on top of an existing Dispatcher.

        Params:
            mgr  = dispatcher that owns the listening transport
            sel  = vat the listening dispatcher (and every accepted
                   connection) is registered with
            addr = local address to listen on

    **************************************************************************/
    this(Dispatcher mgr, Vat sel, IPv4Address addr)
    {
        // Populate every field before any handler is installed or the
        // dispatcher is registered, so that a callback fired as soon as the
        // dispatcher becomes visible to the vat never sees a null vat, log
        // or children.
        manager = mgr;
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpServer");
        log.info("log initialized");
        children = new CircularSeq!(Dispatcher);

        mgr.events(Event.Read);
        mgr.setOutgoingHandler(&onSend);
        mgr.setIncomingHandler(&onReceive);
        mgr.setConnectHandler(&accept);
        mgr.setErrorHandler(&onError);
        mgr.setDisconnectHandler(&onHangup);
        mgr.listen(addr);

        if (!sel.addConnection(mgr))
        {
            log.error("unable to register mgr");
        }
    }

    /**************************************************************************

        Convenience constructor: creates a fresh AsyncSocketConduit with
        address reuse enabled, wraps it in a Dispatcher and forwards to the
        constructor above.

    **************************************************************************/
    this(Vat sel, IPv4Address addr)
    {
        AsyncSocketConduit cond = new AsyncSocketConduit;
        cond.socket().setAddressReuse(true);
        Dispatcher lh = new Dispatcher(cond, true);
        this(lh, sel, addr);
    }

    /**************************************************************************

        Shuts down and detaches the transport of every tracked child, then
        that of the listening dispatcher itself.

    **************************************************************************/
    ~this()
    {
        foreach(Dispatcher d; children)
        {
            (cast(AsyncSocketConduit)d.transport).shutdown();
            (cast(AsyncSocketConduit)d.transport).detach();
        }
        (cast(AsyncSocketConduit)manager.transport).shutdown();
        (cast(AsyncSocketConduit)manager.transport).detach(); 
    } 

    /**************************************************************************

        accept
        Connect handler. Accepts a pending connection on the listening
        conduit into a new AsyncSocketConduit, wraps it in a Dispatcher
        created from the listening one, registers it for Read events with
        the vat and remembers it in children.

        Params:
            cond = the listening conduit (must be an AsyncSocketConduit)
            reg  = unused here

        Returns: always 0

    **************************************************************************/
    int accept(Conduit cond, RegisterD reg)
    {
        AsyncSocketConduit newcond = new AsyncSocketConduit;
        (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);
        Dispatcher h = Dispatcher.New(newcond, manager);
        h.events(Event.Read);
        if (!vat.addConnection(h))
        {
            log.error("unable to register mgr");
        }
        children.append(h);
        log.info("accepted new connection");
        return 0;
    }

    /**************************************************************************

        broadcast
        Queues outbuf on every tracked child that is not listed in excluded
        and asks the vat to watch that child for Write events.

        Params:
            outbuf   = data to queue
            excluded = dispatchers to skip (compared by identity); may be
                       null

        Returns: always 0

    **************************************************************************/
    int broadcast(char[] outbuf, Dispatcher[] excluded = null)
    {
        foreach(Dispatcher h; children)
        {
            if (excluded && excluded.includes(h))
                continue;
            if (h.appendOutBuffer(outbuf))
            {
                h.addEvent(Event.Write);
                // Report registration failures the same way send() does
                // instead of dropping them silently.
                if (!vat.addConnection(h))
                {
                    log.error("unable to register mgr");
                }
            }
        }
        return 0;
    }
    
    /**************************************************************************
    
        send
        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free. 

        Params:
            d      = dispatcher of the connection to write to
            outbuf = data to queue
            addr   = unused here

        Returns: always 0; a failed vat registration is only logged

    **************************************************************************/
    int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
    {
        if (d.appendOutBuffer(outbuf))
        {
            d.addEvent(Event.Write);
            d.setOutgoingHandler(&onSend);
            if (!vat.addConnection(d))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }

    /**************************************************************************

        onReceive
        Incoming data handler. Reads up to 8192 bytes from the dispatcher's
        transport and passes them to dataHandler when one is set, otherwise
        logs them. A read of exactly 0 bytes drops the dispatcher from
        children, shuts its transport down and returns CLOSE; a negative
        result is logged and the handler returns REMAIN.

    **************************************************************************/
    public int onReceive(Dispatcher h)
    {
        // Local logger deliberately shadows the member so messages are
        // filed under the handler's own name.
        Logger log = Log.lookup("Handlers.onReceive");

        char inbuf[8192];
        int amt;
        if((amt = h.transport.read(inbuf)) > 0)
        {
            if (dataHandler)
                dataHandler(inbuf[0 .. amt], h);
            else
                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
        }
        else
        {
            if (amt == 0)
            {
                children.remove(h);
                (cast(AsyncSocketConduit) h.transport).shutdown();
                return CLOSE;
            }
            log.error("Received no data, err = {}", amt);
        }
        return REMAIN;
    }
 
    /**************************************************************************

        close
        Currently a no-op placeholder.

    **************************************************************************/
    void close()
    {
        
    }

// NOTE: an attribute without a trailing colon applies only to the single
// declaration that follows it; the layout below is kept as-is so member
// visibility does not change.
private
    Dispatcher manager;
    Vat vat;
    CircularSeq!(Dispatcher) children;
}

/******************************************************************************

    Raw TCP client: wraps one Dispatcher, connects it to a remote address
    and queues outgoing data on it.

******************************************************************************/
class RawTCPClient : RawTCPHandler
{
public
    Logger log;

    /**************************************************************************

        Builds a client on top of an existing Dispatcher and installs the
        default handlers on it. The client starts out not connected.

        Params:
            mgr  = dispatcher that owns the client transport
            sel  = vat the dispatcher is registered with on connect/send
            evts = events the dispatcher is initially interested in

    **************************************************************************/
    this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
    {
        manager = mgr;
        manager.events(evts);
        connected = false;
        mgr.setOutgoingHandler(&onSend);
        mgr.setIncomingHandler(&onReceive);
        mgr.setErrorHandler(&onError);
        mgr.setDisconnectHandler(&onHangup);
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpClient");
    }

    /**************************************************************************

        Convenience constructor: creates a fresh AsyncSocketConduit, wraps
        it in a Dispatcher and forwards to the constructor above.

    **************************************************************************/
    this(Vat sel, Event evts = Event.Read)
    {
        AsyncSocketConduit clcond = new AsyncSocketConduit;
        Dispatcher ch = new Dispatcher(clcond);
        this(ch, sel, evts);
    }

    /**************************************************************************

        Shuts down and detaches the client transport.

    **************************************************************************/
    ~this()
    {
        (cast(AsyncSocketConduit)manager.transport).shutdown();
        (cast(AsyncSocketConduit)manager.transport).detach();
    }

    /**************************************************************************

        connect
        Connects the client transport to addr and registers the dispatcher
        with the vat.

        Returns: 0 on success, -1 when the vat refuses the registration.
                 The client is marked connected in both cases because the
                 transport connect itself has completed by then.

    **************************************************************************/
    int connect(IPv4Address addr)
    {
        (cast(AsyncSocketConduit) manager.transport()).connect(addr);
        connected = true;
        log.info("connected to {}", addr);
        if (!vat.addConnection(manager))
        {
            log.error("unable to register mgr");
            return -1;
        }
        return 0;
    } 

    /**************************************************************************
    
        send
        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free. 

        Params:
            outbuf = data to queue
            addr   = address to connect to when the client is not connected
                     yet; ignored once connected

        Returns: 0 when the data was handed to the dispatcher, -1 when the
                 client is not connected and either no address was supplied
                 or connecting failed.

    **************************************************************************/
    int send(char[] outbuf, IPv4Address addr = null)
    {
        if (!connected)
        {
            // Without an address there is nothing to connect to; refuse
            // rather than queue data on, and register, an unconnected
            // dispatcher.
            if (addr is null)
            {
                log.error("send: not connected and no address given");
                return -1;
            }
            log.info("send: not connected, connecting");
            if (0 > connect(addr))
            {
                log.error("send: unable to connect");
                return -1;
            }
        }
        if (manager.appendOutBuffer(outbuf))
        {
            manager.addEvent(Event.Write);
            if (!vat.addConnection(manager))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }
    
// NOTE: an attribute without a trailing colon applies only to the single
// declaration that follows it; the layout below is kept as-is so member
// visibility does not change.
private
    Dispatcher manager;
    Vat vat;
    bool connected;
}


/******************************************************************************

    Default Event handlers common to both listener/clients

******************************************************************************/
class RawTCPHandler
{
    /**************************************************************************

        onSend
        OutgoingHandlerD
        Registered as the reaction to a socket-writable event. Writes the
        dispatcher's next pending buffer to its transport.

        Returns: REREGISTER (after dropping the Write event) when there is
                 no pending buffer or addOffset reports false after a
                 successful write; REMAIN in every other case, including
                 write failures, which are only logged.

    **************************************************************************/
    public int onSend(Dispatcher h)
    {
        Logger log = Log.lookup("Handlers.onSend");

        char[] pending = h.nextBuffer();
        if (pending is null)
        {
            h.remEvent(Event.Write);
            return REREGISTER;
        }

        int written = h.transport.write(pending);
        if (written > 0)
        {
            if (h.addOffset(written))
                return REMAIN;

            h.remEvent(Event.Write);
            return REREGISTER;
        }

        if (written == AsyncSocketConduit.Eof)
            log.error("Select said socket was writable, but sent 0 bytes");
        else
            log.error("Socket send return ERR");

        return REMAIN;
    }

    /**************************************************************************

        onReceive
        IncomingHandlerD
        Default incoming data handler. Reads up to 8192 bytes from the
        dispatcher's transport and hands them to dataHandler when one is
        set, otherwise logs them.

        Returns: CLOSE when the read yields exactly 0 bytes, REMAIN
                 otherwise (a negative read result is logged as an error).

    **************************************************************************/
    public int onReceive(Dispatcher h)
    {
        Logger log = Log.lookup("Handlers.onReceive");

        char[8192] inbuf;
        int amt = h.transport.read(inbuf);

        if (amt == 0)
            return CLOSE;

        if (amt < 0)
        {
            log.error("Received no data, err = {}", amt);
            return REMAIN;
        }

        char[] payload = inbuf[0 .. amt];
        if (dataHandler)
            dataHandler(payload, h);
        else
            log.info("Received {} byte Buffer: {}", amt, payload);

        return REMAIN;
    }

    /// Default disconnect handler: always answers UNREGISTER.
    int onHangup(Dispatcher d)
    {
        return UNREGISTER;
    }

    /// Default error handler: always answers CLOSE.
    int onError(Dispatcher d, RegisterD unreg)
    {
        return CLOSE;
    }

    /// Installs the delegate that onReceive hands incoming data to.
    void setDataHandler(void delegate(char[],Dispatcher) del)
    {
        dataHandler = del;
    }

    /// Receiver for incoming data; when unset, onReceive logs the data.
    protected void delegate(char[], Dispatcher) dataHandler;
}

/// Reports whether needle is present in haystack, comparing by object
/// identity. A null or empty haystack yields false.
bool includes(Dispatcher[] haystack, Dispatcher needle)
{
    for (size_t i = 0; i < haystack.length; ++i)
    {
        if (haystack[i] is needle)
            return true;
    }
    return false;
}