Mercurial > projects > dreactor
diff dreactor/core/Vat.d @ 13:8c9b1276f623 default tip
bug fixes
author | rick@minifunk |
---|---|
date | Sat, 20 Sep 2008 18:33:11 -0400 |
parents | d6a3cfe7c3de |
children |
line wrap: on
line diff
--- a/dreactor/core/Vat.d Wed Aug 27 00:47:33 2008 -0400 +++ b/dreactor/core/Vat.d Sat Sep 20 18:33:11 2008 -0400 @@ -17,18 +17,17 @@ import tango.core.Exception; import tango.core.Thread; import tango.core.Atomic; -import tango.util.collection.CircularSeq; import tango.util.log.Log; import dreactor.transport.AsyncSocketConduit; import dreactor.protocol.IProvider; +import dreactor.protocol.DefaultProvider; import dreactor.core.Task; import dreactor.util.ThreadSafeQueue; static char[] version_string = "Vat.d 0.1 2008-05-31"; - -enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; +alias bool delegate (Event) RegDg; class TaskAttachment { @@ -42,6 +41,8 @@ class Vat { + static Vat LocalVat; + private Thread thread; bool running; @@ -51,7 +52,7 @@ Selector selector; static Atomic!(int) taskCount; - TaskAttachment[int] globalTasks; //global registry of tasks + static TaskAttachment[int] globalTasks; //global registry of tasks public @@ -63,17 +64,28 @@ thread = new Thread(&eventLoop); thread.start(); } - - synchronized int addTask(Task t, IProvider p = null) + + static this() + { + LocalVat = new Vat; + } + + int addTask(Task t) { t.setVat(this); - ++taskCount; - auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider); - tasks[taskCount] = ta; - globalTask[taskCount] = ta; + int taskid = taskCount.load() + 1; + taskCount.store(taskid); + auto p = t.getProvider(); + if (p is null) + p = new DefaultProvider; + p.setRegisterFunc(createRegFunc(taskid)); //default the task id as a param in the delegate + auto ta = new TaskAttachment(t, p); + tasks[taskid] = ta; + globalTasks[taskid] = ta; selector.register(p.getConduit(), p.getEvents(), ta); - t.setId(taskCount); - return taskCount; + t.setId(taskid); + + return taskid; } void exit() @@ -86,34 +98,58 @@ thread.join(); } - bool addConnection(int tid, Conduit c, Events evts) + bool register(int tid, Event evts) { log.trace("adding handler"); - TaskAttachment ta; - if (ta = (tid in tasks)) - return selector.register(c, evts, ta); + TaskAttachment* ta; + if ((ta = (tid in tasks)) !is null) + { + selector.register((*ta).provider.getConduit(), evts, *ta); + return true; + } else + { return false; + } } - + + RegDg createRegFunc(int taskid) + { + class Functor + { + int taskid; + this (int tid) + { + taskid = tid; + } + bool call(Event evts) + { + return register(taskid, evts); + } + } + auto ftor = new Functor(taskid); + return &ftor.call; + } + bool remConnection(Conduit c) { - return selector.unregister(c); + selector.unregister(c); + return true; } Task getTask(int tid) { - TaskAttachment ta; - if (ta = (tid in tasks)) + TaskAttachment* ta; + if ((ta = (tid in tasks)) !is null) return ta.task; else return null; } - static synchronized Task getGlobalTask(int tid) + static Task getGlobalTask(int tid) { - TaskAttachment ta; - if (ta = (tid in globaltasks)) + TaskAttachment* ta; + if ((ta = (tid in globalTasks)) !is null) return ta.task; else return null; @@ -139,30 +175,27 @@ // incoming data log.trace("Read event fired"); auto ta = cast(TaskAttachment) key.attachment; - processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit); + ta.task.appendMessage(ta.provider.handleRead()); } else if (key.isWritable()) { log.trace("Write event fired"); auto ta = cast(TaskAttachment) key.attachment; - ta.appendMessage(ta.provider.handleWrite()); - processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit); + ta.task.appendMessage(ta.provider.handleWrite()); } else if (key.isHangup()) { log.trace("Hangup event fired"); auto ta = cast(TaskAttachment) key.attachment; - ta.appendMessage(ta.provider.handleDisconnect()); - processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit); + ta.task.appendMessage(ta.provider.handleDisconnect()); } else if (key.isError() || key.isInvalidHandle()) { log.trace("Error event fired"); // error, close connection auto ta = cast(TaskAttachment) key.attachment; - ta.appendMessage(ta.provider.handleError()); - processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit); + ta.task.appendMessage(ta.provider.handleError()); } } } @@ -183,33 +216,11 @@ { foreach(int k; tasks.keys) { - if (tasks[k].state() == Fiber.State.HOLD) - tasks[k].call(); - if (tasks[k].state() == Fiber.State.TERM) + if (tasks[k].task.state() == Fiber.State.HOLD) + tasks[k].task.call(); + if (tasks[k].task.state() == Fiber.State.TERM) tasks.remove(k); } } - void processReturn(int result, Conduit c) - { - switch(result) - { - case CLOSE: - selector.unregister(c); - c.detach(); - break; - case UNREGISTER: - selector.unregister(c); - break; - case REMAIN: - //this space intentially left blank - break; - case REGISTER: - break; - case REREGISTER: - break; - default: - log.error("processReturn: unknown return value"); - } - } }