Mercurial > projects > dreactor
comparison asyncdreactor/core/AsyncVat.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | dreactor/core/AsyncVat.d@e75a2e506b1d |
children |
comparison
equal
deleted
inserted
replaced
10:e75a2e506b1d | 11:5836613d16ac |
---|---|
1 /******************************************************************************* | |
2 | |
3 copyright: Copyright (c) 2008 Rick Richardson. All rights reserved | |
4 | |
5 license: BSD style: $(LICENSE) | |
6 | |
7 version: Initial release v0.1 : May 2008 | |
8 | |
9 author: Rick Richardson | |
10 | |
11 *******************************************************************************/ | |
12 | |
13 module dreactor.core.Vat; | |
14 | |
15 import tango.io.selector.Selector; | |
16 import tango.io.selector.model.ISelector; | |
17 import tango.core.Exception; | |
18 import tango.core.Thread; | |
19 import tango.core.Atomic; | |
20 import tango.util.collection.LinkSeq; | |
21 import tango.util.log.Log; | |
22 | |
23 import dreactor.transport.AsyncSocketConduit; | |
24 import dreactor.core.Dispatcher; | |
25 import dreactor.util.ThreadSafeQueue; | |
26 | |
/// Module-level logger. It is assigned in Vat's constructor through
/// Log.lookup("dreactor.core.Vat"), so it is null until a Vat has been built.
Logger log;

/// Result codes that Vat.processReturn switches on to decide what to do
/// with a dispatcher's transport after one of its handlers has run.
enum : int
{
    CLOSE      = -2,   // unregister the transport from the selector and detach it
    UNREGISTER = -1,   // unregister the transport from the selector
    REMAIN     =  0,   // leave the registration untouched
    REGISTER   =  1,   // register the transport with the dispatcher's events
    REREGISTER =  2    // handled the same way as REGISTER by processReturn
}

/// Version banner for this module.
static char[] version_string = "Vat.d 0.1 2008-05-31";
32 | |
/**
 * A Vat runs a selector-driven event loop on its own thread.
 *
 * Dispatchers are handed over with addConnection()/remConnection(); both only
 * enqueue the dispatcher, and the event loop thread performs the actual
 * selector register/unregister calls after each select pass.
 *
 * Visibility note: the `private` / `public` attributes below are written
 * without a trailing colon, exactly as in the original, so each one applies
 * only to the single declaration that follows it. Everything else keeps D's
 * default (public) visibility; this is preserved so existing callers are
 * unaffected.
 */
class Vat
{
    private Thread thread;      // event loop thread; null until run() is called
    bool running;               // loop keeps iterating while this is true
    // NOTE(review): `running` is written by exit() and read by the loop thread
    // without any synchronisation -- confirm this is acceptable on the targets.
    Atomic!(int) pending;       // not referenced anywhere in this class

    ThreadSafeQueue!(Dispatcher) freshList;   // dispatchers waiting to be registered
    ThreadSafeQueue!(Dispatcher) remList;     // dispatchers waiting to be unregistered

    /// Creates the two hand-over queues and initialises the module logger.
    public this()
    {
        freshList = new ThreadSafeQueue!(Dispatcher);
        remList = new ThreadSafeQueue!(Dispatcher);
        log = Log.lookup("dreactor.core.Vat");
    }

    /// Marks the Vat as running and starts the event loop on a new thread.
    void run()
    {
        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }

    /// Asks the event loop to stop; the flag is checked at the end of each
    /// loop iteration, so the loop finishes its current pass first.
    void exit()
    {
        running = false;
    }

    /// Blocks until the event loop thread has finished.
    /// Does nothing if run() was never called (previously a null dereference).
    void wait()
    {
        if (thread !is null)
            thread.join();
    }

    /// Queues `handler` for registration with the selector.
    /// Returns: whatever freshList.push reports.
    bool addConnection(Dispatcher handler)
    {
        log.trace("adding handler");
        return freshList.push(handler);
    }

    /// Queues `handler` for removal from the selector.
    /// Returns: whatever remList.push reports.
    bool remConnection(Dispatcher handler)
    {
        return remList.push(handler);
    }

    /**
     * Body of the event loop thread.
     *
     * Each iteration: wait for selector events, dispatch every selected key
     * to its attached Dispatcher, then drain the add/remove queues into the
     * selector. Runs at least once and repeats while `running` is true.
     */
    private void eventLoop()
    {
        auto selector = new Selector();
        selector.open();
        // Release the selector's OS resources however the loop ends
        // (normal exit or an exception escaping a handler). The original
        // never closed the selector.
        scope (exit) selector.close();

        do
        {
            // Short timeout so the queues below and `running` are polled
            // regularly (presumably 0.01 seconds -- confirm against the
            // Tango version in use).
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                // process events
                foreach (SelectionKey key; selector.selectedSet())
                {
                    // Every registration in this class attaches the Dispatcher
                    // itself, so the attachment is cast back here once.
                    auto conn = cast(Dispatcher) key.attachment;

                    // NOTE(review): only the first matching event kind is
                    // handled per key per pass (read wins over write, etc.).
                    if (key.isReadable())
                    {
                        // incoming data
                        log.trace("Read event fired");
                        if (Dispatcher.State.listening == conn.getState())
                            conn.handleConnection(conn.transport, &addConnection);
                        else
                            processReturn(conn.handleIncoming(), selector, conn);
                    }
                    else if (key.isWritable())
                    {
                        log.trace("Write event fired");
                        processReturn(conn.handleOutgoing(), selector, conn);
                    }
                    else if (key.isHangup())
                    {
                        log.trace("Hangup event fired");
                        processReturn(conn.handleDisconnect(), selector, conn);
                    }
                    else if (key.isError() || key.isInvalidHandle())
                    {
                        log.trace("Error event fired");
                        // error: let the dispatcher queue itself for removal
                        conn.handleError(&remConnection);
                    }
                }
            }
            else if (eventCount == 0)
            {
                /* timeout with no events: nothing to do */
            }
            else
            {
                log.error("Selector.select returned {}", eventCount);
            }

            // register dispatchers queued by addConnection()
            freshList.processAll( (ref Dispatcher h)
            {
                selector.register(h.transport, h.events(), h);
                return 1;
            });
            // unregister dispatchers queued by remConnection()
            remList.processAll( (ref Dispatcher h)
            {
                selector.unregister(h.transport);
                return 1;
            });

        } while (running);
    }

    /**
     * Applies a handler's result code to the selector.
     *
     * Params:
     *   result = one of CLOSE, UNREGISTER, REMAIN, REGISTER, REREGISTER
     *   s      = selector the dispatcher's transport is (to be) registered with
     *   h      = dispatcher whose handler produced `result`
     *
     * Any other value is logged as an error and otherwise ignored.
     */
    void processReturn(int result, Selector s, Dispatcher h)
    {
        switch (result)
        {
            case CLOSE:
                s.unregister(h.transport);
                h.transport.detach();
                break;
            case UNREGISTER:
                s.unregister(h.transport);
                break;
            case REMAIN:
                // this space intentionally left blank
                break;
            case REGISTER, REREGISTER:
                // both codes (re)register with the dispatcher's current events
                s.register(h.transport, h.events(), h);
                break;
            default:
                log.error("processReturn: unknown return value");
                break;
        }
    }
}