Mercurial > projects > ldc
comparison tango/tango/net/cluster/tina/Cluster.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
131:5825d48b27d1 | 132:1700239cab2e |
---|---|
1 /******************************************************************************* | |
2 | |
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved | |
4 | |
5 license: BSD style: $(LICENSE) | |
6 | |
7 version: July 2004: Initial release | |
8 | |
9 author: Kris | |
10 | |
11 *******************************************************************************/ | |
12 | |
13 module tango.net.cluster.tina.Cluster; | |
14 | |
15 private import tango.math.Random; | |
16 | |
17 private import tango.core.Thread, | |
18 tango.core.Runtime, | |
19 tango.core.Exception; | |
20 | |
21 private import tango.util.log.Log, | |
22 tango.util.log.Logger; | |
23 | |
24 private import tango.time.Clock; | |
25 | |
26 private import tango.io.Buffer, | |
27 tango.io.GrowBuffer; | |
28 | |
29 private import tango.io.model.IConduit; | |
30 | |
31 private import tango.net.Socket, | |
32 tango.net.SocketConduit, | |
33 tango.net.SocketListener, | |
34 tango.net.InternetAddress, | |
35 tango.net.MulticastConduit; | |
36 | |
37 private import tango.net.cluster.NetworkClient; | |
38 | |
39 public import tango.net.cluster.model.ICluster; | |
40 | |
41 private import tango.net.cluster.tina.RollCall, | |
42 tango.net.cluster.tina.ProtocolReader, | |
43 tango.net.cluster.tina.ProtocolWriter; | |
44 | |
45 private import Integer = tango.text.convert.Integer; | |
46 | |
47 /******************************************************************************* | |
48 | |
49 QOS implementation for sockets. All cluster-client activity is | |
50 gated through here by the higher level classes; NetworkQueue & | |
51 NetworkCache for example. You gain access to the cluster by | |
52 creating an instance of the QOS (quality of service) you desire | |
53 and either mapping client classes onto it, or using it directly. | |
54 For example: | |
55 --- | |
56 import tango.net.cluster.tina.Cluster; | |
57 | |
58 auto cluster = new Cluster; | |
59 cluster.join; | |
60 | |
61 auto channel = cluster.createChannel (...); | |
62 channel.putQueue (...); | |
63 channel.getQueue (); | |
64 --- | |
65 | |
66 Please see the cluster clients for additional details. Currently | |
67 these include CacheInvalidator, CacheInvalidatee, NetworkMessage, | |
68 NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the | |
69 Client base-class. | |
70 | |
71 *******************************************************************************/ | |
72 | |
73 class Cluster : Broadcaster, ICluster | |
74 { | |
75 private FlexNodeSet task, | |
76 queue; | |
77 private FixedNodeSet cache; | |
78 private Logger logger; | |
79 | |
80 /*********************************************************************** | |
81 | |
82 Create a cluster instance with a default logger and Nagle | |
83 caching disabled | |
84 | |
85 ***********************************************************************/ | |
86 | |
87 this () | |
88 { | |
89 this (Log.getLogger ("cluster.generic"), true); | |
90 } | |
91 | |
92 /*********************************************************************** | |
93 | |
94 Create a cluster instance with the provided logger. Option | |
95 noDelay controls the setting of the Nagle algorithm on an | |
96 active connection to a server, which should be disabled by | |
97 default (noDelay == true) | |
98 | |
99 ***********************************************************************/ | |
100 | |
101 this (Logger log, bool noDelay = true) | |
102 { | |
103 assert (log); | |
104 logger = log; | |
105 | |
106 task = new FlexNodeSet (log, noDelay); | |
107 queue = new FlexNodeSet (log, noDelay); | |
108 cache = new FixedNodeSet (log, noDelay); | |
109 } | |
110 | |
111 /*********************************************************************** | |
112 | |
113 Join the cluster as a client, discovering servers. Client | |
114 applications should invoke this before making requests so | |
115 that there are some servers to address. | |
116 | |
117 If cache facilities will be used, then the join(cacheHosts) | |
118 variation should be used instead | |
119 | |
120 ***********************************************************************/ | |
121 | |
122 final Cluster join () | |
123 { | |
124 // listen for cluster servers | |
125 auto channel = createChannel ("cluster.server.advertise"); | |
126 channel.createBulletinConsumer (&notify); | |
127 | |
128 // ask who's currently running | |
129 channel.broadcast (new RollCall); | |
130 logger.trace ("discovering cluster nodes"); | |
131 | |
132 // wait for enabled servers to respond ... | |
133 Thread.sleep (0.250); | |
134 return this; | |
135 } | |
136 | |
137 /*********************************************************************** | |
138 | |
139 Join the cluster as a client, discovering servers. Client | |
140 applications should invoke this before making requests so | |
141 that there are some servers to address. | |
142 | |
143 If cache facilities will be used, use this method to set | |
144 the group of valid cache hosts. Each cache host should be | |
145 described as an array of machine-name and port pairs e.g. | |
146 --- | |
147 ["lucy:1234", "daisy:3343", "daisy:3344"] | |
148 --- | |
149 | |
150 This sets up a fixed set of cache hosts, which should be | |
151 identical for all cache clients. Cache hosts not included | |
152 in this list will be ignored when they come online. | |
153 | |
154 ***********************************************************************/ | |
155 | |
156 final Cluster join (char[][] cacheHosts) | |
157 { | |
158 foreach (addr; cacheHosts) | |
159 cache.addNode (new Node (log, addr, "cache")); | |
160 return join; | |
161 } | |
162 | |
163 /*********************************************************************** | |
164 | |
165 Return the logger instance provided during construction. | |
166 | |
167 ***********************************************************************/ | |
168 | |
169 final Logger log () | |
170 { | |
171 return logger; | |
172 } | |
173 | |
174 /*********************************************************************** | |
175 | |
176 Create a channel instance. Our channel implementation | |
177 includes a number of cached IO helpers (ProtocolWriter | |
178 and so on) which simplifies and speeds up execution. | |
179 | |
180 ***********************************************************************/ | |
181 | |
182 final IChannel createChannel (char[] channel) | |
183 { | |
184 return new Channel (this, channel); | |
185 } | |
186 | |
187 /*********************************************************************** | |
188 | |
189 ChannelListener method for listening to RollCall responses. | |
190 These are sent out by cluster servers both when they get a | |
191 RollCall request, and when they heartbeat. | |
192 | |
193 ***********************************************************************/ | |
194 | |
195 private void notify (IEvent event) | |
196 { | |
197 scope rollcall = new RollCall; | |
198 event.get (rollcall); | |
199 | |
200 switch (rollcall.type) | |
201 { | |
202 default: | |
203 break; | |
204 | |
205 case RollCall.Task: | |
206 task.enable (rollcall.addr, "task"); | |
207 break; | |
208 | |
209 case RollCall.Cache: | |
210 cache.enable (rollcall.addr); | |
211 break; | |
212 | |
213 case RollCall.Queue: | |
214 queue.enable (rollcall.addr, "queue"); | |
215 break; | |
216 } | |
217 } | |
218 } | |
219 | |
220 | |
221 /******************************************************************************* | |
222 | |
223 Basic multicast support across the cluster. Multicast is used | |
224 for broadcasting messages to all nodes in the cluster. We use | |
225 it for cache-invalidation, heartbeat, rollcall and notification | |
226 of queue activity | |
227 | |
228 *******************************************************************************/ | |
229 | |
230 private class Broadcaster | |
231 { | |
232 private static InternetAddress[char[]] groups; | |
233 private Buffer mBuffer; | |
234 private ProtocolWriter mWriter; | |
235 private MulticastConduit mSocket; | |
236 | |
237 private int groupPort = 3333; | |
238 private int groupPrefix = 225; | |
239 | |
240 /*********************************************************************** | |
241 | |
242 Setup a Cluster instance. Currently the buffer & writer | |
243 are shared for all bulletin serialization; this should | |
244 probably change at some point such that we can support | |
245 multiple threads broadcasting concurrently to different | |
246 output ports. | |
247 | |
248 ***********************************************************************/ | |
249 | |
250 this () | |
251 { | |
252 mBuffer = new Buffer (1024 * 4); | |
253 mSocket = new MulticastConduit; | |
254 mWriter = new ProtocolWriter (mBuffer); | |
255 } | |
256 | |
257 /*********************************************************************** | |
258 | |
259 Return the multicast conduit owned by this instance. | |
260 This is the socket used by broadcast() to send each | |
261 message; the group port and address prefix are set | |
262 separately, via multicast() | |
263 | |
264 ***********************************************************************/ | |
265 | |
266 final MulticastConduit conduit () | |
267 { | |
268 return mSocket; | |
269 } | |
270 | |
271 /*********************************************************************** | |
272 | |
273 Setup the multicast options. Port is used as the sole | |
274 address port for multicast usage & prefix is prepended | |
275 to each fabricated multicast address (should be a valid | |
276 class-D prefix: 225 through 239 inclusive) | |
277 | |
278 ***********************************************************************/ | |
279 | |
280 final void multicast (int port, int prefix=225) | |
281 { | |
282 groupPort = port; | |
283 groupPrefix = prefix; | |
284 } | |
285 | |
286 /*********************************************************************** | |
287 | |
288 Broadcast a message on the specified channel. This uses | |
289 IP/Multicast to scatter the payload to all registered | |
290 listeners (on the same multicast group). Note that the | |
291 maximum message size is limited to that of an Ethernet | |
292 data frame, minus the IP/UDP header size (1472 bytes). | |
293 | |
294 Also note that we are synchronized to avoid contention | |
295 on the otherwise shared output buffer. | |
296 | |
297 ***********************************************************************/ | |
298 | |
299 final synchronized void broadcast (char[] channel, IMessage message=null) | |
300 { | |
301 // clear buffer and serialize content | |
302 mWriter.put (ProtocolWriter.Command.OK, channel, null, message); | |
303 | |
304 // Ethernet data-frame size minus the 28 byte UDP/IP header: | |
305 if (mBuffer.position > 1472) | |
306 throw new ClusterException ("message is too large to broadcast"); | |
307 | |
308 // send it to the appropriate multicast group | |
309 mSocket.write (mBuffer.slice, getGroup (channel)); | |
310 } | |
311 | |
312 /*********************************************************************** | |
313 | |
314 Return an internet address representing the multicast | |
315 group for the specified channel. We use three of the | |
316 four address segments to represent the channel itself | |
317 (via a hash on the channel name), and set the primary | |
318 segment to be that of the broadcast prefix (above). | |
319 | |
320 ***********************************************************************/ | |
321 | |
322 final synchronized InternetAddress getGroup (char[] channel) | |
323 { | |
324 auto p = channel in groups; | |
325 if (p) | |
326 return *p; | |
327 | |
328 // construct a group address from the prefix & channel-hash, | |
329 // where the hash is folded down to 24 bits | |
330 uint hash = jhash (channel.ptr, channel.length); | |
331 hash = (hash >> 24) ^ (hash & 0x00ffffff); | |
332 | |
333 auto address = Integer.toString (groupPrefix) ~ "." ~ | |
334 Integer.toString ((hash >> 16) & 0xff) ~ "." ~ | |
335 Integer.toString ((hash >> 8) & 0xff) ~ "." ~ | |
336 Integer.toString (hash & 0xff); | |
337 | |
338 // insert InternetAddress into hashmap | |
339 auto group = new InternetAddress (address, groupPort); | |
340 groups [channel] = group; | |
341 return group; | |
342 } | |
343 } | |
344 | |
345 | |
346 /******************************************************************************* | |
347 | |
348 A channel represents something akin to a publish/subscribe topic, | |
349 or a radio station. These are used to segregate cluster operations | |
350 into a set of groups, where each group is represented by a channel. | |
351 Channel names are whatever you want them to be: use of dot notation | |
352 has proved useful in the past. | |
353 | |
354 Channels maintain internal state in order to avoid heap activity. So | |
355 they should not be shared across threads without appropriate synchs | |
356 in place. One remedy is to create another channel instance | |
357 | |
358 *******************************************************************************/ | |
359 | |
360 private class Channel : IChannel | |
361 { | |
362 private char[] name_; | |
363 private Buffer buffer; | |
364 private ProtocolReader reader; | |
365 private ProtocolWriter writer; | |
366 private Cluster cluster_; | |
367 | |
368 /*********************************************************************** | |
369 | |
370 Construct a channel with the specified name. We cache | |
371 a number of session-related constructs here also, in | |
372 order to eliminate runtime overhead | |
373 | |
374 ***********************************************************************/ | |
375 | |
376 this (Cluster cluster, char[] name) | |
377 in { | |
378 assert (cluster); | |
379 assert (name.length); | |
380 } | |
381 body | |
382 { | |
383 name_ = name; | |
384 cluster_ = cluster; | |
385 | |
386 // this buffer will grow as required to house larger messages | |
387 buffer = new GrowBuffer (1024 * 2); | |
388 writer = new ProtocolWriter (buffer); | |
389 | |
390 // make the reader slice directly from the buffer content | |
391 reader = new ProtocolReader (buffer); | |
392 } | |
393 | |
394 /*********************************************************************** | |
395 | |
396 Return the name of this channel. This is the name provided | |
397 when the channel was constructed. | |
398 | |
399 ***********************************************************************/ | |
400 | |
401 final char[] name () | |
402 { | |
403 return name_; | |
404 } | |
405 | |
406 /*********************************************************************** | |
407 | |
408 Return the assigned cluster | |
409 | |
410 ***********************************************************************/ | |
411 | |
412 final Cluster cluster () | |
413 { | |
414 return cluster_; | |
415 } | |
416 | |
417 /*********************************************************************** | |
418 | |
419 Return the assigned logger | |
420 | |
421 ***********************************************************************/ | |
422 | |
423 final Logger log () | |
424 { | |
425 return cluster_.log; | |
426 } | |
427 | |
428 /*********************************************************************** | |
429 | |
430 Output this channel via the provided IWriter | |
431 | |
432 ***********************************************************************/ | |
433 | |
434 final void write (IWriter writer) | |
435 { | |
436 writer.put (name_); | |
437 } | |
438 | |
439 /*********************************************************************** | |
440 | |
441 Input this channel via the provided IReader | |
442 | |
443 ***********************************************************************/ | |
444 | |
445 final void read (IReader reader) | |
446 { | |
447 reader.get (name_); | |
448 } | |
449 | |
450 /*********************************************************************** | |
451 | |
452 deserialize a message into a provided host, or via | |
453 the registered instance of the incoming message | |
454 | |
455 ***********************************************************************/ | |
456 | |
457 final IMessage thaw (IMessage host = null) | |
458 { | |
459 return reader.thaw (host); | |
460 } | |
461 | |
462 /*********************************************************************** | |
463 | |
464 Create a listener of the specified type. Listeners are | |
465 run within their own thread, since they spend the vast | |
466 majority of their time blocked on a Socket read. Would | |
467 be good to support multiplexed reading instead, such | |
468 that a thread pool could be applied instead. | |
469 | |
470 ***********************************************************************/ | |
471 | |
472 final IConsumer createConsumer (ChannelListener notify) | |
473 { | |
474 cluster_.log.trace ("creating message consumer for '" ~ name_ ~ "'"); | |
475 return new MessageConsumer (this, notify); | |
476 } | |
477 | |
478 /*********************************************************************** | |
479 | |
480 Create a listener of the specified type. Listeners are | |
481 run within their own thread, since they spend the vast | |
482 majority of their time blocked on a Socket read. Would | |
483 be good to support multiplexed reading instead, such | |
484 that a thread pool could be applied instead. | |
485 | |
486 ***********************************************************************/ | |
487 | |
488 final IConsumer createBulletinConsumer (ChannelListener notify) | |
489 { | |
490 cluster_.log.trace ("creating bulletin consumer for '" ~ name_ ~ "'"); | |
491 return new BulletinConsumer (this, notify); | |
492 } | |
493 | |
494 /*********************************************************************** | |
495 | |
496 Return an entry from the network cache, and optionally | |
497 remove it. This is a synchronous operation as opposed | |
498 to the asynchronous nature of an invalidate broadcast. | |
499 | |
500 ***********************************************************************/ | |
501 | |
502 final IMessage getCache (char[] key, bool remove, IMessage host = null) | |
503 { | |
504 void send (IConduit conduit) | |
505 { | |
506 buffer.setConduit (conduit); | |
507 writer.put (remove ? ProtocolWriter.Command.Remove : | |
508 ProtocolWriter.Command.Copy, name_, key).flush; | |
509 } | |
510 | |
511 if (cluster_.cache.request (&send, reader, key)) | |
512 return reader.thaw (host); | |
513 return null; | |
514 } | |
515 | |
516 /*********************************************************************** | |
517 | |
518 Place an entry into the network cache, replacing the | |
519 entry with the identical key. Where message.time is | |
520 set, it will be used to test for newer cache entries | |
521 than the one being sent i.e. if someone else placed | |
522 a newer entry into the cache, that one will remain. | |
523 | |
524 Note that this may cause the oldest entry in the cache | |
525 to be displaced if the cache is already full. | |
526 | |
527 ***********************************************************************/ | |
528 | |
529 final bool putCache (char[] key, IMessage message) | |
530 { | |
531 void send (IConduit conduit) | |
532 { | |
533 buffer.setConduit (conduit); | |
534 writer.put (ProtocolWriter.Command.Add, name_, key, message).flush; | |
535 } | |
536 | |
537 // return false if the cache server said there's | |
538 // already something newer | |
539 if (cluster_.cache.request (&send, reader, key)) | |
540 return false; | |
541 return true; | |
542 } | |
543 | |
544 /*********************************************************************** | |
545 | |
546 Load a network cache entry remotely. This sends the given | |
547 IMessage over a network to the cache host, where it will | |
548 be executed locally. The benefit of doing so is that the | |
549 host may deny access to the cache entry for the duration | |
550 of the load operation. This, in turn, provides a mechanism | |
551 for gating/synchronizing multiple network clients over a | |
552 given cache entry; quite handy for those entries that are | |
553 relatively expensive to construct or access. | |
554 | |
555 ***********************************************************************/ | |
556 | |
557 final bool loadCache (char[] key, IMessage message) | |
558 { | |
559 void send (IConduit conduit) | |
560 { | |
561 buffer.setConduit (conduit); | |
562 writer.put (ProtocolWriter.Command.Load, name_, key, message).flush; | |
563 } | |
564 | |
565 return cluster_.cache.request (&send, reader, key); | |
566 } | |
567 | |
568 /*********************************************************************** | |
569 | |
570 Query the cluster for queued entries on the corresponding | |
571 channel. Returns, and removes, the first matching entry | |
572 from the cluster. Note that this sweeps the cluster for | |
573 matching entries, and is synchronous in nature. The more | |
574 common approach is to setup a queue listener, which will | |
575 grab and dispatch queue entries asynchronously. | |
576 | |
577 ***********************************************************************/ | |
578 | |
579 final IMessage getQueue (IMessage host = null) | |
580 { | |
581 if (scanQueue) | |
582 return reader.thaw (host); | |
583 return null; | |
584 } | |
585 | |
586 /*********************************************************************** | |
587 | |
588 Query the cluster for queued entries on the corresponding | |
589 channel. Returns, and removes, the first matching entry | |
590 from the cluster. Note that this sweeps the cluster for | |
591 matching entries, and is synchronous in nature. The more | |
592 common approach is to setup a queue listener, which will | |
593 grab and dispatch queue entries asynchronously. | |
594 | |
595 ***********************************************************************/ | |
596 | |
597 private bool scanQueue () | |
598 { | |
599 void send (IConduit conduit) | |
600 { | |
601 buffer.setConduit (conduit); | |
602 writer.put (ProtocolWriter.Command.RemoveQueue, name_).flush; | |
603 } | |
604 | |
605 bool scan (Node node) | |
606 { | |
607 bool message; | |
608 node.request (&send, reader, message); | |
609 return message; | |
610 } | |
611 | |
612 // make a pass over each Node, looking for channel entries | |
613 return cluster_.queue.scan (&scan); | |
614 } | |
615 | |
616 /*********************************************************************** | |
617 | |
618 Add an entry to the specified network queue. May throw a | |
619 QueueFullException if there's no room available. | |
620 | |
621 ***********************************************************************/ | |
622 | |
623 final IMessage putQueue (IMessage message) | |
624 { | |
625 void send (IConduit conduit) | |
626 { | |
627 buffer.setConduit (conduit); | |
628 writer.put (ProtocolWriter.Command.AddQueue, name_, null, message).flush; | |
629 } | |
630 | |
631 cluster_.queue.request (&send, reader); | |
632 return message; | |
633 } | |
634 | |
635 /*********************************************************************** | |
636 | |
637 Send a remote call request to a server, and place the result | |
638 back into the provided message | |
639 | |
640 ***********************************************************************/ | |
641 | |
642 final bool execute (IMessage message) | |
643 { | |
644 void send (IConduit conduit) | |
645 { | |
646 buffer.setConduit (conduit); | |
647 writer.put (ProtocolWriter.Command.Call, name_, null, message).flush; | |
648 } | |
649 | |
650 if (cluster_.task.request (&send, reader)) | |
651 { | |
652 // place result back into the provided message | |
653 reader.thaw (message); | |
654 return true; | |
655 } | |
656 return false; | |
657 } | |
658 | |
659 /*********************************************************************** | |
660 | |
661 Broadcast a message on the specified channel. This uses | |
662 IP/Multicast to scatter the message to all registered | |
663 listeners (on the same multicast group). Note that the | |
664 maximum message size is limited to that of an Ethernet | |
665 data frame, minus the IP/UDP header size (1472 bytes). | |
666 | |
667 ***********************************************************************/ | |
668 | |
669 final void broadcast (IMessage message = null) | |
670 { | |
671 cluster_.broadcast (name_, message); | |
672 } | |
673 } | |
674 | |
675 | |
676 /******************************************************************************* | |
677 | |
678 A listener for multicast channel traffic. These are currently used | |
679 for cache coherency, queue publishing, and node discovery activity; | |
680 though could be used for direct messaging also. | |
681 | |
682 Be careful when using the retained channel, since it is shared with | |
683 the calling thread. Thus a race condition could arise between the | |
684 client and this thread, were both to use the channel for transfers | |
685 at the same instant. Note that MessageConsumer makes a copy of the | |
686 channel for this purpose | |
687 | |
688 *******************************************************************************/ | |
689 | |
690 private class BulletinConsumer : SocketListener, IConsumer, IEvent | |
691 { | |
692 private bool hasMore; // incoming message? | |
693 private Buffer buffer; // input buffer | |
694 private ProtocolReader reader; // input decoder | |
695 private Channel channel_; // associated channel | |
696 private Cluster cluster; // associated cluster | |
697 private MulticastConduit consumer; // broadcast listener | |
698 private ChannelListener listener; // user-level callback | |
699 | |
700 /*********************************************************************** | |
701 | |
702 Construct a multicast consumer for the specified event. The | |
703 event handler will be invoked whenever a message arrives for | |
704 the associated channel. | |
705 | |
706 ***********************************************************************/ | |
707 | |
708 this (Channel channel, ChannelListener listener) | |
709 { | |
710 this.channel_ = channel; | |
711 this.listener = listener; | |
712 this.cluster = channel.cluster; | |
713 | |
714 // buffer doesn't need to be larger than Ethernet data-frame | |
715 buffer = new Buffer (1500); | |
716 | |
717 // make the reader slice directly from the buffer content | |
718 reader = new ProtocolReader (buffer); | |
719 | |
720 // configure a listener socket | |
721 consumer = new MulticastConduit (cluster.getGroup (channel_.name), true); | |
722 consumer.join; | |
723 | |
724 super (consumer, buffer); | |
725 | |
726 // fire up this listener | |
727 super.execute; | |
728 } | |
729 | |
730 /*********************************************************************** | |
731 | |
732 Notification callback invoked when we receive a multicast | |
733 packet. Note that we check the packet channel-name against | |
734 the one we're consuming, to check for cases where the group | |
735 address had a hash collision. | |
736 | |
737 ***********************************************************************/ | |
738 | |
739 override void notify (IBuffer buffer) | |
740 { | |
741 ProtocolWriter.Command cmd; | |
742 char[] channel; | |
743 char[] element; | |
744 | |
745 // read the incoming header, along with the object guid | |
746 // where available | |
747 hasMore = reader.getHeader (cmd, channel, element); | |
748 | |
749 // check it's really for us first (might be a hash collision) | |
750 if (channel == this.channel_.name) | |
751 invoke (this); | |
752 } | |
753 | |
754 /*********************************************************************** | |
755 | |
756 ***********************************************************************/ | |
757 | |
758 IMessage get (IMessage host = null) | |
759 { | |
760 if (hasMore) | |
761 return reader.thaw (host); | |
762 | |
763 throw new ClusterException ("attempting to thaw a non-existant message"); | |
764 } | |
765 | |
766 /*********************************************************************** | |
767 | |
768 Return the assigned logger | |
769 | |
770 ***********************************************************************/ | |
771 | |
772 final Logger log () | |
773 { | |
774 return cluster.log; | |
775 } | |
776 | |
777 /*********************************************************************** | |
778 | |
779 Handle error conditions from the listener thread. | |
780 | |
781 ***********************************************************************/ | |
782 | |
783 override void exception (char [] msg) | |
784 { | |
785 cluster.log.error ("BulletinConsumer: "~msg); | |
786 } | |
787 | |
788 /*********************************************************************** | |
789 | |
790 Overridable means of notifying the client code. | |
791 | |
792 ***********************************************************************/ | |
793 | |
794 protected void invoke (IEvent event) | |
795 { | |
796 listener (event); | |
797 } | |
798 | |
799 /*********************************************************************** | |
800 | |
801 Return the channel instance we're associated with. | |
802 | |
803 ***********************************************************************/ | |
804 | |
805 final Channel channel () | |
806 { | |
807 return channel_; | |
808 } | |
809 | |
810 /*********************************************************************** | |
811 | |
812 Temporarily halt listening. This can be used to ignore | |
813 multicast messages while, for example, the consumer is | |
814 busy doing other things. | |
815 | |
816 ***********************************************************************/ | |
817 | |
818 final void pauseGroup () | |
819 { | |
820 consumer.leave; | |
821 } | |
822 | |
823 /*********************************************************************** | |
824 | |
825 Resume listening, post-pause. | |
826 | |
827 ***********************************************************************/ | |
828 | |
829 final void resumeGroup () | |
830 { | |
831 consumer.join; | |
832 } | |
833 | |
834 /*********************************************************************** | |
835 | |
836 Cancel this consumer. The listener is effectively disabled | |
837 from this point forward. The listener thread does not halt | |
838 at this point, but waits until the socket-read returns. | |
839 Note that the D Interface implementation requires us to | |
840 "reimplement and dispatch" trivial things like this ~ it's | |
841 a pain in the neck to maintain. | |
842 | |
843 ***********************************************************************/ | |
844 | |
845 final void cancel () | |
846 { | |
847 super.cancel; | |
848 } | |
849 | |
850 /*********************************************************************** | |
851 | |
852 Send a message back to the producer | |
853 | |
854 ***********************************************************************/ | |
855 | |
856 void reply (IChannel channel, IMessage message) | |
857 { | |
858 assert (channel); | |
859 assert (message); | |
860 | |
861 channel.broadcast (message); | |
862 } | |
863 | |
864 | |
865 /*********************************************************************** | |
866 | |
867 Return an appropriate reply channel for the given message, | |
868 or return null if no reply is expected | |
869 | |
870 ***********************************************************************/ | |
871 | |
872 IChannel replyChannel (IMessage message) | |
873 { | |
874 if (message.reply.length) | |
875 return cluster.createChannel (message.reply); | |
876 return null; | |
877 } | |
878 } | |
879 | |
880 | |
881 /******************************************************************************* | |
882 | |
883 A listener for queue events. These events are produced by the | |
884 queue host on a periodic basis when it has available entries. | |
885 We listen for them (rather than constantly scanning) and then | |
886 begin a sweep to process as many as we can. Note that we will | |
887 be in competition with other nodes to process these entries. | |
888 | |
889 Also note that we create a copy of the channel in use, so that | |
890 race-conditions with the requesting client are avoided. | |
891 | |
892 *******************************************************************************/ | |
893 | |
private class MessageConsumer : BulletinConsumer
{
        /***********************************************************************

                Construct a consumer for the given channel, handing
                incoming events to the provided listener

        ***********************************************************************/

        this (Channel channel, ChannelListener listener)
        {
                super (channel, listener);

                // keep a channel instance of our own, built from the
                // cluster and name of the one we were given
                channel_ = new Channel (channel.cluster, channel.name);
        }

        /***********************************************************************

                Report an error raised on the listener thread. The
                text is written to the cluster log at error level.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                cluster.log.error ("MessageConsumer: " ~ msg);
        }

        /***********************************************************************

                Fetch a queued message rather than using the default
                processing. The channel is asked to scan the queue;
                when the scan succeeds the content is thawed (into
                the optional host message) and returned, otherwise
                null is returned.

                NOTE(review): the accessor 'channel' is declared
                outside this class; it is presumably the private
                instance created by the constructor ~ confirm.

        ***********************************************************************/

        override IMessage get (IMessage host = null)
        {
                if (! channel.scanQueue)
                      return null;

                return channel.thaw (host);
        }

        /***********************************************************************

                Send a message back to the producer. Replies are
                placed on the queue of the given channel.

        ***********************************************************************/

        override void reply (IChannel channel, IMessage message)
        {
                // both the target channel and the message are mandatory
                assert (channel);
                assert (message);

                channel.putQueue (message);
        }

        /***********************************************************************

                Override the default notification handler so that
                multicast receipts are disabled for as long as the
                application listener is busy with the event

        ***********************************************************************/

        override protected void invoke (IEvent event)
        {
                // stop listening for the duration of the callback ...
                pauseGroup ();

                // ... and resume on the way out, even when it throws
                scope (exit)
                       resumeGroup ();

                listener (event);
        }
}
976 | |
977 | |
978 /******************************************************************************* | |
979 | |
980 An abstraction of a socket connection. Used internally by the | |
981 socket-based Cluster. | |
982 | |
983 *******************************************************************************/ | |
984 | |
private class Connection
{
        abstract
        {
                // (re)establish the connection; the result tells
                // the caller whether that attempt succeeded
                bool reset ();

                // hand the connection back once the caller has
                // finished with it, along with a timestamp
                void done (Time time);

                // the socket conduit carrying this connection
                SocketConduit conduit ();
        }
}
993 | |
994 | |
995 /******************************************************************************* | |
996 | |
997 A pool of socket connections for accessing cluster nodes. Note | |
998 that the entries will timeout after a period of inactivity, and | |
999 will subsequently cause a connected host to drop the supporting | |
1000 session. | |
1001 | |
1002 *******************************************************************************/ | |
1003 | |
private class ConnectionPool
{
        private Logger          log;
        private int             count;
        private bool            noDelay;
        private InternetAddress address;
        private PoolConnection  freelist;
        private TimeSpan        timeout = TimeSpan.seconds(60);

        /***********************************************************************

                Utility class to provide the basic connection facilities
                provided by the connection pool. Instances are chained
                through 'next' while they sit on the pool free-list.

        ***********************************************************************/

        static class PoolConnection : Connection
        {
                Time            time;
                PoolConnection  next;
                ConnectionPool  parent;
                SocketConduit   conduit_;

                /***************************************************************

                        Construct a new connection, set its parent, and
                        make an initial attempt to connect. The outcome
                        of that attempt is not reported to the caller.

                ***************************************************************/

                this (ConnectionPool pool)
                {
                        parent = pool;
                        reset;
                }

                /***************************************************************

                        Create a new socket and connect it to the specified
                        server. This will cause a dedicated thread to start
                        on the server. Said thread will quit when an error
                        occurs.

                        Any socket left over from a previous connection is
                        released first, so that repeated reconnects do not
                        accumulate open sockets until the GC gets to them.

                        Returns true when connected, and false otherwise.
                        A failure is logged unless the runtime is halting.

                ***************************************************************/

                final bool reset ()
                {
                        // best-effort release of the prior socket. This
                        // must never prevent the reconnect attempt below
                        if (conduit_ !is null)
                            try {
                                conduit_.detach;
                                } catch (Object o)
                                         parent.log.trace ("failed to release socket :: "~o.toString);

                        try {
                            conduit_ = new SocketConduit;

                            // apply Nagle settings
                            conduit_.socket.setNoDelay (parent.noDelay);

                            // set a 500ms timeout for read operations
                            conduit_.setTimeout (TimeSpan.millis(500));

                            // open a connection to this server
                            conduit_.connect (parent.address);
                            return true;

                            } catch (Object o)
                                    {
                                    if (! Runtime.isHalting)
                                          parent.log.warn ("server is unavailable :: "~o.toString);
                                    }
                        return false;
                }

                /***************************************************************

                        Return the socket belonging to this connection

                ***************************************************************/

                final SocketConduit conduit ()
                {
                        return conduit_;
                }

                /***************************************************************

                        Close the socket. This will cause any host session
                        to be terminated. Does nothing when no socket was
                        ever created for this connection.

                ***************************************************************/

                final void close ()
                {
                        if (conduit_ !is null)
                            conduit_.detach;
                }

                /***************************************************************

                        Return this connection to the free-list, recording
                        the provided time against it. Note that we have to
                        synchronize on the parent-pool itself.

                ***************************************************************/

                final void done (Time time)
                {
                        synchronized (parent)
                        {
                                next = parent.freelist;
                                parent.freelist = this;
                                this.time = time;
                        }
                }
        }


        /***********************************************************************

                Create a connection-pool for the specified address.

        ***********************************************************************/

        this (InternetAddress address, Logger log, bool noDelay)
        {
                this.log = log;
                this.address = address;
                this.noDelay = noDelay;
        }

        /***********************************************************************

                Allocate a Connection from a list rather than creating a
                new one. Reap old entries as we go: an entry that has sat
                unused for longer than the timeout is closed and skipped,
                except that the final entry on the list is always handed
                back whatever its age. A new connection is created only
                when the list is empty.

        ***********************************************************************/

        final synchronized Connection borrow (Time time)
        {
                while (freelist !is null)
                      {
                      auto candidate = freelist;
                      freelist = candidate.next;

                      // last one available, or still fresh enough?
                      if (freelist is null || time - candidate.time <= timeout)
                          return candidate;

                      candidate.close;
                      }

                return new PoolConnection (this);
        }

        /***********************************************************************

                Close this pool and drop all existing connections.

        ***********************************************************************/

        final synchronized void close ()
        {
                // detach the list first, then close each entry on it
                auto entry = freelist;
                freelist = null;

                for (; entry !is null; entry = entry.next)
                       entry.close;
        }
}
1167 | |
1168 | |
1169 /******************************************************************************* | |
1170 | |
1171 Class to represent a cluster node. Each node supports both cache | |
1172 and queue functionality. Note that the set of available nodes is | |
1173 configured at startup, simplifying the discovery process in some | |
1174 significant ways, and causing less thrashing of cache-keys. | |
1175 | |
1176 *******************************************************************************/ | |
1177 | |
private class Node
{
        private Logger          log;
        private char[]          name,
                                addr;
        private ConnectionPool  pool;
        private bool            enabled;

        alias void delegate (IConduit conduit) Requestor;

        /***********************************************************************

                Construct a node with the provided name. This name should
                be the network name of the hosting device. The name that
                toString() reports is composed as "name:addr".

        ***********************************************************************/

        this (Logger log, char[] addr, char[] name)
        {
                this.log = log;
                this.addr = addr;
                this.name = name ~ ':' ~ addr;
        }

        /***********************************************************************

                Add a cache/queue reference for the remote node. This
                installs a fresh connection pool for the given address.

        ***********************************************************************/

        final void setPool (InternetAddress address, bool noDelay)
        {
                this.pool = new ConnectionPool (address, log, noDelay);
        }

        /***********************************************************************

                Return the name of this node

        ***********************************************************************/

        override char[] toString ()
        {
                return name;
        }

        /***********************************************************************

                Return the network address of this node

        ***********************************************************************/

        final char[] address ()
        {
                return addr;
        }

        /***********************************************************************

                Remove this Node from the cluster. The node is disabled
                until it is seen to recover. Pooled connections, if a
                pool was ever installed, are closed.

        ***********************************************************************/

        final void fail ()
        {
                setEnabled (false);

                // a node that was never enabled has no pool to close
                if (pool !is null)
                    pool.close;
        }

        /***********************************************************************

                Get the current state of this node

        ***********************************************************************/

        final bool isEnabled ()
        {
                volatile
                       return enabled;
        }

        /***********************************************************************

                Set the enabled state of this node. The transition is
                traced to the log.

        ***********************************************************************/

        final void setEnabled (bool enabled)
        {
                if (enabled)
                    log.trace ("enabling "~name);
                else
                   log.trace ("disabling "~name);

                volatile
                       this.enabled = enabled;
        }

        /***********************************************************************

                Release the socket of a connection that is about to be
                abandoned. This is best-effort: a failure to release is
                traced and otherwise ignored.

        ***********************************************************************/

        private final void release (Connection connect)
        {
                try {
                    auto conduit = connect.conduit;
                    if (conduit !is null)
                        conduit.detach;
                    } catch (Object o)
                             log.trace ("failed to release socket :: "~o.toString);
        }

        /***********************************************************************

                request data; fail this Node if we can't connect. Note
                that we make several attempts to connect before writing
                the node off as a failure. We use a delegate to perform
                the request output since it may be invoked on more than
                one iteration, where the current attempt fails.

                We return true if the cluster node responds, and false
                otherwise. Exceptions are thrown if they occurred on the
                server. Parameter 'message' is set true if a message is
                available from the server response

                A connection that we give up on is not returned to the
                pool; its socket is released here instead of being left
                open for the GC to find.

        ***********************************************************************/

        final bool request (Requestor dg, ProtocolReader reader, out bool message)
        {
                ProtocolWriter.Command  cmd;
                Time                    time;
                char[]                  channel;
                char[]                  element;

                // a node that has never been given a pool cannot be
                // reached at all
                if (pool is null)
                    return false;

                // get a connection to the server
                auto connect = pool.borrow (time = Clock.now);

                // talk to the server (try a few times if necessary)
                for (int attempts=3; attempts--;)
                     try {
                         // attach connection to writer and send request
                         dg (connect.conduit);

                         // attach connection to reader
                         reader.buffer.setConduit (connect.conduit);

                         // load the returned object. Don't retry on
                         // failed reads, since the server is either
                         // really really busy, or near death. We must
                         // assume it is offline until it tells us
                         // otherwise (via a heartbeat)
                         attempts = 0;
                         message = reader.getHeader (cmd, channel, element);

                         // return borrowed connection
                         connect.done (time);

                         } catch (RegistryException x)
                                 {
                                 // the connection itself is still good
                                 connect.done (time);
                                 throw x;
                                 }
                           catch (IOException x)
                                 {
                                 log.trace ("IOException on server request :: "~x.toString);

                                 // attempt to reconnect?
                                 if (attempts is 0 || !connect.reset)
                                    {
                                    // we are abandoning this connection,
                                    // so don't leave its socket open
                                    release (connect);

                                    // that server is offline
                                    fail;

                                    // state that we failed
                                    return false;
                                    }
                                 }

                // is message an exception?
                if (cmd !is ProtocolWriter.Command.OK)
                   {
                   // is node full?
                   if (cmd is ProtocolWriter.Command.Full)
                       throw new ClusterFullException (channel);

                   // did node barf?
                   if (cmd is ProtocolWriter.Command.Exception)
                       throw new ClusterException (channel);

                   // bogus response
                   throw new ClusterException ("invalid response from cluster server");
                   }

                // ok, our server responded
                return true;
        }
}
1366 | |
1367 | |
1368 /******************************************************************************* | |
1369 | |
1370 Models a generic set of cluster nodes. This is intended to be | |
1371 thread-safe, with no locking on a lookup operation | |
1372 | |
1373 *******************************************************************************/ | |
1374 | |
private class NodeSet
{
        private Node[char[]]    map;
        private Logger          log;
        private Set             set;
        private bool            noDelay;

        /***********************************************************************

                Construct an empty set of nodes. The noDelay flag is
                retained for use by subclasses when they set up the
                connection pool of a node.

        ***********************************************************************/

        this (Logger log, bool noDelay)
        {
                this.log = log;
                this.set = new Set;
                this.noDelay = noDelay;
        }

        /***********************************************************************

                Return the logger provided at construction

        ***********************************************************************/

        final Logger logger ()
        {
                return log;
        }

        /***********************************************************************

                Add a node to the list of servers. Nodes are keyed by
                their address; a ClusterException is thrown where the
                address has been added before. Returns the node given.

        ***********************************************************************/

        synchronized final Node addNode (Node node)
        {
                auto addr = node.address;
                if (addr in map)
                    throw new ClusterException ("Attempt to add cluster node '"~addr~"' more than once");

                map[addr] = node;

                // note that this creates a new Set instance. We do this
                // so that selectNode() can avoid synchronization
                set = set.add (node);
                return node;
        }

        /***********************************************************************

                Select a cluster server based on a starting index. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client.

                Throws ClusterEmptyException where there are no nodes,
                or where none of them is enabled.

        ***********************************************************************/

        final Node selectNode (uint index)
        {
                auto hosts = set.nodes;
                uint count = hosts.length;

                if (count)
                   {
                   index %= count;

                   // visit each node at most once, wrapping around
                   while (count--)
                         {
                         auto node = hosts [index];
                         if (node.isEnabled)
                             return node;

                         if (++index >= hosts.length)
                             index = 0;
                         }
                   }
                throw new ClusterEmptyException ("No appropriate cluster nodes are available");
        }

        /***********************************************************************

                Host class for the set of nodes. We utilize this to enable
                atomic read/write where it would not be otherwise possible
                -- D arrays are organized as ptr+length pairs and are thus
                inherently non-atomic for assignment purposes

        ***********************************************************************/

        private static class Set
        {
                Node[]  nodes,
                        random;

                /***************************************************************

                        Return a new Set holding the current nodes plus
                        the one provided. This instance is not altered.

                ***************************************************************/

                final Set add (Node node)
                {
                        auto s = new Set;
                        s.nodes = nodes ~ node;
                        s.randomize;
                        return s;
                }

                /***************************************************************

                        Populate 'random' with a shuffled copy of 'nodes'.
                        This randomized list is used when scanning the
                        cluster for queued entries.

                        A Fisher-Yates shuffle is applied, so that each
                        ordering is equally likely. Swapping every slot
                        with one drawn from the whole array, as was done
                        before, favours some orderings over others.

                ***************************************************************/

                private final void randomize ()
                {
                        // copy the node list
                        random = nodes.dup;

                        // walk down from the top, swapping each slot with
                        // one chosen uniformly from those at or below it
                        for (auto i = random.length; i > 1; --i)
                            {
                            auto j = Random.shared.next (i);
                            auto tmp = random[i-1];
                            random[i-1] = random[j];
                            random[j] = tmp;
                            }
                }
        }
}
1492 | |
1493 | |
1494 /******************************************************************************* | |
1495 | |
1496 Models a fixed set of cluster nodes. Used for Cache | |
1497 | |
1498 *******************************************************************************/ | |
1499 | |
private class FixedNodeSet : NodeSet
{
        /***********************************************************************

                Construct an empty set; see NodeSet

        ***********************************************************************/

        this (Logger log, bool noDelay)
        {
                super (log, noDelay);
        }

        /***********************************************************************

                Enable the registered node with the given address. A
                node which is not currently enabled gets a new pool
                for that address, and is then marked as enabled.

                Throws ClusterException for an unregistered address,
                unless no nodes have been registered at all.

        ***********************************************************************/

        final synchronized void enable (char[] addr)
        {
                auto p = addr in map;
                if (p)
                   {
                   auto node = *p;
                   if (! node.isEnabled)
                      {
                      node.setPool (new InternetAddress(addr), noDelay);
                      node.setEnabled (true);
                      }
                   }
                else
                   // don't throw when no cache hosts have been configured at all
                   if (set.nodes.length)
                       throw new ClusterException ("Attempt to enable unregistered cache node '"~addr~"'");
        }

        /***********************************************************************

                Select a cluster server based on the specified key. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client.

                The request is repeated until some node responds. The
                loop ends with an exception from selectNode() once no
                enabled node remains. Returns true where the response
                carries a message.

        ***********************************************************************/

        final bool request (Node.Requestor dg, ProtocolReader reader, char[] key)
        {
                bool message;

                // the key does not change between retries, so hash it
                // once only
                auto index = jhash (key.ptr, key.length);

                while (! selectNode(index).request (dg, reader, message))
                        {}

                return message;
        }
}
1554 | |
1555 | |
1556 /******************************************************************************* | |
1557 | |
1558 Models a flexible set of cluster nodes. Used for queue and task | |
1559 | |
1560 *******************************************************************************/ | |
1561 | |
private class FlexNodeSet : NodeSet
{
        private uint rollover;

        /***********************************************************************

                Construct an empty set; see NodeSet

        ***********************************************************************/

        this (Logger log, bool noDelay)
        {
                super (log, noDelay);
        }

        /***********************************************************************

                Enable the node with the given address, registering
                it first where the address has not been seen before.
                A node which is not currently enabled gets a new pool
                for that address, and is then marked as enabled.

        ***********************************************************************/

        final synchronized void enable (char[] addr, char[] name)
        {
                Node node;

                if (auto known = addr in map)
                    node = *known;
                else
                   node = addNode (new Node (log, addr, name));

                if (node.isEnabled)
                    return;

                node.setPool (new InternetAddress(addr), noDelay);
                node.setEnabled (true);
        }

        /***********************************************************************

                Select a cluster server via a rolling index, which is
                advanced for each attempt. If the selected server is
                not currently enabled, we just try the next one.

                The request is repeated until some node responds. The
                loop ends with an exception from selectNode() once no
                enabled node remains. Returns true where the response
                carries a message.

        ***********************************************************************/

        final bool request (Node.Requestor dg, ProtocolReader reader)
        {
                bool message;

                for (;;)
                    {
                    auto node = selectNode (++rollover);
                    if (node.request (dg, reader, message))
                        return message;
                    }
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is halted when
                the delegate returns true. Note that this scans nodes in
                a randomized pattern, which should tend to avoid 'bursty'
                activity by a set of clients upon any one cluster server.

        ***********************************************************************/

        final bool scan (bool delegate(Node) dg)
        {
                auto hosts = set.random;

                // walk the randomized list from its tail to its head,
                // calling back on each enabled node
                for (auto i = hosts.length; i > 0;)
                    {
                    auto node = hosts [--i];
                    if (node.isEnabled && dg (node))
                        return true;
                    }
                return false;
        }
}
1640 | |
1641 | |
1642 /****************************************************************************** | |
1643 | |
1644 The Bob Jenkins lookup2 algorithm. This should be relocated | |
1645 to somewhere common | |
1646 | |
1647 ******************************************************************************/ | |
1648 | |
/******************************************************************************

        Hash 'len' bytes starting at 'k', with 'init' as the initial
        value (a prior hash, or an arbitrary seed). Returns a 32 bit
        hash of the content.

        Key bytes are combined into words explicitly, lowest address
        first, as opposed to reading them via a uint pointer. Results
        are the same as before on little-endian hosts, but no longer
        depend on the alignment of 'k' nor on the byte-order of the
        host, so that all cluster clients derive the same value for
        the same key.

******************************************************************************/

private static uint jhash (void* k, uint len, uint init = 0)
{
        // combine four consecutive bytes, lowest address first
        static uint word (ubyte* p)
        {
                return  cast(uint) p[0]        |
                       (cast(uint) p[1] << 8)  |
                       (cast(uint) p[2] << 16) |
                       (cast(uint) p[3] << 24);
        }

        auto key = cast(ubyte*) k;

        uint a = 0x9e3779b9,
             b = 0x9e3779b9,
             c = init,
             i = len;

        // handle most of the key, twelve bytes per round
        while (i >= 12)
              {
              a += word (key);
              b += word (key + 4);
              c += word (key + 8);

              a -= b; a -= c; a ^= (c>>13);
              b -= c; b -= a; b ^= (a<<8);
              c -= a; c -= b; c ^= (b>>13);
              a -= b; a -= c; a ^= (c>>12);
              b -= c; b -= a; b ^= (a<<16);
              c -= a; c -= b; c ^= (b>>5);
              a -= b; a -= c; a ^= (c>>3);
              b -= c; b -= a; b ^= (a<<10);
              c -= a; c -= b; c ^= (b>>15);
              key += 12; i -= 12;
              }

        // handle the last 11 bytes. Each case deliberately falls
        // through to the next. The low byte of c is left for the
        // length, hence c takes its bytes at shifts 8 through 24
        c += len;
        switch (i)
               {
               case 11: c += (cast(uint) key[10] << 24);
               case 10: c += (cast(uint) key[9] << 16);
               case 9 : c += (cast(uint) key[8] << 8);
               case 8 : b += (cast(uint) key[7] << 24);
               case 7 : b += (cast(uint) key[6] << 16);
               case 6 : b += (cast(uint) key[5] << 8);
               case 5 : b += (cast(uint) key[4]);
               case 4 : a += (cast(uint) key[3] << 24);
               case 3 : a += (cast(uint) key[2] << 16);
               case 2 : a += (cast(uint) key[1] << 8);
               case 1 : a += (cast(uint) key[0]);
               default:
               }

        // final mix of the three words
        a -= b; a -= c; a ^= (c>>13);
        b -= c; b -= a; b ^= (a<<8);
        c -= a; c -= b; c ^= (b>>13);
        a -= b; a -= c; a ^= (c>>12);
        b -= c; b -= a; b ^= (a<<16);
        c -= a; c -= b; c ^= (b>>5);
        a -= b; a -= c; a ^= (c>>3);
        b -= c; b -= a; b ^= (a<<10);
        c -= a; c -= b; c ^= (b>>15);

        return c;
}
1705 |