comparison dreactor/protocol/RawTcp.d @ 8:60cf25102fb2

fixing mercurial's screwup
author rick@minifunk
date Wed, 09 Jul 2008 00:56:21 -0400
parents 287ba7de97c4
children 5412a1ff2e49
comparison
equal deleted inserted replaced
7:2c6ab06a8829 8:60cf25102fb2
14 /****************************************************************************** 14 /******************************************************************************
15 15
16 Basic TCP server or client routines for sending raw data. 16 Basic TCP server or client routines for sending raw data.
17 17
18 ******************************************************************************/ 18 ******************************************************************************/
19 class RawTCPListener 19 class RawTCPListener : RawTCPHandler
20 { 20 {
21 public 21 public
22 Logger log; 22 Logger log;
23 this(Dispatcher mgr, Vat sel, IPv4Address addr) 23 this(Dispatcher mgr, Vat sel, IPv4Address addr)
24 { 24 {
25 manager = mgr; 25 manager = mgr;
26 mgr.events(Event.Read); 26 mgr.events(Event.Read);
27 mgr.setOutgoingHandler(&Handlers.onSend); 27 mgr.setOutgoingHandler(&onSend);
28 mgr.setIncomingHandler(&Handlers.onReceive); 28 mgr.setIncomingHandler(&onReceive);
29 mgr.setConnectHandler(&accept); 29 mgr.setConnectHandler(&accept);
30 mgr.setErrorHandler(&onError);
31 mgr.setDisconnectHandler(&onHangup);
30 mgr.listen(addr); 32 mgr.listen(addr);
31 33
32 sel.addConnection(mgr); 34 sel.addConnection(mgr);
33 vat = sel; 35 vat = sel;
34 log = Log.lookup("dreactor.protocol.RawTcpServer"); 36 log = Log.lookup("dreactor.protocol.RawTcpServer");
35 log.info("log initialized"); 37 log.info("log initialized");
36 children = new CircularSeq!(Dispatcher); 38 children = new CircularSeq!(Dispatcher);
37 } 39 }
40 this(Vat sel, IPv4Address addr)
41 {
42 AsyncSocketConduit cond = new AsyncSocketConduit;
43 Dispatcher lh = new Dispatcher(cond, true);
44 this(lh, sel, addr);
45 }
38 46
39 int accept(Conduit cond, RegisterD reg) 47 int accept(Conduit cond, RegisterD reg)
40 { 48 {
41 AsyncSocketConduit newcond = new AsyncSocketConduit; 49 AsyncSocketConduit newcond = new AsyncSocketConduit;
42 (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket); 50 (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);
46 children.append(h); 54 children.append(h);
47 log.info("accepted new connection"); 55 log.info("accepted new connection");
48 return 0; 56 return 0;
49 } 57 }
50 58
51 int broadcast(char[] outbuf) 59 int broadcast(char[] outbuf, Dispatcher[] excluded)
52 { 60 {
53 foreach(Dispatcher h; children) 61 foreach(Dispatcher h; children)
54 { 62 {
63 if (excluded && excluded.includes(h))
64 continue;
55 if (h.appendOutBuffer(outbuf)) 65 if (h.appendOutBuffer(outbuf))
56 { 66 {
57 h.addEvent(Event.Write); 67 h.addEvent(Event.Write);
58 vat.addConnection(h); 68 vat.addConnection(h);
59 } 69 }
60 } 70 }
61 return 0; 71 return 0;
72 }
73
74 public int onReceive(Dispatcher h)
75 {
76 Logger log = Log.lookup("Handlers.onReceive");
77
78 char inbuf[8192];
79 int amt;
80 if((amt = h.transport.read(inbuf)) > 0)
81 {
82 if (dataHandler)
83 dataHandler(inbuf[0 .. amt], h);
84 else
85 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
86 }
87 else
88 {
89 if (amt == 0)
90 {
91 children.remove(h);
92 return CLOSE;
93 }
94 log.error("Received no data, err = {}", amt);
95 }
96 return REMAIN;
62 } 97 }
63 98
64 void close() 99 void close()
65 { 100 {
66 101
70 Dispatcher manager; 105 Dispatcher manager;
71 Vat vat; 106 Vat vat;
72 CircularSeq!(Dispatcher) children; 107 CircularSeq!(Dispatcher) children;
73 } 108 }
74 109
75 class RawTCPClient 110 class RawTCPClient : RawTCPHandler
76 { 111 {
77 public 112 public
78 Logger log; 113 Logger log;
79 this(Dispatcher mgr, Vat sel, Event evts = Event.Read) 114 this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
80 { 115 {
81 manager = mgr; 116 manager = mgr;
82 manager.events(evts); 117 manager.events(evts);
83 connected = false; 118 connected = false;
84 mgr.setOutgoingHandler(&Handlers.onSend); 119 mgr.setOutgoingHandler(&onSend);
85 mgr.setIncomingHandler(&Handlers.onReceive); 120 mgr.setIncomingHandler(&onReceive);
121 mgr.setErrorHandler(&onError);
122 mgr.setDisconnectHandler(&onHangup);
86 vat = sel; 123 vat = sel;
87 log = Log.lookup("dreactor.protocol.RawTcpClient"); 124 log = Log.lookup("dreactor.protocol.RawTcpClient");
125 }
126 this(Vat sel, Event evts = Event.Read)
127 {
128 AsyncSocketConduit clcond = new AsyncSocketConduit;
129 Dispatcher ch = new Dispatcher(clcond);
130 this(ch, sel, evts);
88 } 131 }
89 132
90 int connect(IPv4Address addr) 133 int connect(IPv4Address addr)
91 { 134 {
92 (cast(AsyncSocketConduit) manager.transport()).connect(addr); 135 (cast(AsyncSocketConduit) manager.transport()).connect(addr);
139 /****************************************************************************** 182 /******************************************************************************
140 183
141 Default Event handlers common to both listener/clients 184 Default Event handlers common to both listener/clients
142 185
143 ******************************************************************************/ 186 ******************************************************************************/
144 class Handlers 187 class RawTCPHandler
145 { 188 {
146 /************************************************************************** 189 /**************************************************************************
147 190
148 onSend 191 onSend
149 OutgoingHandlerD 192 OutgoingHandlerD
150 To be registered as the response to socket writable event. 193 To be registered as the response to socket writable event.
151 Sends data, returns amount sent. Unregisters Handler for sending 194 Sends data, returns amount sent. Unregisters Handler for sending
152 if there is no more data left to send. 195 if there is no more data left to send.
153 196
154 ***************************************************************************/ 197 ***************************************************************************/
155 public static int onSend(Dispatcher h) 198 public int onSend(Dispatcher h)
156 { 199 {
157 Logger log = Log.lookup("Handlers.onSend"); 200 Logger log = Log.lookup("Handlers.onSend");
158 201
159 log.info("top of onSend"); 202 log.info("top of onSend");
160 char[] outbuf = h.nextBuffer(); 203 char[] outbuf = h.nextBuffer();
190 receive 233 receive
191 IncomingHandlerD 234 IncomingHandlerD
192 Default incoming data handler. Should be replaced with something useful. 235 Default incoming data handler. Should be replaced with something useful.
193 236
194 **************************************************************************/ 237 **************************************************************************/
195 public static int onReceive(Dispatcher h) 238 public int onReceive(Dispatcher h)
196 { 239 {
197 Logger log = Log.lookup("Handlers.onReceive"); 240 Logger log = Log.lookup("Handlers.onReceive");
198 241
199 char inbuf[8192]; 242 char inbuf[8192];
200 int amt; 243 int amt;
201 if((amt = h.transport.read(inbuf)) > 0) 244 if((amt = h.transport.read(inbuf)) > 0)
202 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); 245 {
203 else 246 if (dataHandler)
247 dataHandler(inbuf[0 .. amt], h);
248 else
249 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
250 }
251 else
252 {
253 if (amt == 0)
254 {
255 return CLOSE;
256 }
204 log.error("Received no data, err = {}", amt); 257 log.error("Received no data, err = {}", amt);
205 258 }
206 return REMAIN; 259 return REMAIN;
207 } 260 }
208 } 261 int onHangup(Dispatcher d)
262 {
263 return UNREGISTER;
264 }
265 int onError(Dispatcher d, RegisterD unreg)
266 {
267 return CLOSE;
268 }
269
270 void setDataHandler(void delegate(char[],Dispatcher) del)
271 {
272 dataHandler = del;
273 }
274 protected
275 void delegate(char[], Dispatcher) dataHandler;
276 }
277
278 bool includes(Dispatcher[] haystack, Dispatcher needle)
279 {
280 foreach(Dispatcher h; haystack)
281 {
282 if (h is needle)
283 return true;
284 }
285 return false;
286 }