# HG changeset patch # User rick@minifunk # Date 1221949991 14400 # Node ID 8c9b1276f623b703d2bd8e3d419a36306303583c # Parent d6a3cfe7c3de4ed4bb28f8c24a644fb19993e2df bug fixes diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/core/Dispatcher.d --- a/dreactor/core/Dispatcher.d Wed Aug 27 00:47:33 2008 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,136 +0,0 @@ -module dreactor.protocol.Dispatcher; - -import tango.io.selector.model.ISelector; -import tango.util.collection.CircularSeq; -import tango.net.Socket; -public import tango.core.Exception; -import dreactor.transport.AsyncSocketConduit; - -import tango.util.log.Log; -import tango.util.log.Config; - -class Dispatcher -{ -public - this (Conduit trans) - { - cond = trans; - ibuf_len = 0; - o_offset = 0; - out_buffers = new CircularSeq!(char[]); - log = Log.lookup("dreactor.core.Dispatcher"); - } - - /************************************************************************** - - onSend -- Send method - Called by the vat in response to a FD writeable event. - Sends data, returns amount sent. Unregisters Handler for sending - if there is no more data left to send. - - ***************************************************************************/ - public int onSend() - { - Logger log = Log.lookup("Handlers.onSend"); - - char[] outbuf = nextBuffer(); - if (outbuf !is null) - { - int sent = cond.write(outbuf); - if (sent > 0) - { - if (! addOffset(sent)) - { - return UNREGISTER; - } - } - else if (sent == 0) - { - log.error("Select said socket was writable, but sent 0 bytes"); - } - else - { - log.error("Socket send return ERR {}", sent); - } - return REMAIN; - } - else - { - return UNREGISTER; - } - } - - /************************************************************************** - - appendOutBuffer - - Adds an outgoing buffer to the list. This returns true if the list - was empty, indicating that the handler should be registered with the - SelectLoop. If it returns false, it was probably already registered. - - **************************************************************************/ - bool appendOutBuffer(char[] outbuf) - { - out_buffers.append(outbuf); - out_buffers_len++; - if (out_buffers_len == 1) - return true; - else - return false; - } - - /************************************************************************** - - addOffset - Use this function to update the offset position after a successful data - send. This not only manages the current offset, but will update the - out buffer chain if necessary. - - Returns: false if there is nothing left to send, true if there is. - - **************************************************************************/ - bool addOffset(int off) - in - { - assert(out_buffers_len > 0); - } - body - { - char[] hd = out_buffers.head(); - if ((off + o_offset) >= hd.length) - { - out_buffers.removeHead(); - o_offset = 0; - out_buffers_len--; - return (out_buffers_len > 0); - } - else - o_offset += off; - return true; - } - - /************************************************************************** - - char[] nextBuffer - - Returns a slice of the current outbound buffer, returns a char[] pointing - to null if there is no current outbound buffer - - **************************************************************************/ - synchronized char[] nextBuffer() - { - if (out_buffers_len < 1) - { - return null; - } - - return out_buffers.head()[o_offset .. $]; - } - - Conduit cond; - CircularSeq!(char[]) out_buffers; - int out_buffers_len; - int ibuf_len; - int o_offset; - Logger log; -} diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/core/Task.d --- a/dreactor/core/Task.d Wed Aug 27 00:47:33 2008 -0400 +++ b/dreactor/core/Task.d Sat Sep 20 18:33:11 2008 -0400 @@ -3,8 +3,11 @@ import tango.core.Thread; import tango.util.container.HashMap; import tango.util.container.CircularList; +import tango.core.Atomic; + import dreactor.core.Vat; import dreactor.protocol.IProvider; +import dreactor.protocol.DefaultProvider; alias CircularList!(Message) Messages; @@ -19,17 +22,20 @@ Messages m; if (box.get(type, m)) { - Message msg = m.removeHead(); - if (msg) + if (!m.isEmpty) + { + Message msg = m.removeHead(); msg_count.store(msg_count.load()-1); - if (m.isEmpty()) + if (m.isEmpty()) + box.removeKey(type); + return msg; + } + else box.removeKey(type); - - return msg; } - else - return null; + Message msg; + return msg; } //TODO this could be optimized to use set intersection logic instead of checking for @@ -39,10 +45,11 @@ foreach(int i; types) { Message msg = popMessageOfType(i); - if (msg) + if (msg.valid) return msg; } - return null; + Message msg; + return msg; } Message popMessage() @@ -51,14 +58,14 @@ int key; auto itor = box.iterator; - do + while (true) { if (itor.valid && itor.next(key, m)) { if (!m.isEmpty()) { Message msg = m.removeHead(); - if (msg) + if (msg.valid) msg_count.store(msg_count.load()-1); if (m.isEmpty()) box.removeKey(key); @@ -66,13 +73,15 @@ } else { - iterator.remove(); + itor.remove(); } } else - return null; + { + Message msg; + return msg; + } } - while (true) } void push(Message msg) @@ -107,18 +116,18 @@ Mailbox lockedMailbox; int id; Vat vat; - TaskDG taskdg; + TaskDg taskdg; IProvider provider; public - this(TaskDg tdg = null, IProvider provider = null) + this(IProvider prov = null) { - fiber = new Fiber(&run); + fiber = new Fiber(&run, 4096 * 4); mailbox = new Mailbox; lockedMailbox = new Mailbox; - taskdg = tdg; - if (!provider) - provider = new DefaultProvider; + provider = prov ? prov : new DefaultProvider; + + Vat.LocalVat.addTask(this); } void setId(int i) @@ -153,7 +162,8 @@ } body { - while (msg = receive()) + Message msg; + while ((msg = receive()).valid) { taskdg(msg); } @@ -170,12 +180,12 @@ bool sendTo(int taskid, Message m) { Task t; - if (t = vat.getTask(taskid)) + if ((t = vat.getTask(taskid)) !is null) { t.appendMessage(m); return true; } - else if (t = Vat.getGlobalTask(taskid)) + else if ((t = Vat.getGlobalTask(taskid)) !is null) { t.appendIVMessage(m); return true; @@ -183,6 +193,19 @@ return false; } + char[] getString(Message msg) + { + return (cast(char*) msg.payload)[0 .. msg.info]; + } + + Fiber.State state() + { + return fiber.state(); + } + void call() + { + fiber.call(); + } protected /*************************************************************************** @@ -197,14 +220,12 @@ while(true) { Message m = mailbox.popMessageOfType(types); - if (!m) + if (!m.valid) Fiber.yield(); - else if (SYSTEM_QUIT == m.type) - break; - else return m; + else + return m; } - return null; } Message receive() @@ -212,13 +233,11 @@ while(true) { Message m = mailbox.popMessage(); - if (!m) + if (!m.valid) Fiber.yield(); - else if (SYSTEM_QUIT == m.type) - break; - else return m; + else + return m; } - return null; } int getId() { return id;} diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/core/Vat.d --- a/dreactor/core/Vat.d Wed Aug 27 00:47:33 2008 -0400 +++ b/dreactor/core/Vat.d Sat Sep 20 18:33:11 2008 -0400 @@ -17,18 +17,17 @@ import tango.core.Exception; import tango.core.Thread; import tango.core.Atomic; -import tango.util.collection.CircularSeq; import tango.util.log.Log; import dreactor.transport.AsyncSocketConduit; import dreactor.protocol.IProvider; +import dreactor.protocol.DefaultProvider; import dreactor.core.Task; import dreactor.util.ThreadSafeQueue; static char[] version_string = "Vat.d 0.1 2008-05-31"; - -enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; +alias bool delegate (Event) RegDg; class TaskAttachment { @@ -42,6 +41,8 @@ class Vat { + static Vat LocalVat; + private Thread thread; bool running; @@ -51,7 +52,7 @@ Selector selector; static Atomic!(int) taskCount; - TaskAttachment[int] globalTasks; //global registry of tasks + static TaskAttachment[int] globalTasks; //global registry of tasks public @@ -63,17 +64,28 @@ thread = new Thread(&eventLoop); thread.start(); } - - synchronized int addTask(Task t, IProvider p = null) + + static this() + { + LocalVat = new Vat; + } + + int addTask(Task t) { t.setVat(this); - ++taskCount; - auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider); - tasks[taskCount] = ta; - globalTask[taskCount] = ta; + int taskid = taskCount.load() + 1; + taskCount.store(taskid); + auto p = t.getProvider(); + if (p is null) + p = new DefaultProvider; + p.setRegisterFunc(createRegFunc(taskid)); //default the task id as a param in the delegate + auto ta = new TaskAttachment(t, p); + tasks[taskid] = ta; + globalTasks[taskid] = ta; selector.register(p.getConduit(), p.getEvents(), ta); - t.setId(taskCount); - return taskCount; + t.setId(taskid); + + return taskid; } void exit() @@ -86,34 +98,58 @@ thread.join(); } - bool addConnection(int tid, Conduit c, Events evts) + bool register(int tid, Event evts) { log.trace("adding handler"); - TaskAttachment ta; - if (ta = (tid in tasks)) - return selector.register(c, evts, ta); + TaskAttachment* ta; + if ((ta = (tid in tasks)) !is null) + { + selector.register((*ta).provider.getConduit(), evts, *ta); + return true; + } else + { return false; + } } - + + RegDg createRegFunc(int taskid) + { + class Functor + { + int taskid; + this (int tid) + { + taskid = tid; + } + bool call(Event evts) + { + return register(taskid, evts); + } + } + auto ftor = new Functor(taskid); + return &ftor.call; + } + bool remConnection(Conduit c) { - return selector.unregister(c); + selector.unregister(c); + return true; } Task getTask(int tid) { - TaskAttachment ta; - if (ta = (tid in tasks)) + TaskAttachment* ta; + if ((ta = (tid in tasks)) !is null) return ta.task; else return null; } - static synchronized Task getGlobalTask(int tid) + static Task getGlobalTask(int tid) { - TaskAttachment ta; - if (ta = (tid in globaltasks)) + TaskAttachment* ta; + if ((ta = (tid in globalTasks)) !is null) return ta.task; else return null; @@ -139,30 +175,27 @@ // incoming data log.trace("Read event fired"); auto ta = cast(TaskAttachment) key.attachment; - processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit); + ta.task.appendMessage(ta.provider.handleRead()); } else if (key.isWritable()) { log.trace("Write event fired"); auto ta = cast(TaskAttachment) key.attachment; - ta.appendMessage(ta.provider.handleWrite()); - processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit); + ta.task.appendMessage(ta.provider.handleWrite()); } else if (key.isHangup()) { log.trace("Hangup event fired"); auto ta = cast(TaskAttachment) key.attachment; - ta.appendMessage(ta.provider.handleDisconnect()); - processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit); + ta.task.appendMessage(ta.provider.handleDisconnect()); } else if (key.isError() || key.isInvalidHandle()) { log.trace("Error event fired"); // error, close connection auto ta = cast(TaskAttachment) key.attachment; - ta.appendMessage(ta.provider.handleError()); - processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit); + ta.task.appendMessage(ta.provider.handleError()); } } } @@ -183,33 +216,11 @@ { foreach(int k; tasks.keys) { - if (tasks[k].state() == Fiber.State.HOLD) - tasks[k].call(); - if (tasks[k].state() == Fiber.State.TERM) + if (tasks[k].task.state() == Fiber.State.HOLD) + tasks[k].task.call(); + if (tasks[k].task.state() == Fiber.State.TERM) tasks.remove(k); } } - void processReturn(int result, Conduit c) - { - switch(result) - { - case CLOSE: - selector.unregister(c); - c.detach(); - break; - case UNREGISTER: - selector.unregister(c); - break; - case REMAIN: - //this space intentially left blank - break; - case REGISTER: - break; - case REREGISTER: - break; - default: - log.error("processReturn: unknown return value"); - } - } } diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/protocol/DefaultProvider.d --- a/dreactor/protocol/DefaultProvider.d Wed Aug 27 00:47:33 2008 -0400 +++ b/dreactor/protocol/DefaultProvider.d Sat Sep 20 18:33:11 2008 -0400 @@ -1,6 +1,7 @@ module dreactor.protocol.DefaultProvider; -import tango.io.Selector; +import tango.io.selector.model.ISelector; +import tango.io.device.Conduit; import dreactor.protocol.IProvider; @@ -10,27 +11,41 @@ { private Conduit cond; - Events evts; - + Event evts; + bool delegate (Event) regFn; public - Message handleRead(Conduit c) - { + + enum { + Read = 1000, + Write, + Error, + Connect, + Disconnect } - Message handleWrite(Conduit c) + Message handleRead() { + return Message(cast(void*)cond, Read, 0); } - Message handleError(Conduit c) + Message handleWrite() { + return Message(cast(void*)cond, Write, 0); } - Message handleConnect(Conduit c) + Message handleError() { + return Message(cast(void*)cond, Error, 0); } - Message handleDisconnect(Conduit c) + Message handleConnect() { + return Message(cast(void*)cond, Connect, 0); + } + + Message handleDisconnect() + { + return Message(cast(void*)cond, Disconnect, 0); } @@ -39,13 +54,22 @@ return cond; } - int getEvents() + void send(char[] buf) + { + } + + Event getEvents() { return evts; } void setEvents(Event e) { - evts e; + evts = e; } + + void setRegisterFunc( bool delegate (Event) fn) + { + regFn = fn; + } } diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/protocol/Emitter.d --- a/dreactor/protocol/Emitter.d Wed Aug 27 00:47:33 2008 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,48 +0,0 @@ -module Emitter - - - - -import tango.core.Thread; - -import dreactor.core.Task; - -alias Message delegate(void) EmitterDg; - -class Emitter -{ -public - this(Task t, EmitterDg cb) - { - task = t; - callback = cb; - thread = new Thread(&run); - thread.start(); - } - - void stop() - { - running = false; - } - - void stopNow() - { - thread.isDaemon(true); - running = false; - } -private - - void run() - { - while(running) - { - Message msg = callback(); - task.appendIVMessage(msg); - } - } - Task task; - Thread thread; - bool running; - EmitterCb callback; -} - diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/protocol/IProvider.d --- a/dreactor/protocol/IProvider.d Wed Aug 27 00:47:33 2008 -0400 +++ b/dreactor/protocol/IProvider.d Sat Sep 20 18:33:11 2008 -0400 @@ -1,29 +1,40 @@ module dreactor.protocol.IProvider; -class Message +import tango.io.selector.model.ISelector; +import tango.io.device.Conduit; + +struct Message { public + int type; int info; - Object payload; - this (Object buf, int t, int e) + void* payload; + int from; + bool valid; + + static Message opCall(void* buf, int t, int e, int f = 0) { - type = t; - info = e; - payload = buf; + Message m; + m.type = t; + m.info = e; + m.from = f; + m.payload = buf; + m.valid = true; + return m; } } interface IProvider { - Message handleRead(Conduit c); - Message handleWrite(Conduit c); - Message handleError(Conduit c); - Message handleConnect(Conduit c); - Message handleDisconnect(Conduit c); - abstract void send(char []); - + Message handleRead(); + Message handleWrite(); + Message handleError(); + Message handleConnect(); + Message handleDisconnect(); + void send(char []); + void setRegisterFunc(bool delegate (Event)); Conduit getConduit(); - int getEvents(); - void setEvents(); + Event getEvents(); + void setEvents(Event e); } diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/protocol/TcpProvider.d --- a/dreactor/protocol/TcpProvider.d Wed Aug 27 00:47:33 2008 -0400 +++ b/dreactor/protocol/TcpProvider.d Sat Sep 20 18:33:11 2008 -0400 @@ -1,34 +1,31 @@ -module dreactor.protocol.RawTcp; +module dreactor.protocol.TcpProvider; import tango.io.device.Conduit; import tango.io.selector.model.ISelector; import tango.net.Socket; -import tango.util.collection.CircularSeq; +import tango.util.container.CircularList; import tango.util.log.Log; import tango.util.log.Config; import dreactor.transport.AsyncSocketConduit; import dreactor.core.Vat; -import dreactor.core.Dispatcher; - +public import dreactor.protocol.IProvider; /****************************************************************************** Basic TCP server or client routines for sending raw data. ******************************************************************************/ -class TCPProvider : IProvider +class TcpProvider : IProvider { public - - enum - { - RECEIVE = 1000, - SEND_COMPLETE, - NEW_CONNECTION, - REMOTE_CLOSED, - SEND_ERROR, - RECEIVE_ERROR, - ERROR + enum { + SendComplete = 2000, + NewConnection, + Receive, + RemoteClosed, + SendError, + ReceiveError, + Error } this(AsyncSocketConduit c) @@ -36,41 +33,53 @@ log = Log.lookup("dreactor.protocol.RawTcpServer"); log.info("log initialized"); cond = c; + events = Event.Read; } - this(Vat v, IPv4Address addr) + this(IPv4Address addr, bool listen = false) { - AsyncSocketConduit cond = new AsyncSocketConduit; - cond.socket().setAddressReuse(true); - this(cond); + AsyncSocketConduit c = new AsyncSocketConduit; + c.socket().setAddressReuse(true); + if (listen) + { + c.bind(addr); + c.socket().listen(1000); + listener = listen; + } + else + c.connect(addr); + this(c); } + ~this() { close(); } - Message handleRead(Conduit c) + Message handleRead() { Logger log = Log.lookup("Handlers.onReceive"); + if (listener) + return handleConnect(); + char inbuf[8192]; int amt; - if((amt = h.transport.read(inbuf)) > 0) + if((amt = cond.read(inbuf)) > 0) { - return new Message(inbuf[0 .. amt].dup, RECEIVE, amt); + return Message(inbuf[0 .. amt].dup.ptr, Receive, amt); } else { if (amt == 0) { - children.remove(h); - (cast(AsyncSocketConduit) h.transport).shutdown(); - return Message(null, REMOTE_CLOSED, amt); + cond.shutdown(); + return Message(null, RemoteClosed, amt); } log.error("Received no data, err = {}", amt); } - return new Message(null, ERROR, amt); + return Message(null, Error, amt); } /************************************************************************** @@ -81,7 +90,7 @@ if there is no more data left to send. ***************************************************************************/ - Message handleWrite(Conduit c) + Message handleWrite() { Logger log = Log.lookup("Handlers.onSend"); @@ -95,42 +104,45 @@ { //h.remEvent(Event.Write); //TODO - How do we handle event re-registering - return new Message(null, SEND_COMPLETE, sent); + return Message(null, SendComplete, sent); } } else if (sent == 0) { log.error("Select said socket was writable, but sent 0 bytes"); - return new Message(null, SEND_ERROR, 0); + return Message(null, Error, 0); } else { log.error("Socket send return ERR"); - return new Message(null, SEND_ERROR, sent); + return Message(null, Error, sent); } } else { - //h.remEvent(Event.Write); - //TODO - How do we handle event re-registering - - return new Message(null, SEND_COMPLETE, 0); + remEvent(Event.Write); + if (!regFn(events)) + { + log.error("unable to register mgr"); + } + return Message(null, SendComplete, 0); } } - Message handleDisconnect(Conduit c) + Message handleDisconnect() { - return new Message(c, REMOTE_CLOSED, 0); + return Message(cast(void*)cond, RemoteClosed, 0); } - Message handleError(Conduit c) + Message handleError() { - return new Messsage(null, ERROR, 0); + return Message(cast(void*)cond, Error, 0); } - Message handleConnect(Conduit c) + Message handleConnect() { - return new Message(accept(), NEW_CONNECTION, 0); + log.trace("accepting new connection"); + return Message(cast(void*)accept(), NewConnection, 0); } Conduit getConduit() @@ -138,7 +150,7 @@ return cond; } - int getEvents() + Event getEvents() { return events; } @@ -152,22 +164,9 @@ { AsyncSocketConduit newcond = new AsyncSocketConduit; cond.socket().accept(newcond.socket); - log.info("accepted new connection"); + log.info("accepted new connection {} {}", cast(uint) newcond, newcond.fileHandle()); return newcond; } - - int broadcast(char[] outbuf, TCPProvider[] recips) - { - foreach(TCPProvider c; recips) - { - if (c.appendOutBuffer(outbuf)) - { - h.addEvent(Event.Write); - vat.addConnection(h); - } - } - return 0; - } /************************************************************************** @@ -177,18 +176,16 @@ data as the socket becomes free. **************************************************************************/ - int send(char[] outbufl) + void send(char[] outbuf) { if (appendOutBuffer(outbuf)) { - //TODO - should we always register for all events? or update it when needed? - //d.addEvent(Event.Write); - if (!vat.addConnection(d)) + addEvent(Event.Write); + if (!regFn(events)) { log.error("unable to register mgr"); } } - return 0; } @@ -198,13 +195,6 @@ cond.detach(); } - - ~this() - { - (cast(AsyncSocketConduit)manager.transport).shutdown(); - (cast(AsyncSocketConduit)manager.transport).detach(); - } - int connect(IPv4Address addr) { cond = new AsyncSocketConduit; @@ -218,35 +208,94 @@ /************************************************************************** - send - User-called function to send data to the counterpart at the other - end of the connection. This sets up the connection manager to send - data as the socket becomes free. + appendOutBuffer + + Adds an outgoing buffer to the list. This returns true if the list + was empty, indicating that the handler should be registered with the + SelectLoop. If it returns false, it was probably already registered. + + **************************************************************************/ + bool appendOutBuffer(char[] outbuf) + { + out_buffers.append(outbuf); + out_buffers_len++; + if (out_buffers_len == 1) + return true; + else + return false; + } + + /************************************************************************** + + addOffset + Use this function to update the offset position after a successful data + send. This not only manages the current offset, but will update the + out buffer chain if necessary. + + Returns: false if there is nothing left to send, true if there is. + + **************************************************************************/ + bool addOffset(int off) + in + { + assert(out_buffers_len > 0); + } + body + { + char[] hd = out_buffers.head(); + if ((off + o_offset) >= hd.length) + { + out_buffers.removeHead(); + o_offset = 0; + out_buffers_len--; + return (out_buffers_len > 0); + } + else + o_offset += off; + return true; + } + + /************************************************************************** + + char[] nextBuffer + + Returns a slice of the current outbound buffer, returns a char[] pointing + to null if there is no current outbound buffer **************************************************************************/ - int send(char[] outbuf, IPv4Address addr = null) + char[] nextBuffer() { - if (!connected) + if (out_buffers_len < 1) { - log.info("send: not connected, connecting"); - return -1; + return null; } - if (appendOutBuffer(outbuf)) - { - addEvent(Event.Write); - if (!vat.addConnection(manager)) - { - log.error("unable to register mgr"); - } - } - return 0; + + return out_buffers.head()[o_offset .. $]; } - + void setRegisterFunc( bool delegate (Event) fn) + { + regFn = fn; + } + + void addEvent(Event evt) + { + events |= evt; + } + + void remEvent(Event evt) + { + events &= !evt; + } + private - Vat vat; - Conduit cond; + AsyncSocketConduit cond; Logger log; bool listener; Event events; + bool connected; + CircularList!(char[]) out_buffers; + int out_buffers_len; + int o_offset; + bool delegate (Event) regFn; } diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/transport/AsyncSocketConduit.d --- a/dreactor/transport/AsyncSocketConduit.d Wed Aug 27 00:47:33 2008 -0400 +++ b/dreactor/transport/AsyncSocketConduit.d Sat Sep 20 18:33:11 2008 -0400 @@ -102,7 +102,7 @@ ***********************************************************************/ - override Handle fileHandle () + Handle fileHandle () { return cast(Handle) socket_.fileHandle; } diff -r d6a3cfe7c3de -r 8c9b1276f623 dreactor/util/Emitter.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/util/Emitter.d Sat Sep 20 18:33:11 2008 -0400 @@ -0,0 +1,49 @@ +module dreactor.util.Emitter; + + + + +import tango.core.Thread; + +import dreactor.core.Task; +import dreactor.protocol.IProvider; + +alias Message delegate() EmitterDg; + +class Emitter +{ +public + this(Task t, EmitterDg cb) + { + task = t; + callback = cb; + thread = new Thread(&run); + thread.start(); + } + + void stop() + { + running = false; + } + + void stopNow() + { + thread.isDaemon(true); + running = false; + } +private + + void run() + { + while(running) + { + Message msg = callback(); + task.appendIVMessage(msg); + } + } + Task task; + Thread thread; + bool running; + EmitterDg callback; +} + diff -r d6a3cfe7c3de -r 8c9b1276f623 dsss.conf --- a/dsss.conf Wed Aug 27 00:47:33 2008 -0400 +++ b/dsss.conf Sat Sep 20 18:33:11 2008 -0400 @@ -2,3 +2,4 @@ #[test/longtest.d] #[test/chatserver.d] [test/chatclient.d] +[test/chatserver.d] diff -r d6a3cfe7c3de -r 8c9b1276f623 dsss.last --- a/dsss.last Wed Aug 27 00:47:33 2008 -0400 +++ b/dsss.last Sat Sep 20 18:33:11 2008 -0400 @@ -2,3 +2,4 @@ #[test/longtest.d] #[test/chatserver.d] [test/chatclient.d] +[test/chatserver.d] diff -r d6a3cfe7c3de -r 8c9b1276f623 test/chatclient Binary file test/chatclient has changed diff -r d6a3cfe7c3de -r 8c9b1276f623 test/chatclient.d --- a/test/chatclient.d Wed Aug 27 00:47:33 2008 -0400 +++ b/test/chatclient.d Sat Sep 20 18:33:11 2008 -0400 @@ -1,18 +1,26 @@ module chatclient; +import tango.io.Stdout; +import tango.io.Console; +import tango.net.Socket; +import tango.util.log.Log; + import dreactor.core.Vat; import dreactor.core.Task; import dreactor.protocol.TcpProvider; +import dreactor.util.Emitter; -enum {EMITTER_CHAT_RECEIVE = 42}; +enum { StdinReceive = 42 } + +Logger log; class ChatTask : Task { private TcpProvider client; - + bool running; public this(TcpProvider tcpclient) { @@ -22,40 +30,33 @@ void run() { Message msg; - + running = true; auto em = new Emitter(this, { char buf[] = Cin.copyln(true); - return new Message(buf, EMITTER_STDIN_RECEIVE, buf.size); + return Message(cast(void*)buf.ptr, StdinReceive, buf.length); }); - while (msg = receive()) + while (running) { - switch(msg.type) + msg = receive(); + switch (msg.type) { - case EMITTER_CHAT_RECEIVE: - char[] inbuf = msg.payload; + case StdinReceive: + { + char[] inbuf = getString(msg); if (inbuf == "quit") { - em.stopNow(); - return; + running = false; } - client.send(msg.payload); - break; - - case TcpProvider.RECEIVE: - Stdout(cast(char[]) msg.payload); - break; - - case TcpProvider.SEND_COMPLETE: - break; - - case TcpProvider.REMOTE_CLOSED: - Stdout("--- Remote host closed connection \n"); - break; - + client.send(inbuf); + } + case TcpProvider.Receive: + { + Stdout(getString(msg)); + } default: - Stdout("Unknown message received\n"); + Stdout("unknown msg received: {}", msg.type); } } em.stopNow(); @@ -63,10 +64,10 @@ } -int main(int argc, char[][] argv) +int main(char[][] args) { - auto vat = new Vat; - auto client = new TcpProvider(new IPv4Address("localhost", 5555), vat); - auto tsk = new ChatTask(client); - vat.addTask(task); + log = Log.lookup("dreactor.chatserver"); + auto provider = new TcpProvider(new IPv4Address("localhost", 5555)); + auto tsk = new ChatTask(provider); + return 0; } diff -r d6a3cfe7c3de -r 8c9b1276f623 test/chatserver Binary file test/chatserver has changed diff -r d6a3cfe7c3de -r 8c9b1276f623 test/chatserver.d --- a/test/chatserver.d Wed Aug 27 00:47:33 2008 -0400 +++ b/test/chatserver.d Sat Sep 20 18:33:11 2008 -0400 @@ -1,66 +1,102 @@ module chatserver; +import tango.io.Stdout; +import tango.io.Console; +import tango.util.container.CircularList; +import tango.util.log.Log; +import tango.net.Socket; + import dreactor.core.Vat; import dreactor.core.Task; import dreactor.protocol.TcpProvider; +import dreactor.transport.AsyncSocketConduit; +typedef Message ChildTCPRequest; +Logger log; class ChatConnectionTask : Task { public + this(TcpProvider tcpclient) + { + super(tcpclient); + } + enum { + StdIn = 100, + RemoteClosed + } + + void run() + { + running = true; + Message msg; + while (running) + { + msg = receive(); + switch(msg.type) + { + case TcpProvider.Receive: + Stdout(cast(char*) msg.payload); + break; + case TcpProvider.SendComplete: + break; + case TcpProvider.RemoteClosed: + log.trace("--- Remote host closed connection \n"); + break; + default: + log.trace("Unknown message received\n"); + } + } + } + +private + bool running; +} + +class ListenerTask : Task +{ + this(TcpProvider tcpclient) + { + super(tcpclient); + } void run() { Message msg; - while (msg = receive()) + running = true; + while (running) { + msg = receive(); + auto children = new CircularList!(ChatConnectionTask); switch(msg.type) { - case TCP_PROVIDER_RECEIVE: - //Stdout(cast(char[]) msg.payload); + case TcpProvider.NewConnection: + AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload; + log.trace("new conduit : {}", cast(uint) cond); + auto provider = new TcpProvider(cond); + auto tsk = new ChatConnectionTask(new TcpProvider(cond)); + children.append(tsk); + log.trace("accepted connection"); break; - case TCP_PROVIDER_SEND_COMPLETE: + case ChatConnectionTask.StdIn: + char[] inbuf = (cast(char*) msg.payload)[0 .. msg.info]; break; - case TCP_PROVIDER_REMOTE_CLOSED: - Stdout("--- Remote host closed connection \n"); + case ChatConnectionTask.RemoteClosed: break; default: - Stdout("Unknown message received\n"); + log.trace("Unknown message received"); } } - em.stopNow(); } - - void send(char[] buf) - { - tcp.send(buf); - } - - static CircularList!(ChatConnectionTask!( +private + bool running; } - -int main(int argc, char[][] argv) +int main(char[][] args) { - auto vat = new Vat; - - void listentask(Message msg) - { - switch(msg.type) - { - case TCP_PROVIDER_CONNECT: - AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload; - auto tsk = ChatConnectionTask(new TcpProvider(cond)); - vat.addTask(tsk); - break; - default: - Stdout("Unknown message received\n"); - } - } - - - auto provider = new TcpProvider(new IPv4Address("localhost", 5555), vat); - auto srvtsk = new Task(&listentask, provider); - vat.addTask(task, client); + log = Log.lookup("dreactor.chatserver"); + auto provider = new TcpProvider(new IPv4Address("localhost", 5555), true); + auto srvtsk = new ListenerTask(provider); + return 0; } diff -r d6a3cfe7c3de -r 8c9b1276f623 test/testtuple.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/testtuple.d Sat Sep 20 18:33:11 2008 -0400 @@ -0,0 +1,3 @@ + + +