Mercurial > projects > dreactor
changeset 12:d6a3cfe7c3de
more stuff
author | rick@Macintosh.local |
---|---|
date | Wed, 27 Aug 2008 00:47:33 -0400 |
parents | 5836613d16ac |
children | 8c9b1276f623 |
files | dreactor/core/Task.d dreactor/core/Vat.d dreactor/protocol/DefaultProvider.d dreactor/protocol/Emitter.d dreactor/protocol/IProvider.d dreactor/protocol/Protocol.d dreactor/protocol/RawTcp.d dreactor/protocol/TcpProvider.d dreactor/protocol/UdpProvider.d dreactor/transport/AsyncSocketConduit.d dsss.conf dsss.last test/async/chatclient test/async/chatclient.d test/async/chatserver test/async/chatserver.d test/async/dummy.txt test/async/longtest test/async/longtest.d test/async/test test/async/test.d test/chatclient.d test/chatserver.d |
diffstat | 22 files changed, 1252 insertions(+), 501 deletions(-) [+] |
line wrap: on
line diff
--- a/dreactor/core/Task.d Tue Aug 12 16:59:56 2008 -0400 +++ b/dreactor/core/Task.d Wed Aug 27 00:47:33 2008 -0400 @@ -1,26 +1,124 @@ module dreactor.core.Task; import tango.core.Thread; +import tango.util.container.HashMap; +import tango.util.container.CircularList; import dreactor.core.Vat; -import dreactor.protocol.Protocol; -import dreactor.protocol.Dispatcher; +import dreactor.protocol.IProvider; + +alias CircularList!(Message) Messages; + +class Mailbox +{ +public + + this () { box = new HashMap!(int, Messages); } + + Message popMessageOfType(int type) + { + Messages m; + if (box.get(type, m)) + { + Message msg = m.removeHead(); + if (msg) + msg_count.store(msg_count.load()-1); + + if (m.isEmpty()) + box.removeKey(type); + + return msg; + } + else + return null; + } + + //TODO this could be optimized to use set intersection logic instead of checking for + //multiple keys one at a time. + Message popMessageOfType(int[] types) + { + foreach(int i; types) + { + Message msg = popMessageOfType(i); + if (msg) + return msg; + } + return null; + } + + Message popMessage() + { + Messages m; + int key; + auto itor = box.iterator; -alias CircularSeq!(Message) Mailbox; + do + { + if (itor.valid && itor.next(key, m)) + { + if (!m.isEmpty()) + { + Message msg = m.removeHead(); + if (msg) + msg_count.store(msg_count.load()-1); + if (m.isEmpty()) + box.removeKey(key); + return msg; + } + else + { + iterator.remove(); + } + } + else + return null; + } + while (true) + } + void push(Message msg) + { + Messages m; + if (box.get(msg.type, m)) + m.append(msg); + else + { + m = new Messages; + m.append(msg); + box.add(msg.type, m); + } + msg_count.store(msg_count.load()+1); + } + + int count() + { + return msg_count.load(); + } +private + HashMap!(int, Messages) box; + Atomic!(int) msg_count; +} + +alias void delegate (Message) TaskDg; class Task { private Fiber fiber; Mailbox mailbox; + Mailbox lockedMailbox; int id; Vat vat; - dispatcher[Conduit] dispatchers; - + TaskDG taskdg; + IProvider provider; + public - this() + this(TaskDg tdg = null, IProvider provider = null) { fiber = new Fiber(&run); mailbox = new Mailbox; + lockedMailbox = new Mailbox; + taskdg = tdg; + if (!provider) + provider = new DefaultProvider; } void setId(int i) @@ -28,9 +126,14 @@ id = i; } - Mailbox getMailbox() + void appendMessage(Message m) { - return mailbox; + mailbox.push(m); + } + + synchronized void appendIVMessage(Message m) + { + lockedMailbox.push(m); } void setVat(Vat v) @@ -38,7 +141,47 @@ vat = v; } - abstract void run(); + IProvider getProvider() + { + return provider; + } + + void run() + in + { + assert(taskdg !is null); + } + body + { + while (msg = receive()) + { + taskdg(msg); + } + } + + /*************************************************************************** + sendTo + Basic message passing utility for inter-task communication. + It first checks the local Vat to see if the task is present, if not + it gets the task from the global registry and sends a message to its + thread-safe mailbox. + ****************************************************************************/ + + bool sendTo(int taskid, Message m) + { + Task t; + if (t = vat.getTask(taskid)) + { + t.appendMessage(m); + return true; + } + else if (t = Vat.getGlobalTask(taskid)) + { + t.appendIVMessage(m); + return true; + } + return false; + } protected @@ -46,39 +189,38 @@ receive User-called function to get the next pending message in the mailbox. If there are no pending messages, this will yield control back to - the scheduler/vat. + the vat's scheduler. ***************************************************************************/ - Message receive() + Message receive(int[] types) { - Message m = mailbox.head(); - mailbox.removeHead(); - return m; + while(true) + { + Message m = mailbox.popMessageOfType(types); + if (!m) + Fiber.yield(); + else if (SYSTEM_QUIT == m.type) + break; + else return m; + + } + return null; + } + + Message receive() + { + while(true) + { + Message m = mailbox.popMessage(); + if (!m) + Fiber.yield(); + else if (SYSTEM_QUIT == m.type) + break; + else return m; + } + return null; } int getId() { return id;} - /************************************************************************** - - send - User-called function to send data to the counterpart at the other - end of the connection. This sets up a dispatcher to send - data as the conduit becomes free. - - **************************************************************************/ - int send(char[] outbuf, Conduit c) - { - Dispatcher dis; - if ( ! (dis = (c in dispatchers))) - dis = new Dispatcher(c); - - if (dis.appendOutBuffer(outbuf)) - { - if (!vat.addConnection(dis)) - { - log.error("unable to register mgr"); - } - } - return 0; - } }
--- a/dreactor/core/Vat.d Tue Aug 12 16:59:56 2008 -0400 +++ b/dreactor/core/Vat.d Wed Aug 27 00:47:33 2008 -0400 @@ -21,31 +21,23 @@ import tango.util.log.Log; import dreactor.transport.AsyncSocketConduit; +import dreactor.protocol.IProvider; import dreactor.core.Task; import dreactor.util.ThreadSafeQueue; static char[] version_string = "Vat.d 0.1 2008-05-31"; -Logger log; enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; -alias Message delegate (Conduit c) HandlerDG; -alias Message function (Conduit c) HandlerFN; class TaskAttachment { public Task task; - HandlerDG dg; - HandlerFN fn; + IProvider provider; - this(Task ta, HandlerDG d) - { TaskAttachment t; t.task = ta; t.dg = d; return t; } - - this(Task ta, HandlerFN f) - { TaskAttachment t; t.task = ta; t.fn = f; return t; } - - public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); } + this (Task ta, IProvider p) + { task = ta; provider = p; } } class Vat @@ -53,18 +45,16 @@ private Thread thread; bool running; + Logger log; - Task[int] tasks; - int taskCount; + TaskAttachment[int] tasks; //registry for local tasks + + Selector selector; + static Atomic!(int) taskCount; + TaskAttachment[int] globalTasks; //global registry of tasks public - this(Task t) - { - addTask(t); - this(); - } - this() { log = Log.lookup("dreactor.core.Vat"); @@ -73,13 +63,17 @@ thread = new Thread(&eventLoop); thread.start(); } - - void addTask(Task t) + + synchronized int addTask(Task t, IProvider p = null) { t.setVat(this); ++taskCount; - tasks[taskCount] = t; + auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider); + tasks[taskCount] = ta; + globalTask[taskCount] = ta; + selector.register(p.getConduit(), p.getEvents(), ta); t.setId(taskCount); + return taskCount; } void exit() @@ -92,21 +86,43 @@ thread.join(); } - bool addConnection() + bool addConnection(int tid, Conduit c, Events evts) { log.trace("adding handler"); - return selector.register(h.transport, h.events(), h); + TaskAttachment ta; + if (ta = (tid in tasks)) + return selector.register(c, evts, ta); + else + return false; } - bool remConnection(Dispatcher handler) + bool remConnection(Conduit c) + { + return selector.unregister(c); + } + + Task getTask(int tid) { - return selector.unregister(h.transport); + TaskAttachment ta; + if (ta = (tid in tasks)) + return ta.task; + else + return null; + } + + static synchronized Task getGlobalTask(int tid) + { + TaskAttachment ta; + if (ta = (tid in globaltasks)) + return ta.task; + else + return null; } private void eventLoop() { - auto selector = new Selector(); + selector = new Selector(); selector.open(); do { @@ -122,30 +138,31 @@ { // incoming data log.trace("Read event fired"); - auto conn = cast(Dispatcher) key.attachment; - if ( Dispatcher.State.listening == conn.getState() ) - conn.handleConnection(conn.transport, &addConnection); - else - processReturn(conn.handleIncoming(), selector, conn); + auto ta = cast(TaskAttachment) key.attachment; + processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit); + } else if (key.isWritable()) { log.trace("Write event fired"); - auto conn = cast(Dispatcher) key.attachment; - processReturn(conn.handleOutgoing(), selector, conn); + auto ta = cast(TaskAttachment) key.attachment; + ta.appendMessage(ta.provider.handleWrite()); + processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit); } else if (key.isHangup()) { log.trace("Hangup event fired"); - auto conn = cast(Dispatcher) key.attachment; - processReturn(conn.handleDisconnect(), selector, conn); + auto ta = cast(TaskAttachment) key.attachment; + ta.appendMessage(ta.provider.handleDisconnect()); + processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit); } else if (key.isError() || key.isInvalidHandle()) { - log.trace("Error event fired"); + log.trace("Error event fired"); // error, close connection - auto conn = cast(Dispatcher) key.attachment; - conn.handleError(&remConnection); + auto ta = cast(TaskAttachment) key.attachment; + ta.appendMessage(ta.provider.handleError()); + processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit); } } } @@ -173,25 +190,23 @@ } } - void processReturn(int result, Selector s, Dispatcher h) + void processReturn(int result, Conduit c) { switch(result) { case CLOSE: - s.unregister(h.transport); - h.transport.detach(); + selector.unregister(c); + c.detach(); break; case UNREGISTER: - s.unregister(h.transport); + selector.unregister(c); break; case REMAIN: //this space intentially left blank break; case REGISTER: - s.register(h.transport, h.events(), h); break; case REREGISTER: - s.register(h.transport, h.events(), h); break; default: log.error("processReturn: unknown return value");
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/DefaultProvider.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,51 @@ +module dreactor.protocol.DefaultProvider; + +import tango.io.Selector; + +import dreactor.protocol.IProvider; + + + +class DefaultProvider : IProvider +{ +private + Conduit cond; + Events evts; + +public + Message handleRead(Conduit c) + { + } + + Message handleWrite(Conduit c) + { + } + + Message handleError(Conduit c) + { + } + + Message handleConnect(Conduit c) + { + } + + Message handleDisconnect(Conduit c) + { + } + + + Conduit getConduit() + { + return cond; + } + + int getEvents() + { + return evts; + } + + void setEvents(Event e) + { + evts e; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/Emitter.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,48 @@ +module Emitter + + + + +import tango.core.Thread; + +import dreactor.core.Task; + +alias Message delegate(void) EmitterDg; + +class Emitter +{ +public + this(Task t, EmitterDg cb) + { + task = t; + callback = cb; + thread = new Thread(&run); + thread.start(); + } + + void stop() + { + running = false; + } + + void stopNow() + { + thread.isDaemon(true); + running = false; + } +private + + void run() + { + while(running) + { + Message msg = callback(); + task.appendIVMessage(msg); + } + } + Task task; + Thread thread; + bool running; + EmitterCb callback; +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/IProvider.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,29 @@ +module dreactor.protocol.IProvider; + +class Message +{ +public + int type; + int info; + Object payload; + this (Object buf, int t, int e) + { + type = t; + info = e; + payload = buf; + } +} + +interface IProvider +{ + Message handleRead(Conduit c); + Message handleWrite(Conduit c); + Message handleError(Conduit c); + Message handleConnect(Conduit c); + Message handleDisconnect(Conduit c); + abstract void send(char []); + + Conduit getConduit(); + int getEvents(); + void setEvents(); +}
--- a/dreactor/protocol/Protocol.d Tue Aug 12 16:59:56 2008 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,26 +0,0 @@ -module dreactor.protocol.IProtocol; - -struct Message -{ -public - int type; - int errorcode; - Object payload; - static Message opCall(Object buf, int t, int e) - { - Message m; - m.type = t; - errorcode = e; - m.payload = buf; - return m; - } -} - -interface IProtocol -{ - Message handleRead(Conduit c); - Message handleWrite(Conduit c); - Message handleError(Conduit c); - Message handleConnect(Conduit c); - Message handleDisconnect(Conduit c); -}
--- a/dreactor/protocol/RawTcp.d Tue Aug 12 16:59:56 2008 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,333 +0,0 @@ -module dreactor.protocol.RawTcp; - -import tango.io.Conduit; -import tango.io.selector.model.ISelector; -import tango.net.Socket; -import tango.util.collection.CircularSeq; -import tango.util.log.Log; -import tango.util.log.Config; - -import dreactor.transport.AsyncSocketConduit; -import dreactor.core.Vat; -import dreactor.core.Dispatcher; - -/****************************************************************************** - - Basic TCP server or client routines for sending raw data. - -******************************************************************************/ -class RawTCPListener -{ -public - - this(AsyncSocketConduit cond) - { - log = Log.lookup("dreactor.protocol.RawTcpServer"); - log.info("log initialized"); - children = new CircularSeq!(Dispatcher); - } - - this(Vat sel, IPv4Address addr) - { - AsyncSocketConduit cond = new AsyncSocketConduit; - cond.socket().setAddressReuse(true); - this(cond); - } - - ~this() - { - close(); - } - - AsyncSocketConduit accept(Conduit cond, RegisterD reg) - { - AsyncSocketConduit newcond = new AsyncSocketConduit; - (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket); - h.events(Event.Read); - vat.addConnection(h); - children.append(h); - log.info("accepted new connection"); - return newcond; - } - - int broadcast(char[] outbuf, AsyncSocketConduit[] recips) - { - foreach(AsyncSocketConduit c; recips) - { - if (c.appendOutBuffer(outbuf)) - { - h.addEvent(Event.Write); - vat.addConnection(h); - } - } - return 0; - } - - /************************************************************************** - - send - User-called function to send data to the counterpart at the other - end of the connection. This sets up the connection manager to send - data as the socket becomes free. - - **************************************************************************/ - int send(Dispatcher d, char[] outbuf, IPv4Address addr = null) - { - if (d.appendOutBuffer(outbuf)) - { - d.addEvent(Event.Write); - if (!vat.addConnection(d)) - { - log.error("unable to register mgr"); - } - } - return 0; - } - - /************************************************************************** - - receive - IncomingHandlerD - Default incoming data handler. Should be replaced with something useful. - - **************************************************************************/ - int onReceive(Dispatcher h) - { - Logger log = Log.lookup("Handlers.onReceive"); - - char inbuf[8192]; - int amt; - if((amt = h.transport.read(inbuf)) > 0) - { - if (dataHandler) - dataHandler(inbuf[0 .. amt], h); - else - log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); - } - else - { - if (amt == 0) - { - children.remove(h); - (cast(AsyncSocketConduit) h.transport).shutdown(); - return CLOSE; - } - log.error("Received no data, err = {}", amt); - } - return REMAIN; - } - - void close() - { - foreach(Dispatcher d; children) - { - (cast(AsyncSocketConduit)d.transport).shutdown(); - (cast(AsyncSocketConduit)d.transport).detach(); - } - (cast(AsyncSocketConduit)manager.transport).shutdown(); - (cast(AsyncSocketConduit)manager.transport).detach(); - - } - - void setDataHandler(void delegate(char[], Dispatcher) h) - { - dataHandler = h; - } - -private - Vat vat; - CircularSeq!(Dispatcher) children; - Dispatcher manager; - Logger log; - RawTCPHandler h; - void delegate(char[], Dispatcher) dataHandler; -} - -class RawTCPClient -{ - -public - this(Dispatcher mgr, Vat sel, Event evts = Event.Read) - { - manager = mgr; - manager.events(evts); - connected = false; - mgr.setOutgoingHandler(&RawTCPHandler.onSend); - mgr.setIncomingHandler(&onReceive); - mgr.setErrorHandler(&RawTCPHandler.onError); - mgr.setDisconnectHandler(&RawTCPHandler.onHangup); - vat = sel; - log = Log.lookup("dreactor.protocol.RawTcpClient"); - } - - this(Vat sel, Event evts = Event.Read) - { - AsyncSocketConduit clcond = new AsyncSocketConduit; - Dispatcher ch = new Dispatcher(clcond); - this(ch, sel, evts); - } - - ~this() - { - (cast(AsyncSocketConduit)manager.transport).shutdown(); - (cast(AsyncSocketConduit)manager.transport).detach(); - } - - int connect(IPv4Address addr) - { - (cast(AsyncSocketConduit) manager.transport()).connect(addr); - vat.addConnection(manager); - connected = true; - log.info("connected to {}", addr); - return 0; - } - - /************************************************************************** - - send - User-called function to send data to the counterpart at the other - end of the connection. This sets up the connection manager to send - data as the socket becomes free. - - **************************************************************************/ - int send(char[] outbuf, IPv4Address addr = null) - { - if (!connected) - { - log.info("send: not connected, connecting"); - if (addr !is null) - { - if (0 > connect(addr)) - { - log.error("send: unable to connect"); - return -1; - } - } - } - if (manager.appendOutBuffer(outbuf)) - { - manager.addEvent(Event.Write); - if (!vat.addConnection(manager)) - { - log.error("unable to register mgr"); - } - } - return 0; - } - - /************************************************************************** - - receive - IncomingHandlerD - Default incoming data handler. Should be replaced with something useful. - - **************************************************************************/ - public int onReceive(Dispatcher h) - { - Logger log = Log.lookup("Handlers.onReceive"); - - char inbuf[8192]; - int amt; - if((amt = h.transport.read(inbuf)) > 0) - { - if (dataHandler) - dataHandler(inbuf[0 .. amt], h); - else - log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); - } - else - { - if (amt == 0) - { - return CLOSE; - } - log.error("Received no data, err = {}", amt); - } - return REMAIN; - } - - void setDataHandler(void delegate(char[], Dispatcher) h) - { - dataHandler = h; - } - -private - void delegate(char[], Dispatcher) dataHandler; - Dispatcher manager; - Vat vat; - bool connected; - Logger log; - RawTCPHandler h; -} - - -/****************************************************************************** - - Default Event handlers common to both listener/clients - -******************************************************************************/ -struct RawTCPHandler -{ - /************************************************************************** - - onSend - OutgoingHandlerD - To be registered as the response to socket writable event. - Sends data, returns amount sent. Unregisters Handler for sending - if there is no more data left to send. - - ***************************************************************************/ - public static int onSend(Dispatcher h) - { - Logger log = Log.lookup("Handlers.onSend"); - - char[] outbuf = h.nextBuffer(); - if (outbuf !is null) - { - int sent = h.transport.write(outbuf); - if (sent > 0) - { - if (! h.addOffset(sent)) - { - h.remEvent(Event.Write); - return REREGISTER; - } - } - else if (sent == AsyncSocketConduit.Eof) - { - log.error("Select said socket was writable, but sent 0 bytes"); - } - else - { - log.error("Socket send return ERR"); - } - return REMAIN; - } - else - { - h.remEvent(Event.Write); - return REREGISTER; - } - } - - static int onHangup(Dispatcher d) - { - return UNREGISTER; - } - - static int onError(Dispatcher d, RegisterD unreg) - { - return CLOSE; - } - -} - -bool includes(Dispatcher[] haystack, Dispatcher needle) -{ - foreach(Dispatcher h; haystack) - { - if (h is needle) - return true; - } - return false; -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/TcpProvider.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,252 @@ +module dreactor.protocol.RawTcp; + +import tango.io.device.Conduit; +import tango.io.selector.model.ISelector; +import tango.net.Socket; +import tango.util.collection.CircularSeq; +import tango.util.log.Log; +import tango.util.log.Config; + +import dreactor.transport.AsyncSocketConduit; +import dreactor.core.Vat; +import dreactor.core.Dispatcher; + +/****************************************************************************** + + Basic TCP server or client routines for sending raw data. + +******************************************************************************/ +class TCPProvider : IProvider +{ +public + + enum + { + RECEIVE = 1000, + SEND_COMPLETE, + NEW_CONNECTION, + REMOTE_CLOSED, + SEND_ERROR, + RECEIVE_ERROR, + ERROR + } + + this(AsyncSocketConduit c) + { + log = Log.lookup("dreactor.protocol.RawTcpServer"); + log.info("log initialized"); + cond = c; + } + + this(Vat v, IPv4Address addr) + { + AsyncSocketConduit cond = new AsyncSocketConduit; + cond.socket().setAddressReuse(true); + this(cond); + } + + ~this() + { + close(); + } + + Message handleRead(Conduit c) + { + Logger log = Log.lookup("Handlers.onReceive"); + + char inbuf[8192]; + int amt; + if((amt = h.transport.read(inbuf)) > 0) + { + return new Message(inbuf[0 .. amt].dup, RECEIVE, amt); + } + else + { + if (amt == 0) + { + children.remove(h); + (cast(AsyncSocketConduit) h.transport).shutdown(); + return Message(null, REMOTE_CLOSED, amt); + } + log.error("Received no data, err = {}", amt); + } + return new Message(null, ERROR, amt); + } + + /************************************************************************** + + handleWrite + To be registered as the response to socket writable event. + Sends data, returns amount sent. Unregisters Handler for sending + if there is no more data left to send. + + ***************************************************************************/ + Message handleWrite(Conduit c) + { + Logger log = Log.lookup("Handlers.onSend"); + + char[] outbuf = nextBuffer(); + if (outbuf !is null) + { + int sent = cond.write(outbuf); + if (sent > 0) + { + if (! addOffset(sent)) + { + //h.remEvent(Event.Write); + //TODO - How do we handle event re-registering + return new Message(null, SEND_COMPLETE, sent); + } + } + else if (sent == 0) + { + log.error("Select said socket was writable, but sent 0 bytes"); + return new Message(null, SEND_ERROR, 0); + } + else + { + log.error("Socket send return ERR"); + return new Message(null, SEND_ERROR, sent); + } + } + else + { + //h.remEvent(Event.Write); + //TODO - How do we handle event re-registering + + return new Message(null, SEND_COMPLETE, 0); + } + } + + Message handleDisconnect(Conduit c) + { + return new Message(c, REMOTE_CLOSED, 0); + } + + Message handleError(Conduit c) + { + return new Messsage(null, ERROR, 0); + } + + Message handleConnect(Conduit c) + { + return new Message(accept(), NEW_CONNECTION, 0); + } + + Conduit getConduit() + { + return cond; + } + + int getEvents() + { + return events; + } + + void setEvents(Event e) + { + events = e; + } + + AsyncSocketConduit accept() + { + AsyncSocketConduit newcond = new AsyncSocketConduit; + cond.socket().accept(newcond.socket); + log.info("accepted new connection"); + return newcond; + } + + int broadcast(char[] outbuf, TCPProvider[] recips) + { + foreach(TCPProvider c; recips) + { + if (c.appendOutBuffer(outbuf)) + { + h.addEvent(Event.Write); + vat.addConnection(h); + } + } + return 0; + } + + /************************************************************************** + + send + User-called function to send data to the counterpart at the other + end of the connection. This sets up the connection manager to send + data as the socket becomes free. + + **************************************************************************/ + int send(char[] outbufl) + { + if (appendOutBuffer(outbuf)) + { + //TODO - should we always register for all events? or update it when needed? + //d.addEvent(Event.Write); + if (!vat.addConnection(d)) + { + log.error("unable to register mgr"); + } + } + return 0; + } + + + void close() + { + cond.shutdown(); + cond.detach(); + } + + + ~this() + { + (cast(AsyncSocketConduit)manager.transport).shutdown(); + (cast(AsyncSocketConduit)manager.transport).detach(); + } + + int connect(IPv4Address addr) + { + cond = new AsyncSocketConduit; + cond.socket().setAddressReuse(true); + + cond.connect(addr); + connected = true; + log.info("connected to {}", addr); + return 0; + } + + /************************************************************************** + + send + User-called function to send data to the counterpart at the other + end of the connection. This sets up the connection manager to send + data as the socket becomes free. + + **************************************************************************/ + int send(char[] outbuf, IPv4Address addr = null) + { + if (!connected) + { + log.info("send: not connected, connecting"); + return -1; + } + if (appendOutBuffer(outbuf)) + { + addEvent(Event.Write); + if (!vat.addConnection(manager)) + { + log.error("unable to register mgr"); + } + } + return 0; + } + + +private + Vat vat; + Conduit cond; + Logger log; + bool listener; + Event events; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/UdpProvider.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,133 @@ +module dreactor.protocol.Raw; + +import tango.io.Conduit; +import tango.io.selector.model.ISelector; +import dreactor.core.AsyncConduit; +import dreactor.core.SelectLoop; +import dreactor.core.ConnectionHandler; +import tango.util.collection.CircularSeq; +import tango.util.log.Log; +import tango.util.log.Configurator; + +Logger log = Log.getLogger("dreactor.core.SelectLoop"); + +/****************************************************************************** + + Basic TCP server or client routines for sending raw data. + +******************************************************************************/ +class RawListener +{ +public + + this(ConnectionHandler mgr, SelectLoop sel) + { + manager = mgr; + mgr.events(Event.Read); + sel.addConnection(mgr); + select = sel; + children = CircularSeq!(ConnectionHandler); + Configurator(); + } + + int accept(Conduit cond) + { + AsyncConduit newcond = new AsyncConduit; + cond.socket().accept(newcond.socket); + ConnectionHandler h = ConnectionHandler.New(manager); + mgr.events(Event.Read); + select.addConnection(mgr); + children.append(mgr); + } + + bool broadcast(char[] outbuf) + { + foreach(ConnectionHandler h; children) + { + if (h.appendBuffer(outbuf)) + { + h.addEvent(Event.Write); + select.addConnection(h); + } + } + } + + void close() + { + + } + + /************************************************************************** + + send + OutgoingHandlerD + To be registered as the response to socket writable event. + Sends data, returns amount sent. Unregisters Handler for sending + if there is no more data left to send. + + ***************************************************************************/ + int send(ConnectionHandler h, RegisterD reg) + { + char[] outbuf = h.nextBuffer(); + if (!outbuf is null) + { + int sent = h.transport.write(outbuf); + if (sent > 0) + { + if (! h.addOffset(sent)) + { + h.removeEvent(Event.write); + reg(h); + } + } + else if (sent == EOF) + { + // EAGAIN ? probably shouldn't have happened. + } + else + { + log.error("Socket send return ERR"); + } + return sent; + } + return 0; + } + + /************************************************************************** + + receive + IncomingHandlerD + Default incoming data handler. Should be replaced with something useful. + + **************************************************************************/ + int receive(ConnectionHandler h, RegisterD reg) + { + char inbuf[8192]; + auto format = Log.format; + if(h.transport.read(inbuf) > 0) + log.info(format("Received Buffer: {}", inbuf)); + } + +private + ConnectionHandler manager; + SelectLoop select; + CircularSeq!(ConnectionHandler) children; +} + +class RawClient +{ +public + this(ConnectionHandler mgr, SelectLoop sel) + { + manager = mgr; + mgr.events(Event.Read); + sel.addConnection(mgr); + select = sel; + } + + + +private + ConnectionHandler manager; + SelectLoop select; +}
--- a/dreactor/transport/AsyncSocketConduit.d Tue Aug 12 16:59:56 2008 -0400 +++ b/dreactor/transport/AsyncSocketConduit.d Wed Aug 27 00:47:33 2008 -0400 @@ -16,7 +16,7 @@ private import tango.time.Time; -public import tango.io.Conduit; +public import tango.io.device.Conduit; private import tango.net.Socket;
--- a/dsss.conf Tue Aug 12 16:59:56 2008 -0400 +++ b/dsss.conf Wed Aug 27 00:47:33 2008 -0400 @@ -1,4 +1,4 @@ -[test/test.d] -[test/longtest.d] -[test/chatserver.d] +#[test/test.d] +#[test/longtest.d] +#[test/chatserver.d] [test/chatclient.d]
--- a/dsss.last Tue Aug 12 16:59:56 2008 -0400 +++ b/dsss.last Wed Aug 27 00:47:33 2008 -0400 @@ -1,5 +1,4 @@ -[test/test.d] -[test/longtest.d] -[test/chatserver.d] +#[test/test.d] +#[test/longtest.d] +#[test/chatserver.d] [test/chatclient.d] -[dreactor/protocol/http11_parser.d]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/async/chatclient.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,39 @@ +module chatclient; + +import tango.net.Socket; +import tango.core.Thread; +import tango.io.Stdout; +import tango.io.Console; +import tango.util.log.Log; + +import dreactor.core.Vat; +import dreactor.core.Dispatcher; + +import dreactor.protocol.RawTcp; +import dreactor.transport.AsyncSocketConduit; + +int main() +{ + Vat c_vat = new Vat(); + RawTCPClient client = new RawTCPClient(c_vat); + Log.root.level(log.Level.Warn, true); + + client.setDataHandler( (char[] inbuf, Dispatcher d) { + Stdout(inbuf)(); + }); + + c_vat.run(); + client.connect(new IPv4Address("localhost", 5555)); + + while(true) + { + char buf[] = Cin.copyln(true); + if (buf == "quit\n") + break; + client.send(buf); + } + c_vat.exit(); + delete client; + return 0; +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/async/chatserver.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,29 @@ +module chatserver; + +import tango.net.Socket; +import tango.core.Thread; +import tango.io.Stdout; +import tango.util.log.Log; +import dreactor.core.Vat; +import dreactor.core.Dispatcher; + +import dreactor.protocol.RawTcp; +import dreactor.transport.AsyncSocketConduit; + +int main() +{ + Vat l_vat = new Vat(); + Logger log = Log.lookup("dreactor.chatserver"); + Log.root.level(log.Level.Info, true); + RawTCPListener listener = new RawTCPListener(l_vat, new IPv4Address(5555)); + + listener.setDataHandler( (char[] inbuf, Dispatcher d) { + + listener.broadcast(inbuf, [d]); + + }); + l_vat.run(); + + return 0; +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/async/longtest.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,273 @@ +module longtest; + +import tango.net.Socket; +import tango.core.Thread; +import tango.io.Stdout; +import dreactor.core.Vat; +import dreactor.core.Dispatcher; +import dreactor.protocol.RawTcp; +import dreactor.transport.AsyncSocketConduit; + + +int main() +{ + AsyncSocketConduit cond = new AsyncSocketConduit; + Dispatcher lh = new Dispatcher(cond, true); + Vat l_vat = new Vat(); + RawTCPListener listener = new RawTCPListener(lh, l_vat, new IPv4Address(5555)); + l_vat.run(); + + AsyncSocketConduit clcond = new AsyncSocketConduit; + Dispatcher ch = new Dispatcher(clcond); + Vat c_vat = new Vat(); + RawTCPClient client = new RawTCPClient(ch, c_vat); + c_vat.run(); //run, vat, run! + + client.connect(new IPv4Address("localhost", 5555)); + //Thread.sleep(1); + client.send(testbuffer); + return 0; +} + +char testbuffer[] = + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000"; +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/async/test.d Wed Aug 27 00:47:33 2008 -0400 @@ -0,0 +1,30 @@ +module test; + +import tango.net.Socket; +import tango.core.Thread; +import tango.io.Stdout; +import dreactor.core.Vat; +import dreactor.core.Dispatcher; +import dreactor.protocol.RawTcp; +import dreactor.transport.AsyncSocketConduit; + +int main() +{ + AsyncSocketConduit cond = new AsyncSocketConduit; + Dispatcher lh = new Dispatcher(cond, true); + Vat l_vat = new Vat(); + RawTCPListener listener = new RawTCPListener(lh, l_vat, new IPv4Address(5555)); + l_vat.run(); + + AsyncSocketConduit clcond = new AsyncSocketConduit; + Dispatcher ch = new Dispatcher(clcond); + Vat c_vat = new Vat(); + RawTCPClient client = new RawTCPClient(ch, c_vat); + c_vat.run(); + + client.connect(new IPv4Address("localhost", 5555)); + //Thread.sleep(1); + client.send("This is a test"); + return 0; +} +
--- a/test/chatclient.d Tue Aug 12 16:59:56 2008 -0400 +++ b/test/chatclient.d Wed Aug 27 00:47:33 2008 -0400 @@ -1,39 +1,72 @@ + module chatclient; -import tango.net.Socket; -import tango.core.Thread; -import tango.io.Stdout; -import tango.io.Console; -import tango.util.log.Log; +import dreactor.core.Vat; +import dreactor.core.Task; +import dreactor.protocol.TcpProvider; -import dreactor.core.Vat; -import dreactor.core.Dispatcher; +enum {EMITTER_CHAT_RECEIVE = 42}; + +class ChatTask : Task +{ -import dreactor.protocol.RawTcp; -import dreactor.transport.AsyncSocketConduit; +private + TcpProvider client; -int main() -{ - Vat c_vat = new Vat(); - RawTCPClient client = new RawTCPClient(c_vat); - Log.root.level(log.Level.Warn, true); +public + this(TcpProvider tcpclient) + { + super(tcpclient); + } + + void run() + { + Message msg; - client.setDataHandler( (char[] inbuf, Dispatcher d) { - Stdout(inbuf)(); - }); - - c_vat.run(); - client.connect(new IPv4Address("localhost", 5555)); - - while(true) - { - char buf[] = Cin.copyln(true); - if (buf == "quit\n") - break; - client.send(buf); + auto em = new Emitter(this, + { + char buf[] = Cin.copyln(true); + return new Message(buf, EMITTER_STDIN_RECEIVE, buf.size); + }); + + while (msg = receive()) + { + switch(msg.type) + { + case EMITTER_CHAT_RECEIVE: + char[] inbuf = msg.payload; + if (inbuf == "quit") + { + em.stopNow(); + return; + } + client.send(msg.payload); + break; + + case TcpProvider.RECEIVE: + Stdout(cast(char[]) msg.payload); + break; + + case TcpProvider.SEND_COMPLETE: + break; + + case TcpProvider.REMOTE_CLOSED: + Stdout("--- Remote host closed connection \n"); + break; + + default: + Stdout("Unknown message received\n"); + } + } + em.stopNow(); } - c_vat.exit(); - delete client; - return 0; } + +int main(int argc, char[][] argv) +{ + auto vat = new Vat; + auto client = new TcpProvider(new IPv4Address("localhost", 5555), vat); + auto tsk = new ChatTask(client); + vat.addTask(task); +}
--- a/test/chatserver.d Tue Aug 12 16:59:56 2008 -0400 +++ b/test/chatserver.d Wed Aug 27 00:47:33 2008 -0400 @@ -1,29 +1,66 @@ + module chatserver; -import tango.net.Socket; -import tango.core.Thread; -import tango.io.Stdout; -import tango.util.log.Log; -import dreactor.core.Vat; -import dreactor.core.Dispatcher; +import dreactor.core.Vat; +import dreactor.core.Task; +import dreactor.protocol.TcpProvider; -import dreactor.protocol.RawTcp; -import dreactor.transport.AsyncSocketConduit; + +class ChatConnectionTask : Task +{ +public -int main() -{ - Vat l_vat = new Vat(); - Logger log = Log.lookup("dreactor.chatserver"); - Log.root.level(log.Level.Info, true); - RawTCPListener listener = new RawTCPListener(l_vat, new IPv4Address(5555)); + void run() + { + Message msg; + while (msg = receive()) + { + switch(msg.type) + { + case TCP_PROVIDER_RECEIVE: + //Stdout(cast(char[]) msg.payload); + break; + case TCP_PROVIDER_SEND_COMPLETE: + break; + case TCP_PROVIDER_REMOTE_CLOSED: + Stdout("--- Remote host closed connection \n"); + break; + default: + Stdout("Unknown message received\n"); + } + } + em.stopNow(); + } - listener.setDataHandler( (char[] inbuf, Dispatcher d) { - - listener.broadcast(inbuf, [d]); + void send(char[] buf) + { + tcp.send(buf); + } - }); - l_vat.run(); - - return 0; + static CircularList!(ChatConnectionTask!( } + +int main(int argc, char[][] argv) +{ + auto vat = new Vat; + + void listentask(Message msg) + { + switch(msg.type) + { + case TCP_PROVIDER_CONNECT: + AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload; + auto tsk = ChatConnectionTask(new TcpProvider(cond)); + vat.addTask(tsk); + break; + default: + Stdout("Unknown message received\n"); + } + } + + + auto provider = new TcpProvider(new IPv4Address("localhost", 5555), vat); + auto srvtsk = new Task(&listentask, provider); + vat.addTask(task, client); +}