132
|
1 /*******************************************************************************
|
|
2
|
|
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
|
|
4
|
|
5 license: BSD style: $(LICENSE)
|
|
6
|
|
7 version: July 2004: Initial release
|
|
8
|
|
9 author: Kris
|
|
10
|
|
11 *******************************************************************************/
|
|
12
|
|
13 module tango.net.cluster.tina.ClusterQueue;
|
|
14
|
|
15 private import tango.core.Thread;
|
|
16
|
|
17 private import tango.stdc.stdlib : alloca;
|
|
18
|
|
19 private import tango.net.cluster.tina.Cluster,
|
|
20 tango.net.cluster.tina.QueueFile,
|
|
21 tango.net.cluster.tina.ClusterTypes;
|
|
22
|
|
/******************************************************************************

        Common base for the queue hosts in this module. It owns a
        background thread which periodically invokes watchdog(), and
        leaves the actual storage strategy (get/put/watchdog) to the
        subclass.

******************************************************************************/

class ClusterQueue
{
        private Logger  log;            // taken from the hosting cluster
        private uint    used,           // not touched by this class itself
                        limit;          // capacity hint handed to subclasses
        private double  sleep;          // pause between watchdog passes
        private Thread  thread;         // runs the watchdog loop
        private Cluster cluster;

        /**********************************************************************

                Periodic housekeeping hook, called from the background
                thread after each sleep interval

        **********************************************************************/

        abstract void watchdog ();

        /**********************************************************************

                Retrieve content from the queue with the given name

        **********************************************************************/

        abstract ClusterContent get (char[] name);

        /**********************************************************************

                Add content to the queue with the given name. The bool
                result is defined by the implementing subclass

        **********************************************************************/

        abstract bool put (char[] name, ClusterContent content);

        /**********************************************************************

                Record the configuration and launch the watchdog thread.

                The sleep value is passed verbatim to Thread.sleep
                (presumably seconds -- confirm against tango.core.Thread)

        **********************************************************************/

        this (Cluster cluster, uint limit, double sleep)
        {
                // capture the configuration first ...
                this.cluster = cluster;
                this.limit   = limit;
                this.sleep   = sleep;
                this.log     = cluster.log;

                // ... then bring up the thread that drives run()
                thread = new Thread (&run);
                thread.start ();
        }

        /**********************************************************************

                Log the channel name and broadcast on that channel

        **********************************************************************/

        final void publish (IChannel channel)
        {
                auto note = "publishing queue channel '" ~ channel.name ~ "'";

                log.info (note);
                channel.broadcast ();
        }

        /**********************************************************************

                Thread body: sleep, run the watchdog, repeat forever.
                Anything thrown by the watchdog is logged and the loop
                carries on

        **********************************************************************/

        private void run ()
        {
                for (;;)
                    {
                    Thread.sleep (this.sleep);

                    try {
                        watchdog ();
                        }
                    catch (Object x)
                        {
                        log.error ("queue-publisher: "~x.toString());
                        }
                    }
        }
}
|
|
97
|
|
98
|
|
99
|
|
/******************************************************************************

        ClusterQueue implementation backed by QueueFile instances, one
        per channel name. Queues are created lazily on first lookup and
        are never removed, so queueList only ever grows.

******************************************************************************/

class PersistQueue : ClusterQueue
{
        private QueueFile[char[]] queueSet;     // queues indexed by channel name
        private QueueFile[]       queueList;    // same queues, in creation order

        /**********************************************************************

                Forward the configuration to ClusterQueue, which also
                starts the watchdog thread

        **********************************************************************/

        this (Cluster cluster, uint limit, double sleep)
        {
                super (cluster, limit, sleep);
        }

        /**********************************************************************

                Return the queue for the named channel, creating it
                (along with a channel from the cluster) when the name
                has not been seen before. Synchronized, since it updates
                both queueSet and queueList

        **********************************************************************/

        final synchronized QueueFile lookup (char[] name)
        {
                auto p = name in queueSet;
                if (p is null)
                   {
                   // name is currently a reference only; copy it
                   name = name.dup;

                   log.trace ("creating new queue for channel '" ~ name ~ "'");

                   // place new QueueFile into both the map and the list
                   auto queue = new QueueFile (log, cluster.createChannel(name), limit);
                   queueSet[name] = queue;
                   queueList ~= queue;
                   return queue;
                   }

                return *p;
        }

        /**********************************************************************

                Push content onto the named queue and return the result
                of the push. When the push succeeded and the queue now
                holds exactly one entry, the channel is published
                right away rather than waiting for the watchdog

        **********************************************************************/

        final bool put (char[] name, ClusterContent content)
        {
                // stuff content into the appropriate queue
                auto queue = lookup (name);
                auto ret = queue.push (content);

                // notify immediately if we just transitioned from 0
                if (ret && queue.size is 1)
                    publish (queue.channel);

                return ret;
        }

        /**********************************************************************

                Pop the next entry from the named queue. Note that the
                queue is created by lookup() if it does not exist yet

        **********************************************************************/

        final ClusterContent get (char[] name)
        {
                return cast(ClusterContent) lookup(name).pop;
        }

        /**********************************************************************

                Copy the leading dst.length entries of src into dst
                while holding the object lock.

                Kept as a separate synchronized method as a workaround
                for a compiler bug in 0.018 (an inline synchronized
                block was used originally).

                Only a prefix of src is taken: the caller sizes dst
                before the lock is acquired, and src may have grown in
                the meantime. A whole-array copy with differing lengths
                is a runtime error in D. Requires dst.length to be no
                greater than src.length, which holds because queueList
                is append-only

        **********************************************************************/

        private final synchronized void copy (QueueFile[] dst, QueueFile[] src)
        {
                dst[] = src [0 .. dst.length];
        }

        /**********************************************************************

                Housekeeping pass: publish every channel whose queue has
                content, and flush every queue that reports itself dirty

        **********************************************************************/

        final void watchdog ()
        {
                auto len = queueList.length;

                // no queues yet, so nothing to publish or flush
                if (len is 0)
                    return;

                // stack space for a private snapshot of the queue references
                auto list = (cast(QueueFile*) alloca(len * QueueFile.sizeof))[0..len];

                // clone the list of queues to avoid stalling everything.
                // Queues added after len was read are simply picked up
                // on the next pass
                copy (list, queueList);

                foreach (q; list)
                        {
                        if (q.size)
                            publish (q.channel);

                        if (q.isDirty)
                           {
                           q.flush;
                           log.info ("flushed "~q.channel.name~" to disk");
                           }
                        }
        }
}
|
|
208
|
|
209
|
|
210 /+
|
|
211
|
|
212 /******************************************************************************
|
|
213
|
|
214 ******************************************************************************/
|
|
215
|
|
216 class MemoryQueue : ClusterQueue
|
|
217 {
|
|
218 private HashMap queueSet;
|
|
219
|
|
220 /**********************************************************************
|
|
221
|
|
222 **********************************************************************/
|
|
223
|
|
224 this (Cluster cluster, uint limit, Interval sleep)
|
|
225 {
|
|
226 queueSet = new HashMap (256);
|
|
227 super (cluster, limit, sleep);
|
|
228 }
|
|
229
|
|
230 /**********************************************************************
|
|
231
|
|
232 **********************************************************************/
|
|
233
|
|
234 final ChannelQueue lookup (char[] channel)
|
|
235 {
|
|
236 return cast(ChannelQueue) queueSet.get (channel);
|
|
237 }
|
|
238
|
|
239 /**********************************************************************
|
|
240
|
|
241 **********************************************************************/
|
|
242
|
|
243 bool put (char[] name, ClusterContent content)
|
|
244 {
|
|
245 if ((used + content.length) < limit)
|
|
246 {
|
|
247 // select the appropriate queue
|
|
248 auto queue = lookup (name);
|
|
249 if (queue is null)
|
|
250 {
|
|
251 // name is currently a reference only; copy it
|
|
252 name = name.dup;
|
|
253
|
|
254 log.trace ("creating new queue for channel '" ~ name ~ "'");
|
|
255
|
|
256 // place new ChannelQueue into the list
|
|
257 queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
|
|
258 }
|
|
259
|
|
260 queue.put (cast (ClusterContent) content.dup);
|
|
261 used += content.length;
|
|
262 return true;
|
|
263 }
|
|
264 return false;
|
|
265 }
|
|
266
|
|
267 /**********************************************************************
|
|
268
|
|
269 **********************************************************************/
|
|
270
|
|
271 synchronized ClusterContent get (char[] name)
|
|
272 {
|
|
273 ClusterContent ret = null;
|
|
274 auto queue = lookup (name);
|
|
275
|
|
276 if (queue)
|
|
277 {
|
|
278 ret = queue.get;
|
|
279 used -= ret.length;
|
|
280 }
|
|
281 return ret;
|
|
282 }
|
|
283
|
|
284 /**********************************************************************
|
|
285
|
|
286 **********************************************************************/
|
|
287
|
|
288 void watchdog ()
|
|
289 {
|
|
290 foreach (char[] k, Object o; queueSet)
|
|
291 {
|
|
292 auto q = cast(ChannelQueue) o;
|
|
293 if (q.count)
|
|
294 publish (q.channel);
|
|
295 }
|
|
296 }
|
|
297 }
|
|
298
|
|
299
|
|
300 /******************************************************************************
|
|
301
|
|
302 ******************************************************************************/
|
|
303
|
|
304 private class ChannelQueue
|
|
305 {
|
|
306 private Link head, // head of the Queue
|
|
307 tail; // tail of the Queue
|
|
308 private int count; // number of items present
|
|
309 IChannel channel; // Queue channel
|
|
310
|
|
311 /**********************************************************************
|
|
312
|
|
313 **********************************************************************/
|
|
314
|
|
315 private static class Link
|
|
316 {
|
|
317 Link prev,
|
|
318 next;
|
|
319 ClusterContent data;
|
|
320
|
|
321 static Link freeList;
|
|
322
|
|
323 /**************************************************************
|
|
324
|
|
325 **************************************************************/
|
|
326
|
|
327 Link append (Link after)
|
|
328 {
|
|
329 if (after)
|
|
330 {
|
|
331 next = after.next;
|
|
332
|
|
333 // patch 'next' to point at me
|
|
334 if (next)
|
|
335 next.prev = this;
|
|
336
|
|
337 //patch 'after' to point at me
|
|
338 prev = after;
|
|
339 after.next = this;
|
|
340 }
|
|
341 return this;
|
|
342 }
|
|
343
|
|
344 /**************************************************************
|
|
345
|
|
346 **************************************************************/
|
|
347
|
|
348 Link unlink ()
|
|
349 {
|
|
350 // make 'prev' and 'next' entries see each other
|
|
351 if (prev)
|
|
352 prev.next = next;
|
|
353
|
|
354 if (next)
|
|
355 next.prev = prev;
|
|
356
|
|
357 // Murphy's law
|
|
358 next = prev = null;
|
|
359 return this;
|
|
360 }
|
|
361
|
|
362 /**************************************************************
|
|
363
|
|
364 **************************************************************/
|
|
365
|
|
366 Link create ()
|
|
367 {
|
|
368 Link l;
|
|
369
|
|
370 if (freeList)
|
|
371 {
|
|
372 l = freeList;
|
|
373 freeList = l.next;
|
|
374 }
|
|
375 else
|
|
376 l = new Link;
|
|
377 return l;
|
|
378 }
|
|
379
|
|
380 /**************************************************************
|
|
381
|
|
382 **************************************************************/
|
|
383
|
|
384 void destroy ()
|
|
385 {
|
|
386 next = freeList;
|
|
387 freeList = this;
|
|
388 this.data = null;
|
|
389 }
|
|
390 }
|
|
391
|
|
392
|
|
393 /**********************************************************************
|
|
394
|
|
395 **********************************************************************/
|
|
396
|
|
397 this (IChannel channel)
|
|
398 {
|
|
399 head = tail = new Link;
|
|
400 this.channel = channel;
|
|
401 }
|
|
402
|
|
403 /**********************************************************************
|
|
404
|
|
405 Add the specified content to the queue at the current
|
|
406 tail position, and bump tail to the next Link
|
|
407
|
|
408 **********************************************************************/
|
|
409
|
|
410 void put (ClusterContent content)
|
|
411 {
|
|
412 tail.data = content;
|
|
413 tail = tail.create.append (tail);
|
|
414 ++count;
|
|
415 }
|
|
416
|
|
417 /**********************************************************************
|
|
418
|
|
419 Extract from the head, which is the oldest item in the
|
|
420 queue. The removed Link is then appended to the tail,
|
|
421 ready for another put. Head is adjusted to point at the
|
|
422 next valid queue entry.
|
|
423
|
|
424 **********************************************************************/
|
|
425
|
|
426 ClusterContent get ()
|
|
427 {
|
|
428 if (head !is tail)
|
|
429 {
|
|
430 auto l = head;
|
|
431 head = head.next;
|
|
432 auto ret = l.data;
|
|
433 l.unlink;
|
|
434 l.destroy;
|
|
435 --count;
|
|
436 return ret;
|
|
437 }
|
|
438 return null;
|
|
439 }
|
|
440 }
|
|
441
|
|
442 +/
|