Mercurial > projects > dreactor
diff dreactor/core/Vat.d @ 12:d6a3cfe7c3de
more stuff
author | rick@Macintosh.local |
---|---|
date | Wed, 27 Aug 2008 00:47:33 -0400 |
parents | 5836613d16ac |
children | 8c9b1276f623 |
line wrap: on
line diff
--- a/dreactor/core/Vat.d Tue Aug 12 16:59:56 2008 -0400 +++ b/dreactor/core/Vat.d Wed Aug 27 00:47:33 2008 -0400 @@ -21,31 +21,23 @@ import tango.util.log.Log; import dreactor.transport.AsyncSocketConduit; +import dreactor.protocol.IProvider; import dreactor.core.Task; import dreactor.util.ThreadSafeQueue; static char[] version_string = "Vat.d 0.1 2008-05-31"; -Logger log; enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; -alias Message delegate (Conduit c) HandlerDG; -alias Message function (Conduit c) HandlerFN; class TaskAttachment { public Task task; - HandlerDG dg; - HandlerFN fn; + IProvider provider; - this(Task ta, HandlerDG d) - { TaskAttachment t; t.task = ta; t.dg = d; return t; } - - this(Task ta, HandlerFN f) - { TaskAttachment t; t.task = ta; t.fn = f; return t; } - - public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); } + this (Task ta, IProvider p) + { task = ta; provider = p; } } class Vat @@ -53,18 +45,16 @@ private Thread thread; bool running; + Logger log; - Task[int] tasks; - int taskCount; + TaskAttachment[int] tasks; //registry for local tasks + + Selector selector; + static Atomic!(int) taskCount; + TaskAttachment[int] globalTasks; //global registry of tasks public - this(Task t) - { - addTask(t); - this(); - } - this() { log = Log.lookup("dreactor.core.Vat"); @@ -73,13 +63,17 @@ thread = new Thread(&eventLoop); thread.start(); } - - void addTask(Task t) + + synchronized int addTask(Task t, IProvider p = null) { t.setVat(this); ++taskCount; - tasks[taskCount] = t; + auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider); + tasks[taskCount] = ta; + globalTask[taskCount] = ta; + selector.register(p.getConduit(), p.getEvents(), ta); t.setId(taskCount); + return taskCount; } void exit() @@ -92,21 +86,43 @@ thread.join(); } - bool addConnection() + bool addConnection(int tid, Conduit c, Events evts) { log.trace("adding handler"); - return selector.register(h.transport, h.events(), h); + TaskAttachment ta; + if (ta = (tid in tasks)) + return selector.register(c, evts, ta); + else + return false; } - bool remConnection(Dispatcher handler) + bool remConnection(Conduit c) + { + return selector.unregister(c); + } + + Task getTask(int tid) { - return selector.unregister(h.transport); + TaskAttachment ta; + if (ta = (tid in tasks)) + return ta.task; + else + return null; + } + + static synchronized Task getGlobalTask(int tid) + { + TaskAttachment ta; + if (ta = (tid in globaltasks)) + return ta.task; + else + return null; } private void eventLoop() { - auto selector = new Selector(); + selector = new Selector(); selector.open(); do { @@ -122,30 +138,31 @@ { // incoming data log.trace("Read event fired"); - auto conn = cast(Dispatcher) key.attachment; - if ( Dispatcher.State.listening == conn.getState() ) - conn.handleConnection(conn.transport, &addConnection); - else - processReturn(conn.handleIncoming(), selector, conn); + auto ta = cast(TaskAttachment) key.attachment; + processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit); + } else if (key.isWritable()) { log.trace("Write event fired"); - auto conn = cast(Dispatcher) key.attachment; - processReturn(conn.handleOutgoing(), selector, conn); + auto ta = cast(TaskAttachment) key.attachment; + ta.appendMessage(ta.provider.handleWrite()); + processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit); } else if (key.isHangup()) { log.trace("Hangup event fired"); - auto conn = cast(Dispatcher) key.attachment; - processReturn(conn.handleDisconnect(), selector, conn); + auto ta = cast(TaskAttachment) key.attachment; + ta.appendMessage(ta.provider.handleDisconnect()); + processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit); } else if (key.isError() || key.isInvalidHandle()) { - log.trace("Error event fired"); + log.trace("Error event fired"); // error, close connection - auto conn = cast(Dispatcher) key.attachment; - conn.handleError(&remConnection); + auto ta = cast(TaskAttachment) key.attachment; + ta.appendMessage(ta.provider.handleError()); + processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit); } } } @@ -173,25 +190,23 @@ } } - void processReturn(int result, Selector s, Dispatcher h) + void processReturn(int result, Conduit c) { switch(result) { case CLOSE: - s.unregister(h.transport); - h.transport.detach(); + selector.unregister(c); + c.detach(); break; case UNREGISTER: - s.unregister(h.transport); + selector.unregister(c); break; case REMAIN: //this space intentially left blank break; case REGISTER: - s.register(h.transport, h.events(), h); break; case REREGISTER: - s.register(h.transport, h.events(), h); break; default: log.error("processReturn: unknown return value");