comparison tango/tango/net/cluster/tina/ClusterQueue.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
comparison
equal deleted inserted replaced
131:5825d48b27d1 132:1700239cab2e
1 /*******************************************************************************
2
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
4
5 license: BSD style: $(LICENSE)
6
7 version: July 2004: Initial release
8
9 author: Kris
10
11 *******************************************************************************/
12
13 module tango.net.cluster.tina.ClusterQueue;
14
15 private import tango.core.Thread;
16
17 private import tango.stdc.stdlib : alloca;
18
19 private import tango.net.cluster.tina.Cluster,
20 tango.net.cluster.tina.QueueFile,
21 tango.net.cluster.tina.ClusterTypes;
22
23 /******************************************************************************
24
25 ******************************************************************************/
26
27 class ClusterQueue
28 {
29 private Logger log;
30 private uint used,
31 limit;
32 private double sleep;
33 private Thread thread;
34 private Cluster cluster;
35
36 /**********************************************************************
37
38 **********************************************************************/
39
40 abstract void watchdog ();
41
42 /**********************************************************************
43
44 **********************************************************************/
45
46 abstract ClusterContent get (char[] name);
47
48 /**********************************************************************
49
50 **********************************************************************/
51
52 abstract bool put (char[] name, ClusterContent content);
53
54 /**********************************************************************
55
56 **********************************************************************/
57
58 this (Cluster cluster, uint limit, double sleep)
59 {
60 thread = new Thread (&run);
61
62 log = cluster.log;
63 this.limit = limit;
64 this.sleep = sleep;
65 this.cluster = cluster;
66
67 thread.start;
68 }
69
70 /**********************************************************************
71
72 **********************************************************************/
73
74 final void publish (IChannel channel)
75 {
76 log.info ("publishing queue channel '" ~ channel.name ~ "'");
77 channel.broadcast;
78 }
79
80 /**********************************************************************
81
82 **********************************************************************/
83
84 private void run ()
85 {
86 while (true)
87 {
88 Thread.sleep (sleep);
89
90 try {
91 watchdog;
92 } catch (Object x)
93 log.error ("queue-publisher: "~x.toString);
94 }
95 }
96 }
97
98
99
100 /******************************************************************************
101
102
103 ******************************************************************************/
104
105 class PersistQueue : ClusterQueue
106 {
107 private QueueFile[char[]] queueSet;
108 private QueueFile[] queueList;
109
110 /**********************************************************************
111
112 **********************************************************************/
113
114 this (Cluster cluster, uint limit, double sleep)
115 {
116 super (cluster, limit, sleep);
117 }
118
119 /**********************************************************************
120
121 **********************************************************************/
122
123 final synchronized QueueFile lookup (char[] name)
124 {
125 auto p = name in queueSet;
126 if (p is null)
127 {
128 // name is currently a reference only; copy it
129 name = name.dup;
130
131 log.trace ("creating new queue for channel '" ~ name ~ "'");
132
133 // place new ChannelQueue into the list
134 auto queue = new QueueFile (log, cluster.createChannel(name), limit);
135 queueSet[name] = queue;
136 queueList ~= queue;
137 return queue;
138 }
139
140 return *p;
141 }
142
143 /**********************************************************************
144
145 **********************************************************************/
146
147 final bool put (char[] name, ClusterContent content)
148 {
149 // stuff content into the appropriate queue
150 auto queue = lookup (name);
151 auto ret = queue.push (content);
152
153 // notify immediately if we just transitioned from 0
154 if (ret && queue.size is 1)
155 publish (queue.channel);
156
157 return ret;
158 }
159
160 /**********************************************************************
161
162 **********************************************************************/
163
164 final ClusterContent get (char[] name)
165 {
166 return cast(ClusterContent) lookup(name).pop;
167 }
168
169 /**********************************************************************
170
171 Workaround for a compiler bug in 0.018
172
173 **********************************************************************/
174
175 private final synchronized void copy (QueueFile[] dst, QueueFile[] src)
176 {
177 dst[] = src;
178 }
179
180 /**********************************************************************
181
182 **********************************************************************/
183
184 final void watchdog ()
185 {
186 auto len = queueList.length;
187 auto list = (cast(QueueFile*) alloca(len * QueueFile.sizeof))[0..len];
188
189 // clone the list of queues to avoid stalling everything
190 copy (list, queueList);
191
192 // synchronized (this)
193 // list[] = queueList;
194
195 foreach (q; list)
196 {
197 if (q.size)
198 publish (q.channel);
199
200 if (q.isDirty)
201 {
202 q.flush;
203 log.info ("flushed "~q.channel.name~" to disk");
204 }
205 }
206 }
207 }
208
209
210 /+
211
212 /******************************************************************************
213
214 ******************************************************************************/
215
216 class MemoryQueue : ClusterQueue
217 {
218 private HashMap queueSet;
219
220 /**********************************************************************
221
222 **********************************************************************/
223
224 this (Cluster cluster, uint limit, Interval sleep)
225 {
226 queueSet = new HashMap (256);
227 super (cluster, limit, sleep);
228 }
229
230 /**********************************************************************
231
232 **********************************************************************/
233
234 final ChannelQueue lookup (char[] channel)
235 {
236 return cast(ChannelQueue) queueSet.get (channel);
237 }
238
239 /**********************************************************************
240
241 **********************************************************************/
242
243 bool put (char[] name, ClusterContent content)
244 {
245 if ((used + content.length) < limit)
246 {
247 // select the appropriate queue
248 auto queue = lookup (name);
249 if (queue is null)
250 {
251 // name is currently a reference only; copy it
252 name = name.dup;
253
254 log.trace ("creating new queue for channel '" ~ name ~ "'");
255
256 // place new ChannelQueue into the list
257 queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
258 }
259
260 queue.put (cast (ClusterContent) content.dup);
261 used += content.length;
262 return true;
263 }
264 return false;
265 }
266
267 /**********************************************************************
268
269 **********************************************************************/
270
271 synchronized ClusterContent get (char[] name)
272 {
273 ClusterContent ret = null;
274 auto queue = lookup (name);
275
276 if (queue)
277 {
278 ret = queue.get;
279 used -= ret.length;
280 }
281 return ret;
282 }
283
284 /**********************************************************************
285
286 **********************************************************************/
287
288 void watchdog ()
289 {
290 foreach (char[] k, Object o; queueSet)
291 {
292 auto q = cast(ChannelQueue) o;
293 if (q.count)
294 publish (q.channel);
295 }
296 }
297 }
298
299
300 /******************************************************************************
301
302 ******************************************************************************/
303
304 private class ChannelQueue
305 {
306 private Link head, // head of the Queue
307 tail; // tail of the Queue
308 private int count; // number of items present
309 IChannel channel; // Queue channel
310
311 /**********************************************************************
312
313 **********************************************************************/
314
315 private static class Link
316 {
317 Link prev,
318 next;
319 ClusterContent data;
320
321 static Link freeList;
322
323 /**************************************************************
324
325 **************************************************************/
326
327 Link append (Link after)
328 {
329 if (after)
330 {
331 next = after.next;
332
333 // patch 'next' to point at me
334 if (next)
335 next.prev = this;
336
337 //patch 'after' to point at me
338 prev = after;
339 after.next = this;
340 }
341 return this;
342 }
343
344 /**************************************************************
345
346 **************************************************************/
347
348 Link unlink ()
349 {
350 // make 'prev' and 'next' entries see each other
351 if (prev)
352 prev.next = next;
353
354 if (next)
355 next.prev = prev;
356
357 // Murphy's law
358 next = prev = null;
359 return this;
360 }
361
362 /**************************************************************
363
364 **************************************************************/
365
366 Link create ()
367 {
368 Link l;
369
370 if (freeList)
371 {
372 l = freeList;
373 freeList = l.next;
374 }
375 else
376 l = new Link;
377 return l;
378 }
379
380 /**************************************************************
381
382 **************************************************************/
383
384 void destroy ()
385 {
386 next = freeList;
387 freeList = this;
388 this.data = null;
389 }
390 }
391
392
393 /**********************************************************************
394
395 **********************************************************************/
396
397 this (IChannel channel)
398 {
399 head = tail = new Link;
400 this.channel = channel;
401 }
402
403 /**********************************************************************
404
405 Add the specified content to the queue at the current
406 tail position, and bump tail to the next Link
407
408 **********************************************************************/
409
410 void put (ClusterContent content)
411 {
412 tail.data = content;
413 tail = tail.create.append (tail);
414 ++count;
415 }
416
417 /**********************************************************************
418
419 Extract from the head, which is the oldest item in the
420 queue. The removed Link is then appended to the tail,
421 ready for another put. Head is adjusted to point at the
422 next valid queue entry.
423
424 **********************************************************************/
425
426 ClusterContent get ()
427 {
428 if (head !is tail)
429 {
430 auto l = head;
431 head = head.next;
432 auto ret = l.data;
433 l.unlink;
434 l.destroy;
435 --count;
436 return ret;
437 }
438 return null;
439 }
440 }
441
442 +/