annotate dreactor/core/Vat.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents
children d6a3cfe7c3de
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
11
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
1 /*******************************************************************************
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
2
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
3 copyright: Copyright (c) 2008 Rick Richardson. All rights reserved
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
4
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
5 license: BSD style: $(LICENSE)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
6
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
7 version: Initial release v0.1 : May 2008
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
8
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
9 author: Rick Richardson
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
10
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
11 *******************************************************************************/
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
12
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
13 module dreactor.core.Vat;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
14
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
15 import tango.io.selector.Selector;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
16 import tango.io.selector.model.ISelector;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
17 import tango.core.Exception;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
18 import tango.core.Thread;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
19 import tango.core.Atomic;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
20 import tango.util.collection.CircularSeq;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
21 import tango.util.log.Log;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
22
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
23 import dreactor.transport.AsyncSocketConduit;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
24 import dreactor.core.Task;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
25 import dreactor.util.ThreadSafeQueue;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
26
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
27 static char[] version_string = "Vat.d 0.1 2008-05-31";
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
28
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
29 Logger log;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
30
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
32 alias Message delegate (Conduit c) HandlerDG;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
33 alias Message function (Conduit c) HandlerFN;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
34
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
35 class TaskAttachment
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
36 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
37 public
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
38 Task task;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
39 HandlerDG dg;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
40 HandlerFN fn;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
41
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
42 this(Task ta, HandlerDG d)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
43 { TaskAttachment t; t.task = ta; t.dg = d; return t; }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
44
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
45 this(Task ta, HandlerFN f)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
46 { TaskAttachment t; t.task = ta; t.fn = f; return t; }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
47
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
48 public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
49 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
50
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
51 class Vat
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
52 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
53 private
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
54 Thread thread;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
55 bool running;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
56
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
57 Task[int] tasks;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
58 int taskCount;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
59
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
60 public
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
61
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
62 this(Task t)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
63 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
64 addTask(t);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
65 this();
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
66 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
67
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
68 this()
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
69 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
70 log = Log.lookup("dreactor.core.Vat");
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
71
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
72 running = true;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
73 thread = new Thread(&eventLoop);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
74 thread.start();
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
75 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
76
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
77 void addTask(Task t)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
78 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
79 t.setVat(this);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
80 ++taskCount;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
81 tasks[taskCount] = t;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
82 t.setId(taskCount);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
83 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
84
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
85 void exit()
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
86 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
87 running = false;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
88 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
89
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
90 void wait()
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
91 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
92 thread.join();
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
93 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
94
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
95 bool addConnection()
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
96 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
97 log.trace("adding handler");
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
98 return selector.register(h.transport, h.events(), h);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
99 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
100
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
101 bool remConnection(Dispatcher handler)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
102 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
103 return selector.unregister(h.transport);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
104 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
105
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
106 private
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
107 void eventLoop()
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
108 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
109 auto selector = new Selector();
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
110 selector.open();
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
111 do
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
112 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
113 execTasks();
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
114 auto eventCount = selector.select(0.01);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
115
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
116 if (eventCount > 0)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
117 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
118 // process events
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
119 foreach (SelectionKey key; selector.selectedSet())
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
120 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
121 if (key.isReadable())
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
122 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
123 // incoming data
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
124 log.trace("Read event fired");
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
125 auto conn = cast(Dispatcher) key.attachment;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
126 if ( Dispatcher.State.listening == conn.getState() )
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
127 conn.handleConnection(conn.transport, &addConnection);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
128 else
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
129 processReturn(conn.handleIncoming(), selector, conn);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
130 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
131 else if (key.isWritable())
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
132 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
133 log.trace("Write event fired");
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
134 auto conn = cast(Dispatcher) key.attachment;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
135 processReturn(conn.handleOutgoing(), selector, conn);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
136 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
137 else if (key.isHangup())
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
138 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
139 log.trace("Hangup event fired");
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
140 auto conn = cast(Dispatcher) key.attachment;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
141 processReturn(conn.handleDisconnect(), selector, conn);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
142 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
143 else if (key.isError() || key.isInvalidHandle())
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
144 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
145 log.trace("Error event fired");
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
146 // error, close connection
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
147 auto conn = cast(Dispatcher) key.attachment;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
148 conn.handleError(&remConnection);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
149 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
150 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
151 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
152 else if (eventCount == 0)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
153 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
154 /* can't think of anything useful to do here. */
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
155 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
156 else
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
157 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
158 log.error("Selector.select returned {}", eventCount);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
159 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
160
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
161 } while (running)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
162
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
163 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
164
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
165 void execTasks()
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
166 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
167 foreach(int k; tasks.keys)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
168 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
169 if (tasks[k].state() == Fiber.State.HOLD)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
170 tasks[k].call();
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
171 if (tasks[k].state() == Fiber.State.TERM)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
172 tasks.remove(k);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
173 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
174 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
175
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
176 void processReturn(int result, Selector s, Dispatcher h)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
177 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
178 switch(result)
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
179 {
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
180 case CLOSE:
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
181 s.unregister(h.transport);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
182 h.transport.detach();
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
183 break;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
184 case UNREGISTER:
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
185 s.unregister(h.transport);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
186 break;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
187 case REMAIN:
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
188 //this space intentially left blank
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
189 break;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
190 case REGISTER:
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
191 s.register(h.transport, h.events(), h);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
192 break;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
193 case REREGISTER:
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
194 s.register(h.transport, h.events(), h);
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
195 break;
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
196 default:
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
197 log.error("processReturn: unknown return value");
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
198 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
199 }
5836613d16ac reorg! reorg!
rick@minifunk
parents:
diff changeset
200 }