comparison dreactor/core/Vat.d @ 12:d6a3cfe7c3de

more stuff
author rick@Macintosh.local
date Wed, 27 Aug 2008 00:47:33 -0400
parents 5836613d16ac
children 8c9b1276f623
comparison
equal deleted inserted replaced
11:5836613d16ac 12:d6a3cfe7c3de
19 import tango.core.Atomic; 19 import tango.core.Atomic;
20 import tango.util.collection.CircularSeq; 20 import tango.util.collection.CircularSeq;
21 import tango.util.log.Log; 21 import tango.util.log.Log;
22 22
23 import dreactor.transport.AsyncSocketConduit; 23 import dreactor.transport.AsyncSocketConduit;
24 import dreactor.protocol.IProvider;
24 import dreactor.core.Task; 25 import dreactor.core.Task;
25 import dreactor.util.ThreadSafeQueue; 26 import dreactor.util.ThreadSafeQueue;
26 27
27 static char[] version_string = "Vat.d 0.1 2008-05-31"; 28 static char[] version_string = "Vat.d 0.1 2008-05-31";
28 29
29 Logger log;
30 30
31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; 31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
32 alias Message delegate (Conduit c) HandlerDG;
33 alias Message function (Conduit c) HandlerFN;
34 32
35 class TaskAttachment 33 class TaskAttachment
36 { 34 {
37 public 35 public
38 Task task; 36 Task task;
39 HandlerDG dg; 37 IProvider provider;
40 HandlerFN fn; 38
41 39 this (Task ta, IProvider p)
42 this(Task ta, HandlerDG d) 40 { task = ta; provider = p; }
43 { TaskAttachment t; t.task = ta; t.dg = d; return t; }
44
45 this(Task ta, HandlerFN f)
46 { TaskAttachment t; t.task = ta; t.fn = f; return t; }
47
48 public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); }
49 } 41 }
50 42
51 class Vat 43 class Vat
52 { 44 {
53 private 45 private
54 Thread thread; 46 Thread thread;
55 bool running; 47 bool running;
48 Logger log;
56 49
57 Task[int] tasks; 50 TaskAttachment[int] tasks; //registry for local tasks
58 int taskCount; 51
52 Selector selector;
53 static Atomic!(int) taskCount;
54 TaskAttachment[int] globalTasks; //global registry of tasks
59 55
60 public 56 public
61
62 this(Task t)
63 {
64 addTask(t);
65 this();
66 }
67 57
68 this() 58 this()
69 { 59 {
70 log = Log.lookup("dreactor.core.Vat"); 60 log = Log.lookup("dreactor.core.Vat");
71 61
72 running = true; 62 running = true;
73 thread = new Thread(&eventLoop); 63 thread = new Thread(&eventLoop);
74 thread.start(); 64 thread.start();
75 } 65 }
76 66
77 void addTask(Task t) 67 synchronized int addTask(Task t, IProvider p = null)
78 { 68 {
79 t.setVat(this); 69 t.setVat(this);
80 ++taskCount; 70 ++taskCount;
81 tasks[taskCount] = t; 71 auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider);
72 tasks[taskCount] = ta;
73 globalTask[taskCount] = ta;
74 selector.register(p.getConduit(), p.getEvents(), ta);
82 t.setId(taskCount); 75 t.setId(taskCount);
76 return taskCount;
83 } 77 }
84 78
85 void exit() 79 void exit()
86 { 80 {
87 running = false; 81 running = false;
90 void wait() 84 void wait()
91 { 85 {
92 thread.join(); 86 thread.join();
93 } 87 }
94 88
95 bool addConnection() 89 bool addConnection(int tid, Conduit c, Events evts)
96 { 90 {
97 log.trace("adding handler"); 91 log.trace("adding handler");
98 return selector.register(h.transport, h.events(), h); 92 TaskAttachment ta;
93 if (ta = (tid in tasks))
94 return selector.register(c, evts, ta);
95 else
96 return false;
99 } 97 }
100 98
101 bool remConnection(Dispatcher handler) 99 bool remConnection(Conduit c)
102 { 100 {
103 return selector.unregister(h.transport); 101 return selector.unregister(c);
102 }
103
104 Task getTask(int tid)
105 {
106 TaskAttachment ta;
107 if (ta = (tid in tasks))
108 return ta.task;
109 else
110 return null;
111 }
112
113 static synchronized Task getGlobalTask(int tid)
114 {
115 TaskAttachment ta;
116 if (ta = (tid in globaltasks))
117 return ta.task;
118 else
119 return null;
104 } 120 }
105 121
106 private 122 private
107 void eventLoop() 123 void eventLoop()
108 { 124 {
109 auto selector = new Selector(); 125 selector = new Selector();
110 selector.open(); 126 selector.open();
111 do 127 do
112 { 128 {
113 execTasks(); 129 execTasks();
114 auto eventCount = selector.select(0.01); 130 auto eventCount = selector.select(0.01);
120 { 136 {
121 if (key.isReadable()) 137 if (key.isReadable())
122 { 138 {
123 // incoming data 139 // incoming data
124 log.trace("Read event fired"); 140 log.trace("Read event fired");
125 auto conn = cast(Dispatcher) key.attachment; 141 auto ta = cast(TaskAttachment) key.attachment;
126 if ( Dispatcher.State.listening == conn.getState() ) 142 processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit);
127 conn.handleConnection(conn.transport, &addConnection); 143
128 else
129 processReturn(conn.handleIncoming(), selector, conn);
130 } 144 }
131 else if (key.isWritable()) 145 else if (key.isWritable())
132 { 146 {
133 log.trace("Write event fired"); 147 log.trace("Write event fired");
134 auto conn = cast(Dispatcher) key.attachment; 148 auto ta = cast(TaskAttachment) key.attachment;
135 processReturn(conn.handleOutgoing(), selector, conn); 149 ta.appendMessage(ta.provider.handleWrite());
150 processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit);
136 } 151 }
137 else if (key.isHangup()) 152 else if (key.isHangup())
138 { 153 {
139 log.trace("Hangup event fired"); 154 log.trace("Hangup event fired");
140 auto conn = cast(Dispatcher) key.attachment; 155 auto ta = cast(TaskAttachment) key.attachment;
141 processReturn(conn.handleDisconnect(), selector, conn); 156 ta.appendMessage(ta.provider.handleDisconnect());
157 processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit);
142 } 158 }
143 else if (key.isError() || key.isInvalidHandle()) 159 else if (key.isError() || key.isInvalidHandle())
144 { 160 {
145 log.trace("Error event fired"); 161 log.trace("Error event fired");
146 // error, close connection 162 // error, close connection
147 auto conn = cast(Dispatcher) key.attachment; 163 auto ta = cast(TaskAttachment) key.attachment;
148 conn.handleError(&remConnection); 164 ta.appendMessage(ta.provider.handleError());
165 processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit);
149 } 166 }
150 } 167 }
151 } 168 }
152 else if (eventCount == 0) 169 else if (eventCount == 0)
153 { 170 {
171 if (tasks[k].state() == Fiber.State.TERM) 188 if (tasks[k].state() == Fiber.State.TERM)
172 tasks.remove(k); 189 tasks.remove(k);
173 } 190 }
174 } 191 }
175 192
176 void processReturn(int result, Selector s, Dispatcher h) 193 void processReturn(int result, Conduit c)
177 { 194 {
178 switch(result) 195 switch(result)
179 { 196 {
180 case CLOSE: 197 case CLOSE:
181 s.unregister(h.transport); 198 selector.unregister(c);
182 h.transport.detach(); 199 c.detach();
183 break; 200 break;
184 case UNREGISTER: 201 case UNREGISTER:
185 s.unregister(h.transport); 202 selector.unregister(c);
186 break; 203 break;
187 case REMAIN: 204 case REMAIN:
188 //this space intentially left blank 205 //this space intentially left blank
189 break; 206 break;
190 case REGISTER: 207 case REGISTER:
191 s.register(h.transport, h.events(), h);
192 break; 208 break;
193 case REREGISTER: 209 case REREGISTER:
194 s.register(h.transport, h.events(), h);
195 break; 210 break;
196 default: 211 default:
197 log.error("processReturn: unknown return value"); 212 log.error("processReturn: unknown return value");
198 } 213 }
199 } 214 }