view dreactor/protocol/TcpProvider.d @ 13:8c9b1276f623 default tip

bug fixes
author rick@minifunk
date Sat, 20 Sep 2008 18:33:11 -0400
parents d6a3cfe7c3de
children
line wrap: on
line source

module dreactor.protocol.TcpProvider;

import tango.io.device.Conduit;
import tango.io.selector.model.ISelector;
import tango.net.Socket;
import tango.util.container.CircularList;
import tango.util.log.Log;
import tango.util.log.Config;

import dreactor.transport.AsyncSocketConduit;
import dreactor.core.Vat;
public import dreactor.protocol.IProvider;
/******************************************************************************
    
    Basic TCP server or client routines for sending raw data.

******************************************************************************/
class TcpProvider : IProvider
{
public
    enum {
        SendComplete = 2000,
        NewConnection,
        Receive,
        RemoteClosed,
        SendError,
        ReceiveError,
        Error
    }

    this(AsyncSocketConduit c)
    {
        log = Log.lookup("dreactor.protocol.RawTcpServer");
        log.info("log initialized");
        cond = c;
        events = Event.Read;
    }

    this(IPv4Address addr, bool listen = false)
    {
        AsyncSocketConduit c = new AsyncSocketConduit;
        c.socket().setAddressReuse(true);
        if (listen)
        {
            c.bind(addr);
            c.socket().listen(1000);
            listener = listen;
        }
        else
            c.connect(addr);
        this(c);
    }

    
    ~this()
    {
        close();
    } 
    
    Message handleRead()
    {
        Logger log = Log.lookup("Handlers.onReceive");

        if (listener)
            return handleConnect();

        char inbuf[8192];
        int amt;
        if((amt = cond.read(inbuf)) > 0)
        {
            return Message(inbuf[0 .. amt].dup.ptr, Receive, amt);
        }
        else
        {
            if (amt == 0)
            {
                cond.shutdown();
                return Message(null, RemoteClosed, amt);
            }
            log.error("Received no data, err = {}", amt);
        }
        return Message(null, Error, amt);
    }
    
    /**************************************************************************

        handleWrite
        To be registered as the response to socket writable event. 
        Sends data, returns amount sent. Unregisters Handler for sending
        if there is no more data left to send. 

    ***************************************************************************/
    Message handleWrite()
    {
        Logger log = Log.lookup("Handlers.onSend");
     
        char[] outbuf = nextBuffer();
        if (outbuf !is null)
        {
            int sent = cond.write(outbuf);
            if (sent > 0)
            {
                if (! addOffset(sent))
                {
                    //h.remEvent(Event.Write);
                    //TODO - How do we handle event re-registering
                    return Message(null, SendComplete, sent);
                }
            }
            else if (sent == 0)
            {
                log.error("Select said socket was writable, but sent 0 bytes");
                return Message(null, Error, 0);
            }
            else
            {
                log.error("Socket send return ERR");
                return Message(null, Error, sent);
            }
        }
        else
        {
            remEvent(Event.Write);
            if (!regFn(events))
            {
                log.error("unable to register mgr");
            }
            return Message(null, SendComplete, 0);
        }
    }

    Message handleDisconnect()
    {
        return Message(cast(void*)cond, RemoteClosed, 0);
    }

    Message handleError()
    {
        return Message(cast(void*)cond, Error, 0);
    } 

    Message handleConnect()
    {
        log.trace("accepting new connection");
        return Message(cast(void*)accept(), NewConnection, 0);
    }

    Conduit getConduit()
    {
        return cond;
    }

    Event getEvents()
    {
        return events;
    }

    void setEvents(Event e)
    {
        events = e;
    }

    AsyncSocketConduit accept()
    {
        AsyncSocketConduit newcond = new AsyncSocketConduit;
        cond.socket().accept(newcond.socket);
        log.info("accepted new connection {} {}", cast(uint) newcond, newcond.fileHandle());
        return newcond;
    }
    
    /**************************************************************************
    
        send
        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free. 

    **************************************************************************/
    void send(char[] outbuf)
    {
        if (appendOutBuffer(outbuf))
        {
            addEvent(Event.Write);
            if (!regFn(events))
            {
                log.error("unable to register mgr");
            }
        }
    }

 
    void close()
    {
        cond.shutdown();
        cond.detach(); 
    }
    
    int connect(IPv4Address addr)
    {
        cond = new AsyncSocketConduit;
        cond.socket().setAddressReuse(true);
        
        cond.connect(addr);
        connected = true;
        log.info("connected to {}", addr);
        return 0;
    } 

    /**************************************************************************
    
        appendOutBuffer

        Adds an outgoing buffer to the list. This returns true if the list
        was empty, indicating that the handler should be registered with the
        SelectLoop. If it returns false, it was probably already registered.
        
    **************************************************************************/
    bool appendOutBuffer(char[] outbuf)
    {
        out_buffers.append(outbuf);
        out_buffers_len++;
        if (out_buffers_len == 1)
            return true;
        else
            return false;
    }

    /**************************************************************************

        addOffset 
        Use this function to update the offset position after a successful data
        send. This not only manages the current offset, but will update the 
        out buffer chain if necessary. 

        Returns: false if there is nothing left to send, true if there is.

    **************************************************************************/ 
    bool addOffset(int off)
    in
    {
        assert(out_buffers_len > 0);
    }
    body
    {
        char[] hd = out_buffers.head();
        if ((off + o_offset) >= hd.length)
        {
            out_buffers.removeHead();
            o_offset = 0;
            out_buffers_len--;
            return (out_buffers_len > 0);
        }
        else
            o_offset += off;
        return true;
    }

    /**************************************************************************

        char[] nextBuffer

        Returns a slice of the current outbound buffer, returns a char[] pointing
        to null if there is no current outbound buffer

    **************************************************************************/
    char[] nextBuffer()
    {
        if (out_buffers_len < 1)
        {
            return null; 
        }

        return out_buffers.head()[o_offset .. $];
    }
    
    void setRegisterFunc( bool delegate (Event) fn)
    {
        regFn = fn;
    }

    void addEvent(Event evt)
    {
        events |= evt;
    }

    void remEvent(Event evt)
    {
        events &= !evt;
    }

private
    AsyncSocketConduit cond;
    Logger log;
    bool listener;
    Event events;
    bool connected;
    CircularList!(char[]) out_buffers;
    int out_buffers_len;
    int o_offset;
    bool delegate (Event) regFn;
}