11
|
1 module dreactor.protocol.Dispatcher;
|
0
|
2
|
|
3 import tango.io.selector.model.ISelector;
|
|
4 import tango.util.collection.CircularSeq;
|
3
|
5 import tango.net.Socket;
|
|
6 public import tango.core.Exception;
|
|
7 import dreactor.transport.AsyncSocketConduit;
|
0
|
8
|
3
|
9 import tango.util.log.Log;
|
|
10 import tango.util.log.Config;
|
0
|
11
|
6
|
/******************************************************************************

    Dispatcher

    Owns a queue of outbound buffers for a single Conduit and writes them
    out one piece at a time whenever onSend() is invoked. The position
    reached inside the buffer at the head of the queue is remembered in
    o_offset, so a partially written buffer is resumed on the next call.

    NOTE(review): only nextBuffer() is declared synchronized;
    appendOutBuffer() and addOffset() touch the same fields without it.
    Confirm the intended threading model with the callers.

******************************************************************************/
class Dispatcher
{
    /**************************************************************************

        Constructor

        Params:
            trans = the conduit that queued data will be written to

        Starts with an empty outbound queue and a zero write offset.

    **************************************************************************/
    public this (Conduit trans)
    {
        cond        = trans;
        ibuf_len    = 0;
        o_offset    = 0;
        out_buffers = new CircularSeq!(char[]);
        log         = Log.lookup("dreactor.core.Dispatcher");
    }

    /**************************************************************************

        onSend -- Send method

        Writes the unsent remainder of the head buffer to the conduit and
        advances the bookkeeping by the number of bytes accepted.

        Returns:
            UNREGISTER when no buffer is queued, or when addOffset()
            reports that nothing is left to send after this write.
            REMAIN in every other case, including a write that returned
            zero or a negative value (both are logged as errors).

        UNREGISTER and REMAIN are not defined in this class; they come
        from elsewhere in the project.

    **************************************************************************/
    public int onSend()
    {
        // A dedicated logger is looked up on every call; it is distinct
        // from the `log` member initialised by the constructor.
        Logger sendLog = Log.lookup("Handlers.onSend");

        char[] pending = nextBuffer();
        if (pending is null)
        {
            // Nothing queued.
            return UNREGISTER;
        }

        int written = cond.write(pending);
        if (written < 0)
        {
            sendLog.error("Socket send return ERR {}", written);
        }
        else if (written == 0)
        {
            sendLog.error("Select said socket was writable, but sent 0 bytes");
        }
        else if (! addOffset(written))
        {
            // The queue has been drained by this write.
            return UNREGISTER;
        }

        return REMAIN;
    }

    /**************************************************************************

        appendOutBuffer

        Queues one more outgoing buffer behind any already waiting.

        Params:
            outbuf = data to be sent

        Returns:
            true when this buffer is the only one queued (the count has
            just become 1), false when other buffers were already
            waiting. The original author's intent: true means the handler
            should be registered with the SelectLoop, false means it
            probably already is.

    **************************************************************************/
    bool appendOutBuffer(char[] outbuf)
    {
        out_buffers.append(outbuf);
        return (++out_buffers_len == 1);
    }

    /**************************************************************************

        addOffset

        Records that `off` more bytes of the head buffer have been sent.
        If that reaches or passes the end of the head buffer, the buffer
        is removed from the queue and the offset restarts at zero.

        Params:
            off = number of bytes just written

        Returns:
            false if there is nothing left to send, true if there is.

        Precondition: at least one buffer is queued.

    **************************************************************************/
    bool addOffset(int off)
    in
    {
        assert(out_buffers_len > 0);
    }
    body
    {
        char[] current = out_buffers.head();

        if ((off + o_offset) < current.length)
        {
            // Head buffer only partly sent: remember how far we got.
            o_offset += off;
            return true;
        }

        // Head buffer finished: drop it and start fresh on the next one.
        out_buffers.removeHead();
        o_offset = 0;
        out_buffers_len--;
        return (out_buffers_len > 0);
    }

    /**************************************************************************

        char[] nextBuffer

        Returns:
            the not-yet-sent tail of the head buffer (from o_offset to
            its end), or null when no buffer is queued.

    **************************************************************************/
    synchronized char[] nextBuffer()
    {
        if (out_buffers_len >= 1)
        {
            return out_buffers.head()[o_offset .. $];
        }

        return null;
    }

    Conduit              cond;             // destination of all writes
    CircularSeq!(char[]) out_buffers;      // queued outbound buffers, oldest at the head
    int                  out_buffers_len;  // number of buffers in out_buffers
    int                  ibuf_len;         // zeroed by the constructor; not read in this class
    int                  o_offset;         // bytes of the head buffer already sent
    Logger               log;              // set by the constructor; onSend() uses its own logger
}
|