Mercurial > projects > dreactor
comparison asyncdreactor/protocol/RawTcp.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | dreactor/protocol/RawTcp.d@e75a2e506b1d |
children |
comparison
equal
deleted
inserted
replaced
10:e75a2e506b1d | 11:5836613d16ac |
---|---|
1 module dreactor.protocol.RawTcp; | |
2 | |
3 import tango.io.Conduit; | |
4 import tango.io.selector.model.ISelector; | |
5 import tango.net.Socket; | |
6 import tango.util.collection.CircularSeq; | |
7 import tango.util.log.Log; | |
8 import tango.util.log.Config; | |
9 | |
10 import dreactor.transport.AsyncSocketConduit; | |
11 import dreactor.core.Vat; | |
12 import dreactor.core.Dispatcher; | |
13 | |
14 /****************************************************************************** | |
15 | |
16 Basic TCP server or client routines for sending raw data. | |
17 | |
18 ******************************************************************************/ | |
class RawTCPListener
{
    /**************************************************************************

        Wires the listening Dispatcher `mgr` to this object's handlers,
        starts listening on `addr` and registers `mgr` with the Vat `sel`.

        Params:
            mgr  = dispatcher that owns the listening transport
            sel  = vat that connections are registered with
            addr = local address to listen on

    **************************************************************************/
    public this(Dispatcher mgr, Vat sel, IPv4Address addr)
    {
        // Every field the handlers touch (vat, children, log) is assigned
        // BEFORE the handlers are installed and the dispatcher is handed to
        // the vat, so accept()/onReceive() can never observe null members.
        manager = mgr;
        vat = sel;
        children = new CircularSeq!(Dispatcher);
        log = Log.lookup("dreactor.protocol.RawTcpServer");
        log.info("log initialized");

        mgr.events(Event.Read);
        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
        mgr.setIncomingHandler(&onReceive);
        mgr.setConnectHandler(&accept);
        mgr.setErrorHandler(&RawTCPHandler.onError);
        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
        mgr.listen(addr);

        if (!sel.addConnection(mgr))
        {
            log.error("unable to register listening mgr");
        }
    }

    /**************************************************************************

        Convenience constructor: creates an AsyncSocketConduit with address
        reuse enabled, wraps it in a new Dispatcher and delegates to the
        three-argument constructor.

    **************************************************************************/
    this(Vat sel, IPv4Address addr)
    {
        AsyncSocketConduit cond = new AsyncSocketConduit;
        cond.socket().setAddressReuse(true);
        Dispatcher lh = new Dispatcher(cond, true);
        this(lh, sel, addr);
    }

    /**************************************************************************

        Releases all transports via close().

        NOTE(review): when this runs from the garbage collector (rather than
        from `delete` or a `scope` instance), D does not guarantee that
        referenced GC objects such as `children` are still valid. Prefer
        calling close() explicitly; close() is idempotent, so the later
        destructor call is then a no-op.

    **************************************************************************/
    ~this()
    {
        close();
    }

    /**************************************************************************

        accept
        Connect handler. Accepts the pending connection on `cond` into a
        fresh AsyncSocketConduit, wraps it in a Dispatcher created from
        `manager`, registers it for read events and records it in `children`.

        Params:
            cond = the listening conduit (cast to AsyncSocketConduit)
            reg  = unused here

        Returns: always 0

    **************************************************************************/
    int accept(Conduit cond, RegisterD reg)
    {
        AsyncSocketConduit newcond = new AsyncSocketConduit;
        (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);
        Dispatcher h = Dispatcher.New(newcond, manager);
        h.events(Event.Read);
        if (!vat.addConnection(h))
        {
            // Same reporting as send(): a failed registration is not silent.
            log.error("unable to register accepted connection");
        }
        children.append(h);
        log.info("accepted new connection");
        return 0;
    }

    /**************************************************************************

        broadcast
        Queues `outbuf` on every accepted connection that is not listed in
        `excluded`, and re-registers each one for write events.

        Params:
            outbuf   = data to queue
            excluded = connections to skip (compared by identity); may be null

        Returns: always 0

    **************************************************************************/
    int broadcast(char[] outbuf, Dispatcher[] excluded = null)
    {
        foreach (Dispatcher h; children)
        {
            if (excluded && excluded.includes(h))
                continue;
            if (h.appendOutBuffer(outbuf))
            {
                h.addEvent(Event.Write);
                if (!vat.addConnection(h))
                {
                    // Mirrors send(): report instead of silently dropping.
                    log.error("unable to register mgr");
                }
            }
        }
        return 0;
    }

    /**************************************************************************

        send
        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free.

        Params:
            d      = connection to send on
            outbuf = data to queue
            addr   = unused; kept for signature compatibility

        Returns: always 0 (a failed registration is only logged)

    **************************************************************************/
    int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
    {
        if (d.appendOutBuffer(outbuf))
        {
            d.addEvent(Event.Write);
            if (!vat.addConnection(d))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }

    /**************************************************************************

        receive
        IncomingHandlerD
        Default incoming data handler. Reads up to 8192 bytes from the
        connection and passes them to the data handler if one is set,
        otherwise logs them.

        Returns: CLOSE when the read returns 0 (the connection is removed
                 from `children` and its transport shut down first),
                 REMAIN otherwise, including when the read returns a
                 negative value, which is only logged.

    **************************************************************************/
    int onReceive(Dispatcher h)
    {
        // Deliberately shadows the member logger so output is attributed to
        // the "Handlers.onReceive" category.
        Logger log = Log.lookup("Handlers.onReceive");

        char[8192] inbuf;
        int amt;
        if ((amt = h.transport.read(inbuf)) > 0)
        {
            if (dataHandler)
                dataHandler(inbuf[0 .. amt], h);
            else
                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
        }
        else
        {
            if (amt == 0)
            {
                children.remove(h);
                (cast(AsyncSocketConduit) h.transport).shutdown();
                return CLOSE;
            }
            log.error("Received no data, err = {}", amt);
        }
        return REMAIN;
    }

    /**************************************************************************

        close
        Shuts down and detaches every accepted connection, then the
        listening transport. Idempotent: a second call (for example from
        the destructor after an explicit close) does nothing.

    **************************************************************************/
    void close()
    {
        if (closed)
            return;
        closed = true;

        // Null guards cover an object whose constructor threw part-way.
        if (children !is null)
        {
            foreach (Dispatcher d; children)
            {
                (cast(AsyncSocketConduit)d.transport).shutdown();
                (cast(AsyncSocketConduit)d.transport).detach();
            }
            // Drop the dead connections so a later broadcast() cannot queue
            // data on them.
            children.clear();
        }

        if (manager !is null)
        {
            (cast(AsyncSocketConduit)manager.transport).shutdown();
            (cast(AsyncSocketConduit)manager.transport).detach();
        }
    }

    /**************************************************************************

        Installs the delegate that onReceive() hands incoming data to.

    **************************************************************************/
    void setDataHandler(void delegate(char[], Dispatcher) h)
    {
        dataHandler = h;
    }

    // NOTE(review): in D a protection attribute without a colon applies only
    // to the next declaration, so only `vat` is private here and the fields
    // below have default protection. Left unchanged to avoid breaking
    // external users; confirm whether `private:` was intended.
    private Vat vat;
    CircularSeq!(Dispatcher) children;
    Dispatcher manager;
    Logger log;
    RawTCPHandler h;
    void delegate(char[], Dispatcher) dataHandler;
    private bool closed;    // set once close() has run
}
158 | |
class RawTCPClient
{
    /**************************************************************************

        Wires the Dispatcher `mgr` to this object's handlers and remembers
        the Vat `sel`. Nothing is registered with the vat until connect()
        or send() is called.

        Params:
            mgr  = dispatcher that owns the client transport
            sel  = vat the connection will be registered with
            evts = initial event mask for `mgr`

    **************************************************************************/
    public this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
    {
        manager = mgr;
        manager.events(evts);
        connected = false;
        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
        mgr.setIncomingHandler(&onReceive);
        mgr.setErrorHandler(&RawTCPHandler.onError);
        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpClient");
    }

    /**************************************************************************

        Convenience constructor: creates an AsyncSocketConduit, wraps it in
        a new Dispatcher and delegates to the three-argument constructor.

    **************************************************************************/
    this(Vat sel, Event evts = Event.Read)
    {
        AsyncSocketConduit clcond = new AsyncSocketConduit;
        Dispatcher ch = new Dispatcher(clcond);
        this(ch, sel, evts);
    }

    /**************************************************************************

        Releases the transport via close().

        NOTE(review): when this runs from the garbage collector (rather than
        from `delete` or a `scope` instance), D does not guarantee that
        `manager` is still valid. Prefer calling close() explicitly; close()
        is idempotent, so the later destructor call is then a no-op.

    **************************************************************************/
    ~this()
    {
        close();
    }

    /**************************************************************************

        close
        Shuts down and detaches the client transport and marks the client
        as not connected. Idempotent.

    **************************************************************************/
    void close()
    {
        if (closed)
            return;
        closed = true;
        connected = false;

        // Null guard covers an object whose constructor threw part-way.
        if (manager !is null)
        {
            (cast(AsyncSocketConduit)manager.transport).shutdown();
            (cast(AsyncSocketConduit)manager.transport).detach();
        }
    }

    /**************************************************************************

        connect
        Connects the transport to `addr` and registers the dispatcher with
        the vat.

        Returns: 0 on success, -1 if the vat registration fails. Errors
                 raised by the underlying connect call are not caught here.

    **************************************************************************/
    int connect(IPv4Address addr)
    {
        (cast(AsyncSocketConduit) manager.transport()).connect(addr);
        // The transport is connected from here on, even if the registration
        // below fails; a later send() retries the registration.
        connected = true;
        if (!vat.addConnection(manager))
        {
            log.error("connect: unable to register mgr");
            return -1;
        }
        log.info("connected to {}", addr);
        return 0;
    }

    /**************************************************************************

        send
        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free.

        Params:
            outbuf = data to queue
            addr   = address to connect to when not yet connected

        Returns: 0 when the data was handed to the dispatcher, -1 when the
                 client is not connected and either no address was given
                 or connect() failed.

    **************************************************************************/
    int send(char[] outbuf, IPv4Address addr = null)
    {
        if (!connected)
        {
            if (addr is null)
            {
                // Without an address there is nothing to connect to; do not
                // queue data on, or register, an unconnected transport.
                log.error("send: not connected and no address given");
                return -1;
            }
            log.info("send: not connected, connecting");
            if (0 > connect(addr))
            {
                log.error("send: unable to connect");
                return -1;
            }
        }
        if (manager.appendOutBuffer(outbuf))
        {
            manager.addEvent(Event.Write);
            if (!vat.addConnection(manager))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }

    /**************************************************************************

        receive
        IncomingHandlerD
        Default incoming data handler. Reads up to 8192 bytes from the
        connection and passes them to the data handler if one is set,
        otherwise logs them.

        Returns: CLOSE when the read returns 0, REMAIN otherwise, including
                 when the read returns a negative value, which is only
                 logged.

    **************************************************************************/
    public int onReceive(Dispatcher h)
    {
        // Deliberately shadows the member logger so output is attributed to
        // the "Handlers.onReceive" category.
        Logger log = Log.lookup("Handlers.onReceive");

        char[8192] inbuf;
        int amt;
        if ((amt = h.transport.read(inbuf)) > 0)
        {
            if (dataHandler)
                dataHandler(inbuf[0 .. amt], h);
            else
                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
        }
        else
        {
            if (amt == 0)
            {
                return CLOSE;
            }
            log.error("Received no data, err = {}", amt);
        }
        return REMAIN;
    }

    /**************************************************************************

        Installs the delegate that onReceive() hands incoming data to.

    **************************************************************************/
    void setDataHandler(void delegate(char[], Dispatcher) h)
    {
        dataHandler = h;
    }

    // NOTE(review): in D a protection attribute without a colon applies only
    // to the next declaration, so only `dataHandler` is private here and the
    // fields below have default protection. Left unchanged to avoid breaking
    // external users; confirm whether `private:` was intended.
    private void delegate(char[], Dispatcher) dataHandler;
    Dispatcher manager;
    Vat vat;
    bool connected;
    Logger log;
    RawTCPHandler h;
    private bool closed;    // set once close() has run
}
274 | |
275 | |
276 /****************************************************************************** | |
277 | |
278 Default Event handlers common to both listener/clients | |
279 | |
280 ******************************************************************************/ | |
struct RawTCPHandler
{
    /**************************************************************************

        onSend
        OutgoingHandlerD
        Registered as the response to the socket-writable event. Writes the
        dispatcher's next pending buffer to its transport.

        Returns: REREGISTER (after dropping Event.Write) when there is no
                 pending buffer, or when addOffset() reports false after a
                 successful write; REMAIN in every other case, including
                 failed writes, which are only logged.

    ***************************************************************************/
    public static int onSend(Dispatcher h)
    {
        Logger log = Log.lookup("Handlers.onSend");

        char[] pending = h.nextBuffer();
        if (pending is null)
        {
            // Nothing queued: stop listening for writability.
            h.remEvent(Event.Write);
            return REREGISTER;
        }

        int written = h.transport.write(pending);
        if (written > 0)
        {
            if (h.addOffset(written))
                return REMAIN;

            // addOffset() returned false -- presumably the queue is now
            // drained (TODO confirm against Dispatcher).
            h.remEvent(Event.Write);
            return REREGISTER;
        }

        if (written == AsyncSocketConduit.Eof)
            log.error("Select said socket was writable, but sent 0 bytes");
        else
            log.error("Socket send return ERR");

        return REMAIN;
    }

    /// Disconnect handler: always answers UNREGISTER.
    static int onHangup(Dispatcher d)
    {
        return UNREGISTER;
    }

    /// Error handler: always answers CLOSE; `unreg` is not used.
    static int onError(Dispatcher d, RegisterD unreg)
    {
        return CLOSE;
    }
}
336 | |
/// Returns true when `needle` is one of the elements of `haystack`,
/// compared by object identity (`is`), not by equality.
bool includes(Dispatcher[] haystack, Dispatcher needle)
{
    for (size_t i = 0; i < haystack.length; ++i)
    {
        if (haystack[i] is needle)
            return true;
    }
    return false;
}