# HG changeset patch # User rick@minifunk # Date 1215873761 14400 # Node ID 5412a1ff2e49e79560015a9828f9a63e3812d48d # Parent 60cf25102fb20d760216ae283341d82d17c6290c adding chat client and more updates diff -r 60cf25102fb2 -r 5412a1ff2e49 dreactor/core/Dispatcher.d --- a/dreactor/core/Dispatcher.d Wed Jul 09 00:56:21 2008 -0400 +++ b/dreactor/core/Dispatcher.d Sat Jul 12 10:42:41 2008 -0400 @@ -49,7 +49,7 @@ out_buffers = new CircularSeq!(char[]); log = Log.lookup("dreactor.core.Dispatcher"); } - + /********************************************************************** Setters for the handlers. These are set by the Protocols as well diff -r 60cf25102fb2 -r 5412a1ff2e49 dreactor/core/Vat.d --- a/dreactor/core/Vat.d Wed Jul 09 00:56:21 2008 -0400 +++ b/dreactor/core/Vat.d Sat Jul 12 10:42:41 2008 -0400 @@ -59,6 +59,11 @@ running = false; } + void wait() + { + thread.join(); + } + bool addConnection(Dispatcher handler) { log.trace("adding handler"); @@ -126,7 +131,7 @@ //add Conduits to listener freshList.processAll( (ref Dispatcher h) { - selector.reregister(h.transport, h.events(), h); + selector.register(h.transport, h.events(), h); return 1; }); remList.processAll( (ref Dispatcher h) @@ -145,8 +150,7 @@ { case CLOSE: s.unregister(h.transport); - (cast (AsyncSocketConduit) h.transport).shutdown(); - (cast (AsyncSocketConduit) h.transport).detach(); + h.transport.detach(); break; case UNREGISTER: s.unregister(h.transport); @@ -158,7 +162,7 @@ s.register(h.transport, h.events(), h); break; case REREGISTER: - s.reregister(h.transport, h.events(), h); + s.register(h.transport, h.events(), h); break; default: log.error("processReturn: unknown return value"); diff -r 60cf25102fb2 -r 5412a1ff2e49 dreactor/protocol/RawTcp.d --- a/dreactor/protocol/RawTcp.d Wed Jul 09 00:56:21 2008 -0400 +++ b/dreactor/protocol/RawTcp.d Sat Jul 12 10:42:41 2008 -0400 @@ -30,7 +30,7 @@ mgr.setErrorHandler(&onError); mgr.setDisconnectHandler(&onHangup); mgr.listen(addr); - + sel.addConnection(mgr); vat = sel; log = Log.lookup("dreactor.protocol.RawTcpServer"); @@ -40,10 +40,22 @@ this(Vat sel, IPv4Address addr) { AsyncSocketConduit cond = new AsyncSocketConduit; + cond.socket().setAddressReuse(true); Dispatcher lh = new Dispatcher(cond, true); this(lh, sel, addr); } - + + ~this() + { + foreach(Dispatcher d; children) + { + (cast(AsyncSocketConduit)d.transport).shutdown(); + (cast(AsyncSocketConduit)d.transport).detach(); + } + (cast(AsyncSocketConduit)manager.transport).shutdown(); + (cast(AsyncSocketConduit)manager.transport).detach(); + } + int accept(Conduit cond, RegisterD reg) { AsyncSocketConduit newcond = new AsyncSocketConduit; @@ -56,7 +68,7 @@ return 0; } - int broadcast(char[] outbuf, Dispatcher[] excluded) + int broadcast(char[] outbuf, Dispatcher[] excluded = null) { foreach(Dispatcher h; children) { @@ -70,6 +82,28 @@ } return 0; } + + /************************************************************************** + + send + User-called function to send data to the counterpart at the other + end of the connection. This sets up the connection manager to send + data as the socket becomes free. + + **************************************************************************/ + int send(Dispatcher d, char[] outbuf, IPv4Address addr = null) + { + if (d.appendOutBuffer(outbuf)) + { + d.addEvent(Event.Write); + d.setOutgoingHandler(&onSend); + if (!vat.addConnection(d)) + { + log.error("unable to register mgr"); + } + } + return 0; + } public int onReceive(Dispatcher h) { @@ -89,6 +123,7 @@ if (amt == 0) { children.remove(h); + (cast(AsyncSocketConduit) h.transport).shutdown(); return CLOSE; } log.error("Received no data, err = {}", amt); @@ -123,6 +158,7 @@ vat = sel; log = Log.lookup("dreactor.protocol.RawTcpClient"); } + this(Vat sel, Event evts = Event.Read) { AsyncSocketConduit clcond = new AsyncSocketConduit; @@ -130,6 +166,12 @@ this(ch, sel, evts); } + ~this() + { + (cast(AsyncSocketConduit)manager.transport).shutdown(); + (cast(AsyncSocketConduit)manager.transport).detach(); + } + int connect(IPv4Address addr) { (cast(AsyncSocketConduit) manager.transport()).connect(addr); @@ -199,7 +241,6 @@ { Logger log = Log.lookup("Handlers.onSend"); - log.info("top of onSend"); char[] outbuf = h.nextBuffer(); if (outbuf !is null) { diff -r 60cf25102fb2 -r 5412a1ff2e49 test/chatclient.d --- a/test/chatclient.d Wed Jul 09 00:56:21 2008 -0400 +++ b/test/chatclient.d Sat Jul 12 10:42:41 2008 -0400 @@ -28,8 +28,12 @@ while(true) { char buf[] = Cin.copyln(true); + if (buf == "quit\n") + break; client.send(buf); } + c_vat.exit(); + delete client; return 0; } diff -r 60cf25102fb2 -r 5412a1ff2e49 test/chatserver.d --- a/test/chatserver.d Wed Jul 09 00:56:21 2008 -0400 +++ b/test/chatserver.d Sat Jul 12 10:42:41 2008 -0400 @@ -3,19 +3,22 @@ import tango.net.Socket; import tango.core.Thread; import tango.io.Stdout; +import tango.util.log.Log; import dreactor.core.Vat; import dreactor.core.Dispatcher; import dreactor.protocol.RawTcp; import dreactor.transport.AsyncSocketConduit; - +int count; int main() { Vat l_vat = new Vat(); + Logger log = Log.lookup("dreactor.chatserver"); + Log.root.level(log.Level.Info, true); RawTCPListener listener = new RawTCPListener(l_vat, new IPv4Address(5555)); listener.setDataHandler( (char[] inbuf, Dispatcher d) { - + listener.broadcast(inbuf, [d]); }); diff -r 60cf25102fb2 -r 5412a1ff2e49 test/test Binary file test/test has changed