Mercurial > projects > dreactor
comparison dreactor/protocol/RawTcp.d @ 10:e75a2e506b1d
housekeeping
author | rick@minifunk |
---|---|
date | Fri, 01 Aug 2008 16:30:45 -0400 |
parents | 5412a1ff2e49 |
children | 5836613d16ac |
comparison
equal
deleted
inserted
replaced
9:5412a1ff2e49 | 10:e75a2e506b1d |
---|---|
14 /****************************************************************************** | 14 /****************************************************************************** |
15 | 15 |
16 Basic TCP server or client routines for sending raw data. | 16 Basic TCP server or client routines for sending raw data. |
17 | 17 |
18 ******************************************************************************/ | 18 ******************************************************************************/ |
19 class RawTCPListener : RawTCPHandler | 19 class RawTCPListener |
20 { | 20 { |
21 public | 21 public |
22 Logger log; | |
23 this(Dispatcher mgr, Vat sel, IPv4Address addr) | 22 this(Dispatcher mgr, Vat sel, IPv4Address addr) |
24 { | 23 { |
25 manager = mgr; | 24 manager = mgr; |
26 mgr.events(Event.Read); | 25 mgr.events(Event.Read); |
27 mgr.setOutgoingHandler(&onSend); | 26 mgr.setOutgoingHandler(&RawTCPHandler.onSend); |
28 mgr.setIncomingHandler(&onReceive); | 27 mgr.setIncomingHandler(&onReceive); |
29 mgr.setConnectHandler(&accept); | 28 mgr.setConnectHandler(&accept); |
30 mgr.setErrorHandler(&onError); | 29 mgr.setErrorHandler(&RawTCPHandler.onError); |
31 mgr.setDisconnectHandler(&onHangup); | 30 mgr.setDisconnectHandler(&RawTCPHandler.onHangup); |
32 mgr.listen(addr); | 31 mgr.listen(addr); |
33 | 32 |
34 sel.addConnection(mgr); | 33 sel.addConnection(mgr); |
35 vat = sel; | 34 vat = sel; |
36 log = Log.lookup("dreactor.protocol.RawTcpServer"); | 35 log = Log.lookup("dreactor.protocol.RawTcpServer"); |
45 this(lh, sel, addr); | 44 this(lh, sel, addr); |
46 } | 45 } |
47 | 46 |
48 ~this() | 47 ~this() |
49 { | 48 { |
50 foreach(Dispatcher d; children) | 49 close(); |
51 { | |
52 (cast(AsyncSocketConduit)d.transport).shutdown(); | |
53 (cast(AsyncSocketConduit)d.transport).detach(); | |
54 } | |
55 (cast(AsyncSocketConduit)manager.transport).shutdown(); | |
56 (cast(AsyncSocketConduit)manager.transport).detach(); | |
57 } | 50 } |
58 | 51 |
59 int accept(Conduit cond, RegisterD reg) | 52 int accept(Conduit cond, RegisterD reg) |
60 { | 53 { |
61 AsyncSocketConduit newcond = new AsyncSocketConduit; | 54 AsyncSocketConduit newcond = new AsyncSocketConduit; |
94 int send(Dispatcher d, char[] outbuf, IPv4Address addr = null) | 87 int send(Dispatcher d, char[] outbuf, IPv4Address addr = null) |
95 { | 88 { |
96 if (d.appendOutBuffer(outbuf)) | 89 if (d.appendOutBuffer(outbuf)) |
97 { | 90 { |
98 d.addEvent(Event.Write); | 91 d.addEvent(Event.Write); |
99 d.setOutgoingHandler(&onSend); | |
100 if (!vat.addConnection(d)) | 92 if (!vat.addConnection(d)) |
101 { | 93 { |
102 log.error("unable to register mgr"); | 94 log.error("unable to register mgr"); |
103 } | 95 } |
104 } | 96 } |
105 return 0; | 97 return 0; |
106 } | 98 } |
107 | 99 |
108 public int onReceive(Dispatcher h) | 100 /************************************************************************** |
101 | |
102 receive | |
103 IncomingHandlerD | |
104 Default incoming data handler. Should be replaced with something useful. | |
105 | |
106 **************************************************************************/ | |
107 int onReceive(Dispatcher h) | |
109 { | 108 { |
110 Logger log = Log.lookup("Handlers.onReceive"); | 109 Logger log = Log.lookup("Handlers.onReceive"); |
111 | 110 |
112 char inbuf[8192]; | 111 char inbuf[8192]; |
113 int amt; | 112 int amt; |
131 return REMAIN; | 130 return REMAIN; |
132 } | 131 } |
133 | 132 |
134 void close() | 133 void close() |
135 { | 134 { |
135 foreach(Dispatcher d; children) | |
136 { | |
137 (cast(AsyncSocketConduit)d.transport).shutdown(); | |
138 (cast(AsyncSocketConduit)d.transport).detach(); | |
139 } | |
140 (cast(AsyncSocketConduit)manager.transport).shutdown(); | |
141 (cast(AsyncSocketConduit)manager.transport).detach(); | |
136 | 142 |
137 } | 143 } |
144 | |
145 void setDataHandler(void delegate(char[], Dispatcher) h) | |
146 { | |
147 dataHandler = h; | |
148 } | |
138 | 149 |
139 private | 150 private |
140 Dispatcher manager; | |
141 Vat vat; | 151 Vat vat; |
142 CircularSeq!(Dispatcher) children; | 152 CircularSeq!(Dispatcher) children; |
153 Dispatcher manager; | |
154 Logger log; | |
155 RawTCPHandler h; | |
156 void delegate(char[], Dispatcher) dataHandler; | |
143 } | 157 } |
144 | 158 |
145 class RawTCPClient : RawTCPHandler | 159 class RawTCPClient |
146 { | 160 { |
161 | |
147 public | 162 public |
148 Logger log; | |
149 this(Dispatcher mgr, Vat sel, Event evts = Event.Read) | 163 this(Dispatcher mgr, Vat sel, Event evts = Event.Read) |
150 { | 164 { |
151 manager = mgr; | 165 manager = mgr; |
152 manager.events(evts); | 166 manager.events(evts); |
153 connected = false; | 167 connected = false; |
154 mgr.setOutgoingHandler(&onSend); | 168 mgr.setOutgoingHandler(&RawTCPHandler.onSend); |
155 mgr.setIncomingHandler(&onReceive); | 169 mgr.setIncomingHandler(&onReceive); |
156 mgr.setErrorHandler(&onError); | 170 mgr.setErrorHandler(&RawTCPHandler.onError); |
157 mgr.setDisconnectHandler(&onHangup); | 171 mgr.setDisconnectHandler(&RawTCPHandler.onHangup); |
158 vat = sel; | 172 vat = sel; |
159 log = Log.lookup("dreactor.protocol.RawTcpClient"); | 173 log = Log.lookup("dreactor.protocol.RawTcpClient"); |
160 } | 174 } |
161 | 175 |
162 this(Vat sel, Event evts = Event.Read) | 176 this(Vat sel, Event evts = Event.Read) |
212 } | 226 } |
213 } | 227 } |
214 return 0; | 228 return 0; |
215 } | 229 } |
216 | 230 |
231 /************************************************************************** | |
232 | |
233 receive | |
234 IncomingHandlerD | |
235 Default incoming data handler. Should be replaced with something useful. | |
236 | |
237 **************************************************************************/ | |
238 public int onReceive(Dispatcher h) | |
239 { | |
240 Logger log = Log.lookup("Handlers.onReceive"); | |
241 | |
242 char inbuf[8192]; | |
243 int amt; | |
244 if((amt = h.transport.read(inbuf)) > 0) | |
245 { | |
246 if (dataHandler) | |
247 dataHandler(inbuf[0 .. amt], h); | |
248 else | |
249 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); | |
250 } | |
251 else | |
252 { | |
253 if (amt == 0) | |
254 { | |
255 return CLOSE; | |
256 } | |
257 log.error("Received no data, err = {}", amt); | |
258 } | |
259 return REMAIN; | |
260 } | |
261 | |
262 void setDataHandler(void delegate(char[], Dispatcher) h) | |
263 { | |
264 dataHandler = h; | |
265 } | |
217 private | 266 private |
267 void delegate(char[], Dispatcher) dataHandler; | |
218 Dispatcher manager; | 268 Dispatcher manager; |
219 Vat vat; | 269 Vat vat; |
220 bool connected; | 270 bool connected; |
271 Logger log; | |
272 RawTCPHandler h; | |
221 } | 273 } |
222 | 274 |
223 | 275 |
224 /****************************************************************************** | 276 /****************************************************************************** |
225 | 277 |
226 Default Event handlers common to both listener/clients | 278 Default Event handlers common to both listener/clients |
227 | 279 |
228 ******************************************************************************/ | 280 ******************************************************************************/ |
229 class RawTCPHandler | 281 struct RawTCPHandler |
230 { | 282 { |
231 /************************************************************************** | 283 /************************************************************************** |
232 | 284 |
233 onSend | 285 onSend |
234 OutgoingHandlerD | 286 OutgoingHandlerD |
235 To be registered as the response to socket writable event. | 287 To be registered as the response to socket writable event. |
236 Sends data, returns amount sent. Unregisters Handler for sending | 288 Sends data, returns amount sent. Unregisters Handler for sending |
237 if there is no more data left to send. | 289 if there is no more data left to send. |
238 | 290 |
239 ***************************************************************************/ | 291 ***************************************************************************/ |
240 public int onSend(Dispatcher h) | 292 public static int onSend(Dispatcher h) |
241 { | 293 { |
242 Logger log = Log.lookup("Handlers.onSend"); | 294 Logger log = Log.lookup("Handlers.onSend"); |
243 | 295 |
244 char[] outbuf = h.nextBuffer(); | 296 char[] outbuf = h.nextBuffer(); |
245 if (outbuf !is null) | 297 if (outbuf !is null) |
246 { | 298 { |
247 int sent = h.transport.write(outbuf); | 299 int sent = h.transport.write(outbuf); |
248 if (sent > 0) | 300 if (sent > 0) |
249 { | 301 { |
250 if (! h.addOffset(sent)) | 302 if (! h.addOffset(sent)) |
251 { | 303 { |
252 h.remEvent(Event.Write); | 304 h.remEvent(Event.Write); |
253 return REREGISTER; | 305 return REREGISTER; |
254 } | 306 } |
255 } | 307 } |
256 else if (sent == AsyncSocketConduit.Eof) | 308 else if (sent == AsyncSocketConduit.Eof) |
257 { | 309 { |
258 log.error("Select said socket was writable, but sent 0 bytes"); | 310 log.error("Select said socket was writable, but sent 0 bytes"); |
311 } | |
312 else | |
313 { | |
314 log.error("Socket send return ERR"); | |
315 } | |
316 return REMAIN; | |
259 } | 317 } |
260 else | 318 else |
261 { | 319 { |
262 log.error("Socket send return ERR"); | 320 h.remEvent(Event.Write); |
263 } | 321 return REREGISTER; |
264 return REMAIN; | 322 } |
265 } | 323 } |
266 else | 324 |
267 { | 325 static int onHangup(Dispatcher d) |
268 h.remEvent(Event.Write); | 326 { |
269 return REREGISTER; | 327 return UNREGISTER; |
270 } | 328 } |
271 } | 329 |
272 /************************************************************************** | 330 static int onError(Dispatcher d, RegisterD unreg) |
273 | 331 { |
274 receive | 332 return CLOSE; |
275 IncomingHandlerD | 333 } |
276 Default incoming data handler. Should be replaced with something useful. | 334 |
277 | |
278 **************************************************************************/ | |
279 public int onReceive(Dispatcher h) | |
280 { | |
281 Logger log = Log.lookup("Handlers.onReceive"); | |
282 | |
283 char inbuf[8192]; | |
284 int amt; | |
285 if((amt = h.transport.read(inbuf)) > 0) | |
286 { | |
287 if (dataHandler) | |
288 dataHandler(inbuf[0 .. amt], h); | |
289 else | |
290 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); | |
291 } | |
292 else | |
293 { | |
294 if (amt == 0) | |
295 { | |
296 return CLOSE; | |
297 } | |
298 log.error("Received no data, err = {}", amt); | |
299 } | |
300 return REMAIN; | |
301 } | |
302 int onHangup(Dispatcher d) | |
303 { | |
304 return UNREGISTER; | |
305 } | |
306 int onError(Dispatcher d, RegisterD unreg) | |
307 { | |
308 return CLOSE; | |
309 } | |
310 | |
311 void setDataHandler(void delegate(char[],Dispatcher) del) | |
312 { | |
313 dataHandler = del; | |
314 } | |
315 protected | |
316 void delegate(char[], Dispatcher) dataHandler; | |
317 } | 335 } |
318 | 336 |
319 bool includes(Dispatcher[] haystack, Dispatcher needle) | 337 bool includes(Dispatcher[] haystack, Dispatcher needle) |
320 { | 338 { |
321 foreach(Dispatcher h; haystack) | 339 foreach(Dispatcher h; haystack) |