annotate tango/tango/net/cluster/tina/ClusterQueue.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
132
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
2
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
4
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
5 license: BSD style: $(LICENSE)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
6
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
7 version: July 2004: Initial release
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
8
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
9 author: Kris
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
10
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
11 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
12
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
13 module tango.net.cluster.tina.ClusterQueue;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
14
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
15 private import tango.core.Thread;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
16
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
17 private import tango.stdc.stdlib : alloca;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
18
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
19 private import tango.net.cluster.tina.Cluster,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
20 tango.net.cluster.tina.QueueFile,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
21 tango.net.cluster.tina.ClusterTypes;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
22
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
23 /******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
24
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
25 ******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
26
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
27 class ClusterQueue
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
28 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
29 private Logger log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
30 private uint used,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
31 limit;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
32 private double sleep;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
33 private Thread thread;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
34 private Cluster cluster;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
35
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
36 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
37
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
38 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
39
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
40 abstract void watchdog ();
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
41
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
42 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
43
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
44 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
45
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
46 abstract ClusterContent get (char[] name);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
47
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
48 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
49
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
50 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
51
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
52 abstract bool put (char[] name, ClusterContent content);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
53
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
54 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
55
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
56 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
57
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
58 this (Cluster cluster, uint limit, double sleep)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
59 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
60 thread = new Thread (&run);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
61
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
62 log = cluster.log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
63 this.limit = limit;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
64 this.sleep = sleep;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
65 this.cluster = cluster;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
66
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
67 thread.start;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
68 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
69
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
70 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
71
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
72 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
73
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
74 final void publish (IChannel channel)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
75 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
76 log.info ("publishing queue channel '" ~ channel.name ~ "'");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
77 channel.broadcast;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
78 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
79
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
80 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
81
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
82 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
83
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
84 private void run ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
85 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
86 while (true)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
87 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
88 Thread.sleep (sleep);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
89
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
90 try {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
91 watchdog;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
92 } catch (Object x)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
93 log.error ("queue-publisher: "~x.toString);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
94 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
95 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
96 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
97
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
98
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
99
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
100 /******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
101
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
102
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
103 ******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
104
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
105 class PersistQueue : ClusterQueue
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
106 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
107 private QueueFile[char[]] queueSet;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
108 private QueueFile[] queueList;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
109
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
110 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
111
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
112 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
113
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
114 this (Cluster cluster, uint limit, double sleep)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
115 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
116 super (cluster, limit, sleep);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
117 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
118
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
119 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
120
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
121 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
122
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
123 final synchronized QueueFile lookup (char[] name)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
124 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
125 auto p = name in queueSet;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
126 if (p is null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
127 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
128 // name is currently a reference only; copy it
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
129 name = name.dup;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
130
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
131 log.trace ("creating new queue for channel '" ~ name ~ "'");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
132
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
133 // place new ChannelQueue into the list
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
134 auto queue = new QueueFile (log, cluster.createChannel(name), limit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
135 queueSet[name] = queue;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
136 queueList ~= queue;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
137 return queue;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
138 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
139
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
140 return *p;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
141 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
142
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
143 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
144
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
145 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
146
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
147 final bool put (char[] name, ClusterContent content)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
148 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
149 // stuff content into the appropriate queue
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
150 auto queue = lookup (name);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
151 auto ret = queue.push (content);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
152
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
153 // notify immediately if we just transitioned from 0
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
154 if (ret && queue.size is 1)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
155 publish (queue.channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
156
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
157 return ret;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
158 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
159
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
160 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
161
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
162 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
163
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
164 final ClusterContent get (char[] name)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
165 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
166 return cast(ClusterContent) lookup(name).pop;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
167 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
168
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
169 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
170
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
171 Workaround for a compiler bug in 0.018
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
172
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
173 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
174
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
175 private final synchronized void copy (QueueFile[] dst, QueueFile[] src)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
176 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
177 dst[] = src;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
178 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
179
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
180 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
181
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
182 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
183
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
184 final void watchdog ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
185 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
186 auto len = queueList.length;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
187 auto list = (cast(QueueFile*) alloca(len * QueueFile.sizeof))[0..len];
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
188
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
189 // clone the list of queues to avoid stalling everything
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
190 copy (list, queueList);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
191
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
192 // synchronized (this)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
193 // list[] = queueList;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
194
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
195 foreach (q; list)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
196 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
197 if (q.size)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
198 publish (q.channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
199
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
200 if (q.isDirty)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
201 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
202 q.flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
203 log.info ("flushed "~q.channel.name~" to disk");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
204 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
205 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
206 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
207 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
208
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
209
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
210 /+
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
211
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
212 /******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
213
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
214 ******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
215
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
216 class MemoryQueue : ClusterQueue
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
217 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
218 private HashMap queueSet;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
219
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
220 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
221
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
222 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
223
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
224 this (Cluster cluster, uint limit, Interval sleep)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
225 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
226 queueSet = new HashMap (256);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
227 super (cluster, limit, sleep);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
228 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
229
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
230 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
231
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
232 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
233
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
234 final ChannelQueue lookup (char[] channel)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
235 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
236 return cast(ChannelQueue) queueSet.get (channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
237 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
238
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
239 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
240
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
241 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
242
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
243 bool put (char[] name, ClusterContent content)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
244 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
245 if ((used + content.length) < limit)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
246 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
247 // select the appropriate queue
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
248 auto queue = lookup (name);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
249 if (queue is null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
250 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
251 // name is currently a reference only; copy it
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
252 name = name.dup;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
253
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
254 log.trace ("creating new queue for channel '" ~ name ~ "'");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
255
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
256 // place new ChannelQueue into the list
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
257 queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
258 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
259
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
260 queue.put (cast (ClusterContent) content.dup);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
261 used += content.length;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
262 return true;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
263 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
264 return false;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
265 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
266
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
267 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
268
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
269 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
270
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
271 synchronized ClusterContent get (char[] name)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
272 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
273 ClusterContent ret = null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
274 auto queue = lookup (name);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
275
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
276 if (queue)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
277 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
278 ret = queue.get;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
279 used -= ret.length;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
280 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
281 return ret;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
282 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
283
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
284 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
285
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
286 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
287
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
288 void watchdog ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
289 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
290 foreach (char[] k, Object o; queueSet)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
291 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
292 auto q = cast(ChannelQueue) o;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
293 if (q.count)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
294 publish (q.channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
295 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
296 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
297 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
298
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
299
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
300 /******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
301
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
302 ******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
303
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
304 private class ChannelQueue
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
305 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
306 private Link head, // head of the Queue
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
307 tail; // tail of the Queue
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
308 private int count; // number of items present
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
309 IChannel channel; // Queue channel
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
310
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
311 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
312
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
313 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
314
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
315 private static class Link
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
316 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
317 Link prev,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
318 next;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
319 ClusterContent data;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
320
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
321 static Link freeList;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
322
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
323 /**************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
324
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
325 **************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
326
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
327 Link append (Link after)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
328 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
329 if (after)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
330 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
331 next = after.next;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
332
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
333 // patch 'next' to point at me
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
334 if (next)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
335 next.prev = this;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
336
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
337 //patch 'after' to point at me
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
338 prev = after;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
339 after.next = this;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
340 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
341 return this;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
342 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
343
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
344 /**************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
345
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
346 **************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
347
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
348 Link unlink ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
349 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
350 // make 'prev' and 'next' entries see each other
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
351 if (prev)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
352 prev.next = next;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
353
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
354 if (next)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
355 next.prev = prev;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
356
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
357 // Murphy's law
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
358 next = prev = null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
359 return this;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
360 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
361
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
362 /**************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
363
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
364 **************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
365
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
366 Link create ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
367 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
368 Link l;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
369
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
370 if (freeList)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
371 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
372 l = freeList;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
373 freeList = l.next;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
374 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
375 else
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
376 l = new Link;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
377 return l;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
378 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
379
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
380 /**************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
381
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
382 **************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
383
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
384 void destroy ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
385 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
386 next = freeList;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
387 freeList = this;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
388 this.data = null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
389 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
390 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
391
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
392
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
393 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
394
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
395 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
396
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
397 this (IChannel channel)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
398 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
399 head = tail = new Link;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
400 this.channel = channel;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
401 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
402
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
403 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
404
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
405 Add the specified content to the queue at the current
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
406 tail position, and bump tail to the next Link
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
407
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
408 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
409
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
410 void put (ClusterContent content)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
411 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
412 tail.data = content;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
413 tail = tail.create.append (tail);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
414 ++count;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
415 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
416
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
417 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
418
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
419 Extract from the head, which is the oldest item in the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
420 queue. The removed Link is then appended to the tail,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
421 ready for another put. Head is adjusted to point at the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
422 next valid queue entry.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
423
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
424 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
425
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
426 ClusterContent get ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
427 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
428 if (head !is tail)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
429 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
430 auto l = head;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
431 head = head.next;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
432 auto ret = l.data;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
433 l.unlink;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
434 l.destroy;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
435 --count;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
436 return ret;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
437 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
438 return null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
439 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
440 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
441
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
442 +/