11
|
1 /*******************************************************************************
|
|
2
|
|
3 copyright: Copyright (c) 2008 Rick Richardson. All rights reserved
|
|
4
|
|
5 license: BSD style: $(LICENSE)
|
|
6
|
|
7 version: Initial release v0.1 : May 2008
|
|
8
|
|
9 author: Rick Richardson
|
|
10
|
|
11 *******************************************************************************/
|
|
12
|
|
13 module dreactor.core.Vat;
|
|
14
|
|
15 import tango.io.selector.Selector;
|
|
16 import tango.io.selector.model.ISelector;
|
|
17 import tango.core.Exception;
|
|
18 import tango.core.Thread;
|
|
19 import tango.core.Atomic;
|
|
20 import tango.util.collection.CircularSeq;
|
|
21 import tango.util.log.Log;
|
|
22
|
|
23 import dreactor.transport.AsyncSocketConduit;
|
12
|
24 import dreactor.protocol.IProvider;
|
11
|
25 import dreactor.core.Task;
|
|
26 import dreactor.util.ThreadSafeQueue;
|
|
27
|
|
28 static char[] version_string = "Vat.d 0.1 2008-05-31";
|
|
29
|
|
30
|
|
31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
|
|
32
|
|
33 class TaskAttachment
|
|
34 {
|
|
35 public
|
|
36 Task task;
|
12
|
37 IProvider provider;
|
11
|
38
|
12
|
39 this (Task ta, IProvider p)
|
|
40 { task = ta; provider = p; }
|
11
|
41 }
|
|
42
|
|
43 class Vat
|
|
44 {
|
|
45 private
|
|
46 Thread thread;
|
|
47 bool running;
|
12
|
48 Logger log;
|
11
|
49
|
12
|
50 TaskAttachment[int] tasks; //registry for local tasks
|
|
51
|
|
52 Selector selector;
|
|
53 static Atomic!(int) taskCount;
|
|
54 TaskAttachment[int] globalTasks; //global registry of tasks
|
11
|
55
|
|
56 public
|
|
57
|
|
58 this()
|
|
59 {
|
|
60 log = Log.lookup("dreactor.core.Vat");
|
|
61
|
|
62 running = true;
|
|
63 thread = new Thread(&eventLoop);
|
|
64 thread.start();
|
|
65 }
|
12
|
66
|
|
67 synchronized int addTask(Task t, IProvider p = null)
|
11
|
68 {
|
|
69 t.setVat(this);
|
|
70 ++taskCount;
|
12
|
71 auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider);
|
|
72 tasks[taskCount] = ta;
|
|
73 globalTask[taskCount] = ta;
|
|
74 selector.register(p.getConduit(), p.getEvents(), ta);
|
11
|
75 t.setId(taskCount);
|
12
|
76 return taskCount;
|
11
|
77 }
|
|
78
|
|
79 void exit()
|
|
80 {
|
|
81 running = false;
|
|
82 }
|
|
83
|
|
84 void wait()
|
|
85 {
|
|
86 thread.join();
|
|
87 }
|
|
88
|
12
|
89 bool addConnection(int tid, Conduit c, Events evts)
|
11
|
90 {
|
|
91 log.trace("adding handler");
|
12
|
92 TaskAttachment ta;
|
|
93 if (ta = (tid in tasks))
|
|
94 return selector.register(c, evts, ta);
|
|
95 else
|
|
96 return false;
|
11
|
97 }
|
|
98
|
12
|
99 bool remConnection(Conduit c)
|
|
100 {
|
|
101 return selector.unregister(c);
|
|
102 }
|
|
103
|
|
104 Task getTask(int tid)
|
11
|
105 {
|
12
|
106 TaskAttachment ta;
|
|
107 if (ta = (tid in tasks))
|
|
108 return ta.task;
|
|
109 else
|
|
110 return null;
|
|
111 }
|
|
112
|
|
113 static synchronized Task getGlobalTask(int tid)
|
|
114 {
|
|
115 TaskAttachment ta;
|
|
116 if (ta = (tid in globaltasks))
|
|
117 return ta.task;
|
|
118 else
|
|
119 return null;
|
11
|
120 }
|
|
121
|
|
122 private
|
|
123 void eventLoop()
|
|
124 {
|
12
|
125 selector = new Selector();
|
11
|
126 selector.open();
|
|
127 do
|
|
128 {
|
|
129 execTasks();
|
|
130 auto eventCount = selector.select(0.01);
|
|
131
|
|
132 if (eventCount > 0)
|
|
133 {
|
|
134 // process events
|
|
135 foreach (SelectionKey key; selector.selectedSet())
|
|
136 {
|
|
137 if (key.isReadable())
|
|
138 {
|
|
139 // incoming data
|
|
140 log.trace("Read event fired");
|
12
|
141 auto ta = cast(TaskAttachment) key.attachment;
|
|
142 processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit);
|
|
143
|
11
|
144 }
|
|
145 else if (key.isWritable())
|
|
146 {
|
|
147 log.trace("Write event fired");
|
12
|
148 auto ta = cast(TaskAttachment) key.attachment;
|
|
149 ta.appendMessage(ta.provider.handleWrite());
|
|
150 processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit);
|
11
|
151 }
|
|
152 else if (key.isHangup())
|
|
153 {
|
|
154 log.trace("Hangup event fired");
|
12
|
155 auto ta = cast(TaskAttachment) key.attachment;
|
|
156 ta.appendMessage(ta.provider.handleDisconnect());
|
|
157 processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit);
|
11
|
158 }
|
|
159 else if (key.isError() || key.isInvalidHandle())
|
|
160 {
|
12
|
161 log.trace("Error event fired");
|
11
|
162 // error, close connection
|
12
|
163 auto ta = cast(TaskAttachment) key.attachment;
|
|
164 ta.appendMessage(ta.provider.handleError());
|
|
165 processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit);
|
11
|
166 }
|
|
167 }
|
|
168 }
|
|
169 else if (eventCount == 0)
|
|
170 {
|
|
171 /* can't think of anything useful to do here. */
|
|
172 }
|
|
173 else
|
|
174 {
|
|
175 log.error("Selector.select returned {}", eventCount);
|
|
176 }
|
|
177
|
|
178 } while (running)
|
|
179
|
|
180 }
|
|
181
|
|
182 void execTasks()
|
|
183 {
|
|
184 foreach(int k; tasks.keys)
|
|
185 {
|
|
186 if (tasks[k].state() == Fiber.State.HOLD)
|
|
187 tasks[k].call();
|
|
188 if (tasks[k].state() == Fiber.State.TERM)
|
|
189 tasks.remove(k);
|
|
190 }
|
|
191 }
|
|
192
|
12
|
193 void processReturn(int result, Conduit c)
|
11
|
194 {
|
|
195 switch(result)
|
|
196 {
|
|
197 case CLOSE:
|
12
|
198 selector.unregister(c);
|
|
199 c.detach();
|
11
|
200 break;
|
|
201 case UNREGISTER:
|
12
|
202 selector.unregister(c);
|
11
|
203 break;
|
|
204 case REMAIN:
|
|
205 //this space intentially left blank
|
|
206 break;
|
|
207 case REGISTER:
|
|
208 break;
|
|
209 case REREGISTER:
|
|
210 break;
|
|
211 default:
|
|
212 log.error("processReturn: unknown return value");
|
|
213 }
|
|
214 }
|
|
215 }
|