# HG changeset patch # User rick@minifunk # Date 1215530559 14400 # Node ID f8b01c9f7114510d2f10f25565fc849e4e0ecf1d # Parent e3dbc92088224db93e806a0cf90e1e25cd15c0c7 adding basic protocols diff -r e3dbc9208822 -r f8b01c9f7114 .hgignore --- a/.hgignore Tue Jul 08 11:21:09 2008 -0400 +++ b/.hgignore Tue Jul 08 11:22:39 2008 -0400 @@ -4,5 +4,7 @@ dsss_imports/* dsss_objects dsss_objects/* +dsss_objs +dsss_objs/* dsss.last *.swp diff -r e3dbc9208822 -r f8b01c9f7114 dreactor/protocol/RawTcp.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/RawTcp.d Tue Jul 08 11:22:39 2008 -0400 @@ -0,0 +1,209 @@ +module dreactor.protocol.RawTcp; + +import tango.io.Conduit; +import tango.io.selector.model.ISelector; +import tango.net.Socket; +import tango.util.collection.CircularSeq; +import tango.util.log.Log; +import tango.util.log.Config; + +import dreactor.transport.AsyncSocketConduit; +import dreactor.core.SelectLoop; +import dreactor.core.ConnectionHandler; + +/****************************************************************************** + + Basic TCP server or client routines for sending raw data. + +******************************************************************************/ +class RawTCPListener +{ +public + Logger log; + this(ConnectionHandler mgr, SelectLoop sel, IPv4Address addr) + { + manager = mgr; + mgr.events(Event.Read); + mgr.setOutgoingHandler(&Handlers.onSend); + mgr.setIncomingHandler(&Handlers.onReceive); + mgr.setConnectHandler(&accept); + mgr.listen(addr); + + sel.addConnection(mgr); + select = sel; + log = Log.lookup("dreactor.protocol.RawTcpServer"); + log.info("log initialized"); + children = new CircularSeq!(ConnectionHandler); + } + + int accept(Conduit cond, RegisterD reg) + { + AsyncSocketConduit newcond = new AsyncSocketConduit; + (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket); + ConnectionHandler h = ConnectionHandler.New(newcond, manager); + h.events(Event.Read); + select.addConnection(h); + children.append(h); + log.info("accepted new connection"); + return 0; + } + + int broadcast(char[] outbuf) + { + foreach(ConnectionHandler h; children) + { + if (h.appendOutBuffer(outbuf)) + { + h.addEvent(Event.Write); + select.addConnection(h); + } + } + return 0; + } + + void close() + { + + } + +private + ConnectionHandler manager; + SelectLoop select; + CircularSeq!(ConnectionHandler) children; +} + +class RawTCPClient +{ +public + Logger log; + this(ConnectionHandler mgr, SelectLoop sel, Event evts = Event.Read) + { + manager = mgr; + manager.events(evts); + connected = false; + mgr.setOutgoingHandler(&Handlers.onSend); + mgr.setIncomingHandler(&Handlers.onReceive); + select = sel; + log = Log.lookup("dreactor.protocol.RawTcpClient"); + } + + int connect(IPv4Address addr) + { + (cast(AsyncSocketConduit) manager.transport()).connect(addr); + select.addConnection(manager); + connected = true; + log.info("connected to {}", addr); + return 0; + } + + /************************************************************************** + + send + User-called function to send data to the counterpart at the other + end of the connection. This sets up the connection manager to send + data as the socket becomes free. + + **************************************************************************/ + int send(char[] outbuf, IPv4Address addr = null) + { + log.info("sending buffer: {}", outbuf); + if (!connected) + { + log.info("not connected, connecting"); + if (addr !is null) + { + if (0 > connect(addr)) + { + log.error("unable to connect"); + return -1; + } + } + } + if (manager.appendOutBuffer(outbuf)) + { + manager.addEvent(Event.Write); + if (!select.addConnection(manager)) + { + log.error("unable to register mgr"); + } + } + return 0; + } + +private + ConnectionHandler manager; + SelectLoop select; + bool connected; +} + + +/****************************************************************************** + + Default Event handlers common to both listener/clients + +******************************************************************************/ +class Handlers +{ +/************************************************************************** + + onSend + OutgoingHandlerD + To be registered as the response to socket writable event. + Sends data, returns amount sent. Unregisters Handler for sending + if there is no more data left to send. + +***************************************************************************/ +public static int onSend(ConnectionHandler h) +{ + Logger log = Log.lookup("Handlers.onSend"); + + log.info("top of onSend"); + char[] outbuf = h.nextBuffer(); + if (outbuf !is null) + { + int sent = h.transport.write(outbuf); + if (sent > 0) + { + if (! h.addOffset(sent)) + { + h.remEvent(Event.Write); + return REREGISTER; + } + } + else if (sent == AsyncSocketConduit.Eof) + { + log.error("Select said socket was writable, but sent 0 bytes"); + } + else + { + log.error("Socket send return ERR"); + } + return REMAIN; + } + else + { + h.remEvent(Event.Write); + return REREGISTER; + } +} +/************************************************************************** + + receive + IncomingHandlerD + Default incoming data handler. Should be replaced with something useful. + +**************************************************************************/ +public static int onReceive(ConnectionHandler h) +{ + Logger log = Log.lookup("Handlers.onReceive"); + + char inbuf[8192]; + int amt; + if((amt = h.transport.read(inbuf)) > 0) + log.info("Received Buffer: {}", inbuf[0 .. amt]); + else + log.error("Received no data, err = {}", amt); + + return REMAIN; +} +} diff -r e3dbc9208822 -r f8b01c9f7114 dreactor/protocol/RawUdp.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/RawUdp.d Tue Jul 08 11:22:39 2008 -0400 @@ -0,0 +1,133 @@ +module dreactor.protocol.Raw; + +import tango.io.Conduit; +import tango.io.selector.model.ISelector; +import dreactor.core.AsyncConduit; +import dreactor.core.SelectLoop; +import dreactor.core.ConnectionHandler; +import tango.util.collection.CircularSeq; +import tango.util.log.Log; +import tango.util.log.Configurator; + +Logger log = Log.getLogger("dreactor.core.SelectLoop"); + +/****************************************************************************** + + Basic TCP server or client routines for sending raw data. + +******************************************************************************/ +class RawListener +{ +public + + this(ConnectionHandler mgr, SelectLoop sel) + { + manager = mgr; + mgr.events(Event.Read); + sel.addConnection(mgr); + select = sel; + children = CircularSeq!(ConnectionHandler); + Configurator(); + } + + int accept(Conduit cond) + { + AsyncConduit newcond = new AsyncConduit; + cond.socket().accept(newcond.socket); + ConnectionHandler h = ConnectionHandler.New(manager); + mgr.events(Event.Read); + select.addConnection(mgr); + children.append(mgr); + } + + bool broadcast(char[] outbuf) + { + foreach(ConnectionHandler h; children) + { + if (h.appendBuffer(outbuf)) + { + h.addEvent(Event.Write); + select.addConnection(h); + } + } + } + + void close() + { + + } + + /************************************************************************** + + send + OutgoingHandlerD + To be registered as the response to socket writable event. + Sends data, returns amount sent. Unregisters Handler for sending + if there is no more data left to send. + + ***************************************************************************/ + int send(ConnectionHandler h, RegisterD reg) + { + char[] outbuf = h.nextBuffer(); + if (!outbuf is null) + { + int sent = h.transport.write(outbuf); + if (sent > 0) + { + if (! h.addOffset(sent)) + { + h.removeEvent(Event.write); + reg(h); + } + } + else if (sent == EOF) + { + // EAGAIN ? probably shouldn't have happened. + } + else + { + log.error("Socket send return ERR"); + } + return sent; + } + return 0; + } + + /************************************************************************** + + receive + IncomingHandlerD + Default incoming data handler. Should be replaced with something useful. + + **************************************************************************/ + int receive(ConnectionHandler h, RegisterD reg) + { + char inbuf[8192]; + auto format = Log.format; + if(h.transport.read(inbuf) > 0) + log.info(format("Received Buffer: {}", inbuf)); + } + +private + ConnectionHandler manager; + SelectLoop select; + CircularSeq!(ConnectionHandler) children; +} + +class RawClient +{ +public + this(ConnectionHandler mgr, SelectLoop sel) + { + manager = mgr; + mgr.events(Event.Read); + sel.addConnection(mgr); + select = sel; + } + + + +private + ConnectionHandler manager; + SelectLoop select; +} diff -r e3dbc9208822 -r f8b01c9f7114 test/test Binary file test/test has changed diff -r e3dbc9208822 -r f8b01c9f7114 test/test.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/test.d Tue Jul 08 11:22:39 2008 -0400 @@ -0,0 +1,30 @@ +module test; + +import tango.net.Socket; +import tango.core.Thread; +import tango.io.Stdout; +import dreactor.core.SelectLoop; +import dreactor.core.ConnectionHandler; +import dreactor.protocol.RawTcp; +import dreactor.transport.AsyncSocketConduit; + +int main() +{ + AsyncSocketConduit cond = new AsyncSocketConduit; + ConnectionHandler lh = new ConnectionHandler(cond, true); + SelectLoop l_loop = new SelectLoop(); + RawTCPListener listener = new RawTCPListener(lh, l_loop, new IPv4Address(5555)); + l_loop.run(); + + AsyncSocketConduit clcond = new AsyncSocketConduit; + ConnectionHandler ch = new ConnectionHandler(clcond); + SelectLoop c_loop = new SelectLoop(); + RawTCPClient client = new RawTCPClient(ch, c_loop); + c_loop.run(); + + client.connect(new IPv4Address("localhost", 5555)); + //Thread.sleep(1); + client.send("This is a test"); + return 0; +} +