Mercurial > projects > dreactor
comparison dreactor/core/Vat.d @ 12:d6a3cfe7c3de
more stuff
author | rick@Macintosh.local |
---|---|
date | Wed, 27 Aug 2008 00:47:33 -0400 |
parents | 5836613d16ac |
children | 8c9b1276f623 |
comparison
equal
deleted
inserted
replaced
11:5836613d16ac | 12:d6a3cfe7c3de |
---|---|
19 import tango.core.Atomic; | 19 import tango.core.Atomic; |
20 import tango.util.collection.CircularSeq; | 20 import tango.util.collection.CircularSeq; |
21 import tango.util.log.Log; | 21 import tango.util.log.Log; |
22 | 22 |
23 import dreactor.transport.AsyncSocketConduit; | 23 import dreactor.transport.AsyncSocketConduit; |
24 import dreactor.protocol.IProvider; | |
24 import dreactor.core.Task; | 25 import dreactor.core.Task; |
25 import dreactor.util.ThreadSafeQueue; | 26 import dreactor.util.ThreadSafeQueue; |
26 | 27 |
27 static char[] version_string = "Vat.d 0.1 2008-05-31"; | 28 static char[] version_string = "Vat.d 0.1 2008-05-31"; |
28 | 29 |
29 Logger log; | |
30 | 30 |
31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; | 31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; |
32 alias Message delegate (Conduit c) HandlerDG; | |
33 alias Message function (Conduit c) HandlerFN; | |
34 | 32 |
35 class TaskAttachment | 33 class TaskAttachment |
36 { | 34 { |
37 public | 35 public |
38 Task task; | 36 Task task; |
39 HandlerDG dg; | 37 IProvider provider; |
40 HandlerFN fn; | 38 |
41 | 39 this (Task ta, IProvider p) |
42 this(Task ta, HandlerDG d) | 40 { task = ta; provider = p; } |
43 { TaskAttachment t; t.task = ta; t.dg = d; return t; } | |
44 | |
45 this(Task ta, HandlerFN f) | |
46 { TaskAttachment t; t.task = ta; t.fn = f; return t; } | |
47 | |
48 public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); } | |
49 } | 41 } |
50 | 42 |
51 class Vat | 43 class Vat |
52 { | 44 { |
53 private | 45 private |
54 Thread thread; | 46 Thread thread; |
55 bool running; | 47 bool running; |
48 Logger log; | |
56 | 49 |
57 Task[int] tasks; | 50 TaskAttachment[int] tasks; //registry for local tasks |
58 int taskCount; | 51 |
52 Selector selector; | |
53 static Atomic!(int) taskCount; | |
54 TaskAttachment[int] globalTasks; //global registry of tasks | |
59 | 55 |
60 public | 56 public |
61 | |
62 this(Task t) | |
63 { | |
64 addTask(t); | |
65 this(); | |
66 } | |
67 | 57 |
68 this() | 58 this() |
69 { | 59 { |
70 log = Log.lookup("dreactor.core.Vat"); | 60 log = Log.lookup("dreactor.core.Vat"); |
71 | 61 |
72 running = true; | 62 running = true; |
73 thread = new Thread(&eventLoop); | 63 thread = new Thread(&eventLoop); |
74 thread.start(); | 64 thread.start(); |
75 } | 65 } |
76 | 66 |
77 void addTask(Task t) | 67 synchronized int addTask(Task t, IProvider p = null) |
78 { | 68 { |
79 t.setVat(this); | 69 t.setVat(this); |
80 ++taskCount; | 70 ++taskCount; |
81 tasks[taskCount] = t; | 71 auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider); |
72 tasks[taskCount] = ta; | |
73 globalTask[taskCount] = ta; | |
74 selector.register(p.getConduit(), p.getEvents(), ta); | |
82 t.setId(taskCount); | 75 t.setId(taskCount); |
76 return taskCount; | |
83 } | 77 } |
84 | 78 |
85 void exit() | 79 void exit() |
86 { | 80 { |
87 running = false; | 81 running = false; |
90 void wait() | 84 void wait() |
91 { | 85 { |
92 thread.join(); | 86 thread.join(); |
93 } | 87 } |
94 | 88 |
95 bool addConnection() | 89 bool addConnection(int tid, Conduit c, Events evts) |
96 { | 90 { |
97 log.trace("adding handler"); | 91 log.trace("adding handler"); |
98 return selector.register(h.transport, h.events(), h); | 92 TaskAttachment ta; |
93 if (ta = (tid in tasks)) | |
94 return selector.register(c, evts, ta); | |
95 else | |
96 return false; | |
99 } | 97 } |
100 | 98 |
101 bool remConnection(Dispatcher handler) | 99 bool remConnection(Conduit c) |
102 { | 100 { |
103 return selector.unregister(h.transport); | 101 return selector.unregister(c); |
102 } | |
103 | |
104 Task getTask(int tid) | |
105 { | |
106 TaskAttachment ta; | |
107 if (ta = (tid in tasks)) | |
108 return ta.task; | |
109 else | |
110 return null; | |
111 } | |
112 | |
113 static synchronized Task getGlobalTask(int tid) | |
114 { | |
115 TaskAttachment ta; | |
116 if (ta = (tid in globaltasks)) | |
117 return ta.task; | |
118 else | |
119 return null; | |
104 } | 120 } |
105 | 121 |
106 private | 122 private |
107 void eventLoop() | 123 void eventLoop() |
108 { | 124 { |
109 auto selector = new Selector(); | 125 selector = new Selector(); |
110 selector.open(); | 126 selector.open(); |
111 do | 127 do |
112 { | 128 { |
113 execTasks(); | 129 execTasks(); |
114 auto eventCount = selector.select(0.01); | 130 auto eventCount = selector.select(0.01); |
120 { | 136 { |
121 if (key.isReadable()) | 137 if (key.isReadable()) |
122 { | 138 { |
123 // incoming data | 139 // incoming data |
124 log.trace("Read event fired"); | 140 log.trace("Read event fired"); |
125 auto conn = cast(Dispatcher) key.attachment; | 141 auto ta = cast(TaskAttachment) key.attachment; |
126 if ( Dispatcher.State.listening == conn.getState() ) | 142 processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit); |
127 conn.handleConnection(conn.transport, &addConnection); | 143 |
128 else | |
129 processReturn(conn.handleIncoming(), selector, conn); | |
130 } | 144 } |
131 else if (key.isWritable()) | 145 else if (key.isWritable()) |
132 { | 146 { |
133 log.trace("Write event fired"); | 147 log.trace("Write event fired"); |
134 auto conn = cast(Dispatcher) key.attachment; | 148 auto ta = cast(TaskAttachment) key.attachment; |
135 processReturn(conn.handleOutgoing(), selector, conn); | 149 ta.appendMessage(ta.provider.handleWrite()); |
150 processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit); | |
136 } | 151 } |
137 else if (key.isHangup()) | 152 else if (key.isHangup()) |
138 { | 153 { |
139 log.trace("Hangup event fired"); | 154 log.trace("Hangup event fired"); |
140 auto conn = cast(Dispatcher) key.attachment; | 155 auto ta = cast(TaskAttachment) key.attachment; |
141 processReturn(conn.handleDisconnect(), selector, conn); | 156 ta.appendMessage(ta.provider.handleDisconnect()); |
157 processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit); | |
142 } | 158 } |
143 else if (key.isError() || key.isInvalidHandle()) | 159 else if (key.isError() || key.isInvalidHandle()) |
144 { | 160 { |
145 log.trace("Error event fired"); | 161 log.trace("Error event fired"); |
146 // error, close connection | 162 // error, close connection |
147 auto conn = cast(Dispatcher) key.attachment; | 163 auto ta = cast(TaskAttachment) key.attachment; |
148 conn.handleError(&remConnection); | 164 ta.appendMessage(ta.provider.handleError()); |
165 processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit); | |
149 } | 166 } |
150 } | 167 } |
151 } | 168 } |
152 else if (eventCount == 0) | 169 else if (eventCount == 0) |
153 { | 170 { |
171 if (tasks[k].state() == Fiber.State.TERM) | 188 if (tasks[k].state() == Fiber.State.TERM) |
172 tasks.remove(k); | 189 tasks.remove(k); |
173 } | 190 } |
174 } | 191 } |
175 | 192 |
176 void processReturn(int result, Selector s, Dispatcher h) | 193 void processReturn(int result, Conduit c) |
177 { | 194 { |
178 switch(result) | 195 switch(result) |
179 { | 196 { |
180 case CLOSE: | 197 case CLOSE: |
181 s.unregister(h.transport); | 198 selector.unregister(c); |
182 h.transport.detach(); | 199 c.detach(); |
183 break; | 200 break; |
184 case UNREGISTER: | 201 case UNREGISTER: |
185 s.unregister(h.transport); | 202 selector.unregister(c); |
186 break; | 203 break; |
187 case REMAIN: | 204 case REMAIN: |
188 //this space intentially left blank | 205 //this space intentially left blank |
189 break; | 206 break; |
190 case REGISTER: | 207 case REGISTER: |
191 s.register(h.transport, h.events(), h); | |
192 break; | 208 break; |
193 case REREGISTER: | 209 case REREGISTER: |
194 s.register(h.transport, h.events(), h); | |
195 break; | 210 break; |
196 default: | 211 default: |
197 log.error("processReturn: unknown return value"); | 212 log.error("processReturn: unknown return value"); |
198 } | 213 } |
199 } | 214 } |