Mercurial > projects > ldc
comparison tango/tango/net/cluster/tina/ClusterQueue.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
131:5825d48b27d1 | 132:1700239cab2e |
---|---|
1 /******************************************************************************* | |
2 | |
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved | |
4 | |
5 license: BSD style: $(LICENSE) | |
6 | |
7 version: July 2004: Initial release | |
8 | |
9 author: Kris | |
10 | |
11 *******************************************************************************/ | |
12 | |
13 module tango.net.cluster.tina.ClusterQueue; | |
14 | |
15 private import tango.core.Thread; | |
16 | |
17 private import tango.stdc.stdlib : alloca; | |
18 | |
19 private import tango.net.cluster.tina.Cluster, | |
20 tango.net.cluster.tina.QueueFile, | |
21 tango.net.cluster.tina.ClusterTypes; | |
22 | |
23 /****************************************************************************** | |
24 | |
25 ******************************************************************************/ | |
26 | |
27 class ClusterQueue | |
28 { | |
29 private Logger log; | |
30 private uint used, | |
31 limit; | |
32 private double sleep; | |
33 private Thread thread; | |
34 private Cluster cluster; | |
35 | |
36 /********************************************************************** | |
37 | |
38 **********************************************************************/ | |
39 | |
40 abstract void watchdog (); | |
41 | |
42 /********************************************************************** | |
43 | |
44 **********************************************************************/ | |
45 | |
46 abstract ClusterContent get (char[] name); | |
47 | |
48 /********************************************************************** | |
49 | |
50 **********************************************************************/ | |
51 | |
52 abstract bool put (char[] name, ClusterContent content); | |
53 | |
54 /********************************************************************** | |
55 | |
56 **********************************************************************/ | |
57 | |
58 this (Cluster cluster, uint limit, double sleep) | |
59 { | |
60 thread = new Thread (&run); | |
61 | |
62 log = cluster.log; | |
63 this.limit = limit; | |
64 this.sleep = sleep; | |
65 this.cluster = cluster; | |
66 | |
67 thread.start; | |
68 } | |
69 | |
70 /********************************************************************** | |
71 | |
72 **********************************************************************/ | |
73 | |
74 final void publish (IChannel channel) | |
75 { | |
76 log.info ("publishing queue channel '" ~ channel.name ~ "'"); | |
77 channel.broadcast; | |
78 } | |
79 | |
80 /********************************************************************** | |
81 | |
82 **********************************************************************/ | |
83 | |
84 private void run () | |
85 { | |
86 while (true) | |
87 { | |
88 Thread.sleep (sleep); | |
89 | |
90 try { | |
91 watchdog; | |
92 } catch (Object x) | |
93 log.error ("queue-publisher: "~x.toString); | |
94 } | |
95 } | |
96 } | |
97 | |
98 | |
99 | |
100 /****************************************************************************** | |
101 | |
102 | |
103 ******************************************************************************/ | |
104 | |
105 class PersistQueue : ClusterQueue | |
106 { | |
107 private QueueFile[char[]] queueSet; | |
108 private QueueFile[] queueList; | |
109 | |
110 /********************************************************************** | |
111 | |
112 **********************************************************************/ | |
113 | |
114 this (Cluster cluster, uint limit, double sleep) | |
115 { | |
116 super (cluster, limit, sleep); | |
117 } | |
118 | |
119 /********************************************************************** | |
120 | |
121 **********************************************************************/ | |
122 | |
123 final synchronized QueueFile lookup (char[] name) | |
124 { | |
125 auto p = name in queueSet; | |
126 if (p is null) | |
127 { | |
128 // name is currently a reference only; copy it | |
129 name = name.dup; | |
130 | |
131 log.trace ("creating new queue for channel '" ~ name ~ "'"); | |
132 | |
133 // place new ChannelQueue into the list | |
134 auto queue = new QueueFile (log, cluster.createChannel(name), limit); | |
135 queueSet[name] = queue; | |
136 queueList ~= queue; | |
137 return queue; | |
138 } | |
139 | |
140 return *p; | |
141 } | |
142 | |
143 /********************************************************************** | |
144 | |
145 **********************************************************************/ | |
146 | |
147 final bool put (char[] name, ClusterContent content) | |
148 { | |
149 // stuff content into the appropriate queue | |
150 auto queue = lookup (name); | |
151 auto ret = queue.push (content); | |
152 | |
153 // notify immediately if we just transitioned from 0 | |
154 if (ret && queue.size is 1) | |
155 publish (queue.channel); | |
156 | |
157 return ret; | |
158 } | |
159 | |
160 /********************************************************************** | |
161 | |
162 **********************************************************************/ | |
163 | |
164 final ClusterContent get (char[] name) | |
165 { | |
166 return cast(ClusterContent) lookup(name).pop; | |
167 } | |
168 | |
169 /********************************************************************** | |
170 | |
171 Workaround for a compiler bug in 0.018 | |
172 | |
173 **********************************************************************/ | |
174 | |
175 private final synchronized void copy (QueueFile[] dst, QueueFile[] src) | |
176 { | |
177 dst[] = src; | |
178 } | |
179 | |
180 /********************************************************************** | |
181 | |
182 **********************************************************************/ | |
183 | |
184 final void watchdog () | |
185 { | |
186 auto len = queueList.length; | |
187 auto list = (cast(QueueFile*) alloca(len * QueueFile.sizeof))[0..len]; | |
188 | |
189 // clone the list of queues to avoid stalling everything | |
190 copy (list, queueList); | |
191 | |
192 // synchronized (this) | |
193 // list[] = queueList; | |
194 | |
195 foreach (q; list) | |
196 { | |
197 if (q.size) | |
198 publish (q.channel); | |
199 | |
200 if (q.isDirty) | |
201 { | |
202 q.flush; | |
203 log.info ("flushed "~q.channel.name~" to disk"); | |
204 } | |
205 } | |
206 } | |
207 } | |
208 | |
209 | |
210 /+ | |
211 | |
212 /****************************************************************************** | |
213 | |
214 ******************************************************************************/ | |
215 | |
216 class MemoryQueue : ClusterQueue | |
217 { | |
218 private HashMap queueSet; | |
219 | |
220 /********************************************************************** | |
221 | |
222 **********************************************************************/ | |
223 | |
224 this (Cluster cluster, uint limit, Interval sleep) | |
225 { | |
226 queueSet = new HashMap (256); | |
227 super (cluster, limit, sleep); | |
228 } | |
229 | |
230 /********************************************************************** | |
231 | |
232 **********************************************************************/ | |
233 | |
234 final ChannelQueue lookup (char[] channel) | |
235 { | |
236 return cast(ChannelQueue) queueSet.get (channel); | |
237 } | |
238 | |
239 /********************************************************************** | |
240 | |
241 **********************************************************************/ | |
242 | |
243 bool put (char[] name, ClusterContent content) | |
244 { | |
245 if ((used + content.length) < limit) | |
246 { | |
247 // select the appropriate queue | |
248 auto queue = lookup (name); | |
249 if (queue is null) | |
250 { | |
251 // name is currently a reference only; copy it | |
252 name = name.dup; | |
253 | |
254 log.trace ("creating new queue for channel '" ~ name ~ "'"); | |
255 | |
256 // place new ChannelQueue into the list | |
257 queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name))); | |
258 } | |
259 | |
260 queue.put (cast (ClusterContent) content.dup); | |
261 used += content.length; | |
262 return true; | |
263 } | |
264 return false; | |
265 } | |
266 | |
267 /********************************************************************** | |
268 | |
269 **********************************************************************/ | |
270 | |
271 synchronized ClusterContent get (char[] name) | |
272 { | |
273 ClusterContent ret = null; | |
274 auto queue = lookup (name); | |
275 | |
276 if (queue) | |
277 { | |
278 ret = queue.get; | |
279 used -= ret.length; | |
280 } | |
281 return ret; | |
282 } | |
283 | |
284 /********************************************************************** | |
285 | |
286 **********************************************************************/ | |
287 | |
288 void watchdog () | |
289 { | |
290 foreach (char[] k, Object o; queueSet) | |
291 { | |
292 auto q = cast(ChannelQueue) o; | |
293 if (q.count) | |
294 publish (q.channel); | |
295 } | |
296 } | |
297 } | |
298 | |
299 | |
300 /****************************************************************************** | |
301 | |
302 ******************************************************************************/ | |
303 | |
304 private class ChannelQueue | |
305 { | |
306 private Link head, // head of the Queue | |
307 tail; // tail of the Queue | |
308 private int count; // number of items present | |
309 IChannel channel; // Queue channel | |
310 | |
311 /********************************************************************** | |
312 | |
313 **********************************************************************/ | |
314 | |
315 private static class Link | |
316 { | |
317 Link prev, | |
318 next; | |
319 ClusterContent data; | |
320 | |
321 static Link freeList; | |
322 | |
323 /************************************************************** | |
324 | |
325 **************************************************************/ | |
326 | |
327 Link append (Link after) | |
328 { | |
329 if (after) | |
330 { | |
331 next = after.next; | |
332 | |
333 // patch 'next' to point at me | |
334 if (next) | |
335 next.prev = this; | |
336 | |
337 //patch 'after' to point at me | |
338 prev = after; | |
339 after.next = this; | |
340 } | |
341 return this; | |
342 } | |
343 | |
344 /************************************************************** | |
345 | |
346 **************************************************************/ | |
347 | |
348 Link unlink () | |
349 { | |
350 // make 'prev' and 'next' entries see each other | |
351 if (prev) | |
352 prev.next = next; | |
353 | |
354 if (next) | |
355 next.prev = prev; | |
356 | |
357 // Murphy's law | |
358 next = prev = null; | |
359 return this; | |
360 } | |
361 | |
362 /************************************************************** | |
363 | |
364 **************************************************************/ | |
365 | |
366 Link create () | |
367 { | |
368 Link l; | |
369 | |
370 if (freeList) | |
371 { | |
372 l = freeList; | |
373 freeList = l.next; | |
374 } | |
375 else | |
376 l = new Link; | |
377 return l; | |
378 } | |
379 | |
380 /************************************************************** | |
381 | |
382 **************************************************************/ | |
383 | |
384 void destroy () | |
385 { | |
386 next = freeList; | |
387 freeList = this; | |
388 this.data = null; | |
389 } | |
390 } | |
391 | |
392 | |
393 /********************************************************************** | |
394 | |
395 **********************************************************************/ | |
396 | |
397 this (IChannel channel) | |
398 { | |
399 head = tail = new Link; | |
400 this.channel = channel; | |
401 } | |
402 | |
403 /********************************************************************** | |
404 | |
405 Add the specified content to the queue at the current | |
406 tail position, and bump tail to the next Link | |
407 | |
408 **********************************************************************/ | |
409 | |
410 void put (ClusterContent content) | |
411 { | |
412 tail.data = content; | |
413 tail = tail.create.append (tail); | |
414 ++count; | |
415 } | |
416 | |
417 /********************************************************************** | |
418 | |
419 Extract from the head, which is the oldest item in the | |
420 queue. The removed Link is then appended to the tail, | |
421 ready for another put. Head is adjusted to point at the | |
422 next valid queue entry. | |
423 | |
424 **********************************************************************/ | |
425 | |
426 ClusterContent get () | |
427 { | |
428 if (head !is tail) | |
429 { | |
430 auto l = head; | |
431 head = head.next; | |
432 auto ret = l.data; | |
433 l.unlink; | |
434 l.destroy; | |
435 --count; | |
436 return ret; | |
437 } | |
438 return null; | |
439 } | |
440 } | |
441 | |
442 +/ |