132
|
1 /*******************************************************************************
|
|
2
|
|
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
|
|
4
|
|
5 license: BSD style: $(LICENSE)
|
|
6
|
|
7 version: July 2004: Initial release
|
|
8
|
|
9 author: Kris
|
|
10
|
|
11 *******************************************************************************/
|
|
12
|
|
13 module tango.net.cluster.tina.ProtocolWriter;
|
|
14
|
|
15 private import tango.io.protocol.Writer,
|
|
16 tango.io.protocol.PickleProtocol;
|
|
17
|
|
18 private import tango.net.cluster.model.IMessage;
|
|
19
|
|
20 private import tango.net.cluster.NetworkRegistry;
|
|
21
|
|
22 private import tango.net.cluster.tina.ClusterTypes;
|
|
23
|
|
24 /*******************************************************************************
|
|
25
|
|
26 Objects passed around a cluster are prefixed with a header, so the
|
|
27 receiver can pick them apart correctly. This header consists of:
|
|
28 ---
|
|
29 * the packet size, including the header (16 bits)
|
|
30 * a command code (8 bits)
|
|
31 * a version id (8 bits)
|
|
32 * a timestamp (64 bits)
|
|
33 * length of the channel name (32 bits)
|
|
34 * the channel name
|
|
35 * length of the key (32 bits)
|
|
36 * the key
|
|
37 * an optional payload (an IMessage instance)
|
|
38 ---
|
|
39
|
|
40 Everything is written in Network order (big endian)
|
|
41
|
|
42 *******************************************************************************/
|
|
43
|
|
class ProtocolWriter
{
        // serializer layered over 'buffer'; every emit() call appends to it
        private Writer  emit;

        // the output buffer handed to the constructor
        package IBuffer buffer;

        // protocol version id, written as the 4th byte of every packet
        const ubyte Version = 0x01;

        // bytes occupied by the fixed prefix: size (16 bits), command
        // (8 bits), version (8 bits) and timestamp (64 bits)
        private const size_t HeaderSize = ushort.sizeof + ubyte.sizeof +
                                          ubyte.sizeof  + ulong.sizeof;

        /***********************************************************************

                protocol commands

                Each command is written to the wire as a single byte
                holding its ordinal value, so the order of the members
                below is part of the protocol: append new members at
                the end rather than reordering existing ones.

        ***********************************************************************/

        enum Command : ubyte
        {
                OK,
                Exception,
                Full,
                Locked,
                Add,
                Copy,
                Remove,
                Load,
                AddQueue,
                RemoveQueue,
                Call
        }

        /***********************************************************************

                Construct a ProtocolWriter upon the given buffer. As
                Objects are serialized, their content is written to this
                buffer. The buffer content is then typically flushed to
                some external conduit, such as a file or socket.

                Note that serialized data is always in Network order.

                Params:
                buffer = the (non-null) buffer to serialize into

        ***********************************************************************/

        this (IBuffer buffer)
        {
                assert (buffer);

                this.buffer = buffer;
                emit = new Writer (new PickleProtocol (buffer));
        }

        /***********************************************************************

                Stuff the request into our output buffer. Note that this
                protocol is prefixed by a 'size' value, requiring that
                all messages can be contained within the buffer. This is
                not considered a serious limitation, as the messages are
                not intended to be "large" given that they're traversing
                the network.

                The buffer is cleared first, so any previously written
                but unflushed content is discarded.

                Params:
                cmd     = the command code placed in the header
                channel = the channel name
                element = the key (optional)
                msg     = an optional payload; when present, its time
                          is used as the packet timestamp, otherwise
                          Time.init is written

                Returns:
                this instance, to support call chaining

                Throws:
                Exception when the serialized packet does not fit the
                16-bit size field of the header

        ***********************************************************************/

        ProtocolWriter put (Command cmd, char[] channel, char[] element = null, IMessage msg = null)
        {
                auto time = (msg ? msg.time : Time.init);

                // reset the buffer first!
                buffer.clear;

                // the leading size is a placeholder: the true value
                // is only known once everything has been serialized
                emit (cast(ushort) 0)
                     (cast(ubyte) cmd)
                     (cast(ubyte) Version)
                     (cast(ulong) time.ticks)
                     (channel)
                     (element);

                // is there a payload?
                if (msg)
                    NetworkRegistry.shared.freeze (emit, msg);

                // total number of bytes, including the header
                auto size = buffer.limit;
                checkSize (size);

                // go back and write the total number of bytes, high
                // byte first (Network order). The backing array is
                // fetched only now, after serialization, so that a
                // buffer implementation which reallocates while being
                // written cannot leave us patching a stale array
                auto content = cast(ubyte[]) buffer.getContent;
                content[0] = cast(ubyte) (size >> 8);
                content[1] = cast(ubyte) (size & 0xff);
                return this;
        }

        /***********************************************************************

                Emit a ClusterContent constructed by ProtocolReader.getPacket

                The reply is always tagged Command.OK, with a zero
                timestamp. When the content is empty, two zero lengths
                are written in its place to represent an empty channel
                and an empty element; those filler bytes are included
                in the packet size written to the header.

                Params:
                content = the payload (which includes both channel &
                          element), or an empty array

                Returns:
                this instance, to support call chaining

                Throws:
                Exception when the packet does not fit the 16-bit size
                field of the header

        ***********************************************************************/

        ProtocolWriter reply (ClusterContent content)
        {
                uint empty = 0;

                // bytes following the fixed header: either the content
                // itself, or the two zero lengths emitted in its place
                auto payload = content.length ? content.length : 2 * empty.sizeof;
                auto total   = HeaderSize + payload;
                checkSize (total);

                // reset the buffer first
                buffer.clear;

                // write the length, the ack, version, and timestamp
                emit (cast(ushort) total)
                     (cast(ubyte) ProtocolWriter.Command.OK)
                     (cast(ubyte) Version)
                     (cast(ulong) ulong.init);

                // and the payload (which includes both channel & element)
                if (content.length)
                    buffer.append (content);
                else
                   // or filler for an empty channel & element ...
                   emit (empty) (empty);

                return this;
        }

        /***********************************************************************

                Write an exception message

                The message travels in the channel slot of a packet
                tagged Command.Exception.

        ***********************************************************************/

        ProtocolWriter exception (char[] message)
        {
                return put (ProtocolWriter.Command.Exception, message);
        }

        /***********************************************************************

                Write an "OK" confirmation

                The (optional) message travels in the channel slot of
                a packet tagged Command.OK.

        ***********************************************************************/

        ProtocolWriter success (char[] message = null)
        {
                return put (ProtocolWriter.Command.OK, message);
        }

        /***********************************************************************

                Indicate something has filled up

                The message travels in the channel slot of a packet
                tagged Command.Full.

        ***********************************************************************/

        ProtocolWriter full (char[] message)
        {
                return put (ProtocolWriter.Command.Full, message);
        }

        /***********************************************************************

                Flush the output

        ***********************************************************************/

        void flush ()
        {
                emit.flush;
        }

        /***********************************************************************

                Verify that a packet of the given size can be described
                by the 16-bit size field of the header. Without this, an
                oversize packet would be sent with a silently truncated
                size.

        ***********************************************************************/

        private void checkSize (size_t size)
        {
                if (size > ushort.max)
                   {
                   // never leave an unsendable packet behind for a later flush
                   buffer.clear;
                   throw new Exception ("ProtocolWriter: packet size exceeds the 16-bit header field");
                   }
        }
}
|
|
199
|