comparison tango/tango/net/cluster/tina/Cluster.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
1 /*******************************************************************************
2
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
4
5 license: BSD style: $(LICENSE)
6
7 version: July 2004: Initial release
8
9 author: Kris
10
11 *******************************************************************************/
12
13 module tango.net.cluster.tina.Cluster;
14
15 private import tango.math.Random;
16
17 private import tango.core.Thread,
18 tango.core.Runtime,
19 tango.core.Exception;
20
21 private import tango.util.log.Log,
22 tango.util.log.Logger;
23
24 private import tango.time.Clock;
25
26 private import tango.io.Buffer,
27 tango.io.GrowBuffer;
28
29 private import tango.io.model.IConduit;
30
31 private import tango.net.Socket,
32 tango.net.SocketConduit,
33 tango.net.SocketListener,
34 tango.net.InternetAddress,
35 tango.net.MulticastConduit;
36
37 private import tango.net.cluster.NetworkClient;
38
39 public import tango.net.cluster.model.ICluster;
40
41 private import tango.net.cluster.tina.RollCall,
42 tango.net.cluster.tina.ProtocolReader,
43 tango.net.cluster.tina.ProtocolWriter;
44
45 private import Integer = tango.text.convert.Integer;
46
47 /*******************************************************************************
48
49 QOS implementation for sockets. All cluster-client activity is
50 gated through here by the higher level classes; NetworkQueue &
51 NetworkCache for example. You gain access to the cluster by
52 creating an instance of the QOS (quality of service) you desire
53 and either mapping client classes onto it, or using it directly.
54 For example:
55 ---
56 import tango.net.cluster.tina.Cluster;
57
58 auto cluster = new Cluster;
59 cluster.join;
60
61 auto channel = cluster.createChannel (...);
62 channel.putQueue (...);
63 channel.getQueue ();
64 ---
65
66 Please see the cluster clients for additional details. Currently
67 these include CacheInvalidator, CacheInvalidatee, NetworkMessage,
68 NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the
69 Client base-class.
70
71 *******************************************************************************/
72
73 class Cluster : Broadcaster, ICluster
74 {
75 private FlexNodeSet task,
76 queue;
77 private FixedNodeSet cache;
78 private Logger logger;
79
80 /***********************************************************************
81
82 Create a cluster instance with a default logger and the
83 Nagle algorithm disabled
84
85 ***********************************************************************/
86
87 this ()
88 {
89 this (Log.getLogger ("cluster.generic"), true);
90 }
91
92 /***********************************************************************
93
94 Create a cluster instance with the provided logger. Option
95 noDelay controls the setting of the Nagle algorithm on an
96 active connection to a server, which should be disabled by
97 default (noDelay == true)
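
A minimal sketch, assuming an application logger obtained via
Log.getLogger (the logger name is illustrative):
---
auto cluster = new Cluster (Log.getLogger ("myapp.cluster"));
---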
98
99 ***********************************************************************/
100
101 this (Logger log, bool noDelay = true)
102 {
103 assert (log);
104 logger = log;
105
106 task = new FlexNodeSet (log, noDelay);
107 queue = new FlexNodeSet (log, noDelay);
108 cache = new FixedNodeSet (log, noDelay);
109 }
110
111 /***********************************************************************
112
113 Join the cluster as a client, discovering servers. Client
114 applications should invoke this before making requests so
115 that there are some servers to address.
116
117 If cache facilities will be used, then the join(cacheHosts)
118 variation should be used instead
119
120 ***********************************************************************/
121
122 final Cluster join ()
123 {
124 // listen for cluster servers
125 auto channel = createChannel ("cluster.server.advertise");
126 channel.createBulletinConsumer (&notify);
127
128 // ask who's currently running
129 channel.broadcast (new RollCall);
130 logger.trace ("discovering cluster nodes");
131
132 // wait for enabled servers to respond ...
133 Thread.sleep (0.250);
134 return this;
135 }
136
137 /***********************************************************************
138
139 Join the cluster as a client, discovering servers. Client
140 applications should invoke this before making requests so
141 that there are some servers to address.
142
143 If cache facilities will be used, use this method to set
144 the group of valid cache hosts. Each cache host should be
145 specified as a machine-name and port pair, e.g.
146 ---
147 ["lucy:1234", "daisy:3343", "daisy:3344"]
148 ---
149
150 This sets up a fixed set of cache hosts, which should be
151 identical for all cache clients. Cache hosts not included
152 in this list will be ignored when they come online.
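
For example, a client that also uses cache facilities might join
like so (host names and ports are illustrative):
---
auto cluster = new Cluster;
cluster.join (["lucy:1234", "daisy:3343", "daisy:3344"]);
---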
153
154 ***********************************************************************/
155
156 final Cluster join (char[][] cacheHosts)
157 {
158 foreach (addr; cacheHosts)
159 cache.addNode (new Node (log, addr, "cache"));
160 return join;
161 }
162
163 /***********************************************************************
164
165 Return the logger instance provided during construction.
166
167 ***********************************************************************/
168
169 final Logger log ()
170 {
171 return logger;
172 }
173
174 /***********************************************************************
175
176 Create a channel instance. Our channel implementation
177 includes a number of cached IO helpers (ProtocolWriter
178 and so on) which simplifies and speeds up execution.
179
180 ***********************************************************************/
181
182 final IChannel createChannel (char[] channel)
183 {
184 return new Channel (this, channel);
185 }
186
187 /***********************************************************************
188
189 ChannelListener method for listening to RollCall responses.
190 These are sent out by cluster servers both when they get a
191 RollCall request, and when they heartbeat.
192
193 ***********************************************************************/
194
195 private void notify (IEvent event)
196 {
197 scope rollcall = new RollCall;
198 event.get (rollcall);
199
200 switch (rollcall.type)
201 {
202 default:
203 break;
204
205 case RollCall.Task:
206 task.enable (rollcall.addr, "task");
207 break;
208
209 case RollCall.Cache:
210 cache.enable (rollcall.addr);
211 break;
212
213 case RollCall.Queue:
214 queue.enable (rollcall.addr, "queue");
215 break;
216 }
217 }
218 }
219
220
221 /*******************************************************************************
222
223 Basic multicast support across the cluster. Multicast is used
224 for broadcasting messages to all nodes in the cluster. We use
225 it for cache-invalidation, heartbeat, rollcall and notification
226 of queue activity
227
228 *******************************************************************************/
229
230 private class Broadcaster
231 {
232 private static InternetAddress[char[]] groups;
233 private Buffer mBuffer;
234 private ProtocolWriter mWriter;
235 private MulticastConduit mSocket;
236
237 private int groupPort = 3333;
238 private int groupPrefix = 225;
239
240 /***********************************************************************
241
242 Set up broadcast support. Currently the buffer & writer
243 are shared for all bulletin serialization; this should
244 probably change at some point such that we can support
245 multiple threads broadcasting concurrently to different
246 output ports.
247
248 ***********************************************************************/
249
250 this ()
251 {
252 mBuffer = new Buffer (1024 * 4);
253 mSocket = new MulticastConduit;
254 mWriter = new ProtocolWriter (mBuffer);
255 }
256
257 /***********************************************************************
258
259 Return the multicast conduit used to broadcast messages
260 across the cluster
263
264 ***********************************************************************/
265
266 final MulticastConduit conduit ()
267 {
268 return mSocket;
269 }
270
271 /***********************************************************************
272
273 Setup the multicast options. Port is used as the sole
274 address port for multicast usage & prefix is prepended
275 to each fabricated multicast address (should be a valid
276 class-D prefix: 225 through 239 inclusive)
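
A sketch of overriding the defaults before joining; the port and
prefix values below are illustrative:
---
auto cluster = new Cluster;
cluster.multicast (4444, 226);
cluster.join;
---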
277
278 ***********************************************************************/
279
280 final void multicast (int port, int prefix=225)
281 {
282 groupPort = port;
283 groupPrefix = prefix;
284 }
285
286 /***********************************************************************
287
288 Broadcast a message on the specified channel. This uses
289 IP/Multicast to scatter the payload to all registered
290 listeners (on the same multicast group). Note that the
291 maximum message size is limited to that of an Ethernet
292 data frame minus the 28-byte IP/UDP header (1472 bytes).
293
294 Also note that we are synchronized to avoid contention
295 on the otherwise shared output buffer.
296
297 ***********************************************************************/
298
299 final synchronized void broadcast (char[] channel, IMessage message=null)
300 {
301 // clear buffer and serialize content
302 mWriter.put (ProtocolWriter.Command.OK, channel, null, message);
303
304 // Ethernet data-frame size minus the 28 byte UDP/IP header:
305 if (mBuffer.position > 1472)
306 throw new ClusterException ("message is too large to broadcast");
307
308 // send it to the appropriate multicast group
309 mSocket.write (mBuffer.slice, getGroup (channel));
310 }
311
312 /***********************************************************************
313
314 Return an internet address representing the multicast
315 group for the specified channel. We use three of the
316 four address segments to represent the channel itself
317 (via a hash on the channel name), and set the primary
318 segment to be that of the broadcast prefix (above).
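
For example, with the default prefix a channel whose folded 24-bit
hash is 0x0a0b0c would map to group address 225.10.11.12 (the hash
value is chosen purely for illustration).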
319
320 ***********************************************************************/
321
322 final synchronized InternetAddress getGroup (char[] channel)
323 {
324 auto p = channel in groups;
325 if (p)
326 return *p;
327
328 // construct a group address from the prefix & channel-hash,
329 // where the hash is folded down to 24 bits
330 uint hash = jhash (channel.ptr, channel.length);
331 hash = (hash >> 24) ^ (hash & 0x00ffffff);
332
333 auto address = Integer.toString (groupPrefix) ~ "." ~
334 Integer.toString ((hash >> 16) & 0xff) ~ "." ~
335 Integer.toString ((hash >> 8) & 0xff) ~ "." ~
336 Integer.toString (hash & 0xff);
337
338 // insert InternetAddress into hashmap
339 auto group = new InternetAddress (address, groupPort);
340 groups [channel] = group;
341 return group;
342 }
343 }
344
345
346 /*******************************************************************************
347
348 A channel represents something akin to a publish/subscribe topic,
349 or a radio station. These are used to segregate cluster operations
350 into a set of groups, where each group is represented by a channel.
351 Channel names are whatever you want them to be: use of dot notation
352 has proved useful in the past.
353
354 Channels maintain internal state in order to avoid heap activity,
355 so they should not be shared across threads without appropriate
356 synchronization in place. One remedy is to create another channel instance
357
358 *******************************************************************************/
359
360 private class Channel : IChannel
361 {
362 private char[] name_;
363 private Buffer buffer;
364 private ProtocolReader reader;
365 private ProtocolWriter writer;
366 private Cluster cluster_;
367
368 /***********************************************************************
369
370 Construct a channel with the specified name. We cache
371 a number of session-related constructs here also, in
372 order to eliminate runtime overhead
373
374 ***********************************************************************/
375
376 this (Cluster cluster, char[] name)
377 in {
378 assert (cluster);
379 assert (name.length);
380 }
381 body
382 {
383 name_ = name;
384 cluster_ = cluster;
385
386 // this buffer will grow as required to house larger messages
387 buffer = new GrowBuffer (1024 * 2);
388 writer = new ProtocolWriter (buffer);
389
390 // make the reader slice directly from the buffer content
391 reader = new ProtocolReader (buffer);
392 }
393
394 /***********************************************************************
395
396 Return the name of this channel. This is the name provided
397 when the channel was constructed.
398
399 ***********************************************************************/
400
401 final char[] name ()
402 {
403 return name_;
404 }
405
406 /***********************************************************************
407
408 Return the assigned cluster
409
410 ***********************************************************************/
411
412 final Cluster cluster ()
413 {
414 return cluster_;
415 }
416
417 /***********************************************************************
418
419 Return the assigned logger
420
421 ***********************************************************************/
422
423 final Logger log ()
424 {
425 return cluster_.log;
426 }
427
428 /***********************************************************************
429
430 Output this channel via the provided IWriter
431
432 ***********************************************************************/
433
434 final void write (IWriter writer)
435 {
436 writer.put (name_);
437 }
438
439 /***********************************************************************
440
441 Input this channel via the provided IReader
442
443 ***********************************************************************/
444
445 final void read (IReader reader)
446 {
447 reader.get (name_);
448 }
449
450 /***********************************************************************
451
452 deserialize a message into a provided host, or via
453 the registered instance of the incoming message
454
455 ***********************************************************************/
456
457 final IMessage thaw (IMessage host = null)
458 {
459 return reader.thaw (host);
460 }
461
462 /***********************************************************************
463
464 Create a listener of the specified type. Listeners are
465 run within their own thread, since they spend the vast
466 majority of their time blocked on a Socket read. It would
467 be good to support multiplexed reading instead, such
468 that a thread pool could be applied.
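
A sketch of attaching a handler, assuming ChannelListener accepts
a delegate taking an IEvent (the handler is illustrative):
---
void handler (IEvent event)
{
        auto msg = event.get;
        // process msg ...
}

auto consumer = channel.createConsumer (&handler);
---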
469
470 ***********************************************************************/
471
472 final IConsumer createConsumer (ChannelListener notify)
473 {
474 cluster_.log.trace ("creating message consumer for '" ~ name_ ~ "'");
475 return new MessageConsumer (this, notify);
476 }
477
478 /***********************************************************************
479
480 Create a listener of the specified type. Listeners are
481 run within their own thread, since they spend the vast
482 majority of their time blocked on a Socket read. It would
483 be good to support multiplexed reading instead, such
484 that a thread pool could be applied.
485
486 ***********************************************************************/
487
488 final IConsumer createBulletinConsumer (ChannelListener notify)
489 {
490 cluster_.log.trace ("creating bulletin consumer for '" ~ name_ ~ "'");
491 return new BulletinConsumer (this, notify);
492 }
493
494 /***********************************************************************
495
496 Return an entry from the network cache, and optionally
497 remove it. This is a synchronous operation as opposed
498 to the asynchronous nature of an invalidate broadcast.
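
A sketch of a synchronous fetch; the key is illustrative:
---
// fetch without removing; returns null when the key is absent
auto entry = channel.getCache ("some.key", false);
---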
499
500 ***********************************************************************/
501
502 final IMessage getCache (char[] key, bool remove, IMessage host = null)
503 {
504 void send (IConduit conduit)
505 {
506 buffer.setConduit (conduit);
507 writer.put (remove ? ProtocolWriter.Command.Remove :
508 ProtocolWriter.Command.Copy, name_, key).flush;
509 }
510
511 if (cluster_.cache.request (&send, reader, key))
512 return reader.thaw (host);
513 return null;
514 }
515
516 /***********************************************************************
517
518 Place an entry into the network cache, replacing the
519 entry with the identical key. Where message.time is
520 set, it will be used to test for newer cache entries
521 than the one being sent i.e. if someone else placed
522 a newer entry into the cache, that one will remain.
523
524 Note that this may cause the oldest entry in the cache
525 to be displaced if the cache is already full.
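
A sketch, where 'record' stands for an illustrative IMessage
instance:
---
auto fresh = channel.putCache ("some.key", record);
// fresh is false when the cache already held a newer entry
---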
526
527 ***********************************************************************/
528
529 final bool putCache (char[] key, IMessage message)
530 {
531 void send (IConduit conduit)
532 {
533 buffer.setConduit (conduit);
534 writer.put (ProtocolWriter.Command.Add, name_, key, message).flush;
535 }
536
537 // return false if the cache server said there's
538 // already something newer
539 if (cluster_.cache.request (&send, reader, key))
540 return false;
541 return true;
542 }
543
544 /***********************************************************************
545
546 Load a network cache entry remotely. This sends the given
547 IMessage over a network to the cache host, where it will
548 be executed locally. The benefit of doing so is that the
549 host may deny access to the cache entry for the duration
550 of the load operation. This, in turn, provides a mechanism
551 for gating/synchronizing multiple network clients over a
552 given cache entry; quite handy for those entries that are
553 relatively expensive to construct or access.
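
A sketch, where 'loader' stands for an illustrative IMessage that
is executed on the cache host to populate the entry:
---
auto loaded = channel.loadCache ("some.key", loader);
---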
554
555 ***********************************************************************/
556
557 final bool loadCache (char[] key, IMessage message)
558 {
559 void send (IConduit conduit)
560 {
561 buffer.setConduit (conduit);
562 writer.put (ProtocolWriter.Command.Load, name_, key, message).flush;
563 }
564
565 return cluster_.cache.request (&send, reader, key);
566 }
567
568 /***********************************************************************
569
570 Query the cluster for queued entries on the corresponding
571 channel. Returns, and removes, the first matching entry
572 from the cluster. Note that this sweeps the cluster for
573 matching entries, and is synchronous in nature. The more
574 common approach is to setup a queue listener, which will
575 grab and dispatch queue entries asynchronously.
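
A sketch of a synchronous sweep; null is returned when no matching
entry is queued:
---
auto msg = channel.getQueue;
if (msg !is null)
   {
   // process the dequeued message ...
   }
---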
576
577 ***********************************************************************/
578
579 final IMessage getQueue (IMessage host = null)
580 {
581 if (scanQueue)
582 return reader.thaw (host);
583 return null;
584 }
585
586 /***********************************************************************
587
588 Query the cluster for queued entries on the corresponding
589 channel. Returns, and removes, the first matching entry
590 from the cluster. Note that this sweeps the cluster for
591 matching entries, and is synchronous in nature. The more
592 common approach is to setup a queue listener, which will
593 grab and dispatch queue entries asynchronously.
594
595 ***********************************************************************/
596
597 private bool scanQueue ()
598 {
599 void send (IConduit conduit)
600 {
601 buffer.setConduit (conduit);
602 writer.put (ProtocolWriter.Command.RemoveQueue, name_).flush;
603 }
604
605 bool scan (Node node)
606 {
607 bool message;
608 node.request (&send, reader, message);
609 return message;
610 }
611
612 // make a pass over each Node, looking for channel entries
613 return cluster_.queue.scan (&scan);
614 }
615
616 /***********************************************************************
617
618 Add an entry to the specified network queue. May throw a
619 QueueFullException if there's no room available.
620
621 ***********************************************************************/
622
623 final IMessage putQueue (IMessage message)
624 {
625 void send (IConduit conduit)
626 {
627 buffer.setConduit (conduit);
628 writer.put (ProtocolWriter.Command.AddQueue, name_, null, message).flush;
629 }
630
631 cluster_.queue.request (&send, reader);
632 return message;
633 }
634
635 /***********************************************************************
636
637 Send a remote call request to a server, and place the result
638 back into the provided message
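
A sketch, where 'task' stands for an illustrative IMessage
describing the remote call:
---
if (channel.execute (task))
   {
   // 'task' now carries the result returned by the server
   }
---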
639
640 ***********************************************************************/
641
642 final bool execute (IMessage message)
643 {
644 void send (IConduit conduit)
645 {
646 buffer.setConduit (conduit);
647 writer.put (ProtocolWriter.Command.Call, name_, null, message).flush;
648 }
649
650 if (cluster_.task.request (&send, reader))
651 {
652 // place result back into the provided message
653 reader.thaw (message);
654 return true;
655 }
656 return false;
657 }
658
659 /***********************************************************************
660
661 Broadcast a message on the specified channel. This uses
662 IP/Multicast to scatter the message to all registered
663 listeners (on the same multicast group). Note that the
664 maximum message size is limited to that of an Ethernet
665 data frame minus the 28-byte IP/UDP header (1472 bytes).
666
667 ***********************************************************************/
668
669 final void broadcast (IMessage message = null)
670 {
671 cluster_.broadcast (name_, message);
672 }
673 }
674
675
676 /*******************************************************************************
677
678 A listener for multicast channel traffic. These are currently used
679 for cache coherency, queue publishing, and node discovery activity;
680 though they could be used for direct messaging also.
681
682 Be careful when using the retained channel, since it is shared with
683 the calling thread. Thus a race condition could arise between the
684 client and this thread, were both to use the channel for transfers
685 at the same instant. Note that MessageConsumer makes a copy of the
686 channel for this purpose
687
688 *******************************************************************************/
689
690 private class BulletinConsumer : SocketListener, IConsumer, IEvent
691 {
692 private bool hasMore; // incoming message?
693 private Buffer buffer; // input buffer
694 private ProtocolReader reader; // input decoder
695 private Channel channel_; // associated channel
696 private Cluster cluster; // associated cluster
697 private MulticastConduit consumer; // broadcast listener
698 private ChannelListener listener; // user-level callback
699
700 /***********************************************************************
701
702 Construct a multicast consumer for the specified event. The
703 event handler will be invoked whenever a message arrives for
704 the associated channel.
705
706 ***********************************************************************/
707
708 this (Channel channel, ChannelListener listener)
709 {
710 this.channel_ = channel;
711 this.listener = listener;
712 this.cluster = channel.cluster;
713
714 // buffer doesn't need to be larger than Ethernet data-frame
715 buffer = new Buffer (1500);
716
717 // make the reader slice directly from the buffer content
718 reader = new ProtocolReader (buffer);
719
720 // configure a listener socket
721 consumer = new MulticastConduit (cluster.getGroup (channel_.name), true);
722 consumer.join;
723
724 super (consumer, buffer);
725
726 // fire up this listener
727 super.execute;
728 }
729
730 /***********************************************************************
731
732 Notification callback invoked when we receive a multicast
733 packet. Note that we check the packet channel-name against
734 the one we're consuming, to guard against cases where the group
735 address had a hash collision.
736
737 ***********************************************************************/
738
739 override void notify (IBuffer buffer)
740 {
741 ProtocolWriter.Command cmd;
742 char[] channel;
743 char[] element;
744
745 // read the incoming header, along with the object guid
746 // where available
747 hasMore = reader.getHeader (cmd, channel, element);
748
749 // check it's really for us first (might be a hash collision)
750 if (channel == this.channel_.name)
751 invoke (this);
752 }
753
754 /***********************************************************************
755
756 ***********************************************************************/
757
758 IMessage get (IMessage host = null)
759 {
760 if (hasMore)
761 return reader.thaw (host);
762
763 throw new ClusterException ("attempting to thaw a non-existent message");
764 }
765
766 /***********************************************************************
767
768 Return the assigned logger
769
770 ***********************************************************************/
771
772 final Logger log ()
773 {
774 return cluster.log;
775 }
776
777 /***********************************************************************
778
779 Handle error conditions from the listener thread.
780
781 ***********************************************************************/
782
783 override void exception (char [] msg)
784 {
785 cluster.log.error ("BulletinConsumer: "~msg);
786 }
787
788 /***********************************************************************
789
790 Overridable means of notifying the client code.
791
792 ***********************************************************************/
793
794 protected void invoke (IEvent event)
795 {
796 listener (event);
797 }
798
799 /***********************************************************************
800
801 Return the channel instance we're associated with.
802
803 ***********************************************************************/
804
805 final Channel channel ()
806 {
807 return channel_;
808 }
809
810 /***********************************************************************
811
812 Temporarily halt listening. This can be used to ignore
813 multicast messages while, for example, the consumer is
814 busy doing other things.
815
816 ***********************************************************************/
817
818 final void pauseGroup ()
819 {
820 consumer.leave;
821 }
822
823 /***********************************************************************
824
825 Resume listening, post-pause.
826
827 ***********************************************************************/
828
829 final void resumeGroup ()
830 {
831 consumer.join;
832 }
833
834 /***********************************************************************
835
836 Cancel this consumer. The listener is effectively disabled
837 from this point forward. The listener thread does not halt
838 at this point, but waits until the socket-read returns.
839 Note that the D Interface implementation requires us to
840 "reimplement and dispatch" trivial things like this ~ it's
841 a pain in the neck to maintain.
842
843 ***********************************************************************/
844
845 final void cancel ()
846 {
847 super.cancel;
848 }
849
850 /***********************************************************************
851
852 Send a message back to the producer
853
854 ***********************************************************************/
855
856 void reply (IChannel channel, IMessage message)
857 {
858 assert (channel);
859 assert (message);
860
861 channel.broadcast (message);
862 }
863
864
865 /***********************************************************************
866
867 Return an appropriate reply channel for the given message,
868 or return null if no reply is expected
869
870 ***********************************************************************/
871
872 IChannel replyChannel (IMessage message)
873 {
874 if (message.reply.length)
875 return cluster.createChannel (message.reply);
876 return null;
877 }
878 }
879
880
881 /*******************************************************************************
882
883 A listener for queue events. These events are produced by the
884 queue host on a periodic basis when it has available entries.
885 We listen for them (rather than constantly scanning) and then
886 begin a sweep to process as many as we can. Note that we will
887 be in competition with other nodes to process these entries.
888
889 Also note that we create a copy of the channel in use, so that
890 race-conditions with the requesting client are avoided.
891
892 *******************************************************************************/
893
894 private class MessageConsumer : BulletinConsumer
895 {
896 /***********************************************************************
897
898 Construct a multicast consumer for the specified event
899
900 ***********************************************************************/
901
902 this (Channel channel, ChannelListener listener)
903 {
904 super (channel, listener);
905
906 // create private channel instance to use in our thread
907 this.channel_ = new Channel (channel.cluster, channel.name);
908 }
909
910 /***********************************************************************
911
912 Handle error conditions from the listener thread.
913
914 ***********************************************************************/
915
916 override void exception (char [] msg)
917 {
918 cluster.log.error ("MessageConsumer: "~msg);
919 }
920
921 /***********************************************************************
922
923 Overrides the default processing to sweep the cluster for
924 queued entries. Each server node is queried until one is
925 found that contains a message. Note that it is possible
926 to set things up where we are told exactly which node to
927 go to; however given that we won't be listening whilst
928 scanning, and that there's likely to be a group of new
929 entries in the cluster, it's just as effective to scan.
930 This will be far from ideal for all environments, so we
931 should make the strategy pluggable instead.
932
933 Note also that the content is retrieved via a duplicate
934 channel to avoid potential race-conditions on the original
935
936 ***********************************************************************/
937
938 override IMessage get (IMessage host = null)
939 {
940 if (channel.scanQueue)
941 return channel.thaw (host);
942 return null;
943 }
944
945 /***********************************************************************
946
947 Send a message back to the producer
948
949 ***********************************************************************/
950
951 override void reply (IChannel channel, IMessage message)
952 {
953 assert (channel);
954 assert (message);
955
956 channel.putQueue (message);
957 }
958
959 /***********************************************************************
960
961 Override the default notification handler in order to
962 disable multicast receipts while the application does
963 what it needs to
964
965 ***********************************************************************/
966
967 override protected void invoke (IEvent event)
968 {
969 // temporarily pause listening while processing
970 pauseGroup;
971 try {
972 listener (event);
973 } finally resumeGroup;
974 }
975 }
976
977
978 /*******************************************************************************
979
980 An abstraction of a socket connection. Used internally by the
981 socket-based Cluster.
982
983 *******************************************************************************/
984
985 private class Connection
986 {
987 abstract bool reset();
988
989 abstract void done (Time time);
990
991 abstract SocketConduit conduit ();
992 }
993
994
995 /*******************************************************************************
996
997 A pool of socket connections for accessing cluster nodes. Note
998 that the entries will timeout after a period of inactivity, and
999 will subsequently cause a connected host to drop the supporting
1000 session.
1001
1002 *******************************************************************************/
1003
1004 private class ConnectionPool
1005 {
1006 private Logger log;
1007 private int count;
1008 private bool noDelay;
1009 private InternetAddress address;
1010 private PoolConnection freelist;
1011 private TimeSpan timeout = TimeSpan.seconds(60);
1012
1013 /***********************************************************************
1014
1015 Utility class to provide the basic connection facilities
1016 provided by the connection pool.
1017
1018 ***********************************************************************/
1019
1020 static class PoolConnection : Connection
1021 {
1022 Time time;
1023 PoolConnection next;
1024 ConnectionPool parent;
1025 SocketConduit conduit_;
1026
1027 /***************************************************************
1028
1029 Construct a new connection and set its parent
1030
1031 ***************************************************************/
1032
1033 this (ConnectionPool pool)
1034 {
1035 parent = pool;
1036 reset;
1037 }
1038
1039 /***************************************************************
1040
1041 Create a new socket and connect it to the specified
1042 server. This will cause a dedicated thread to start
1043 on the server. Said thread will quit when an error
1044 occurs.
1045
1046 ***************************************************************/
1047
1048 final bool reset ()
1049 {
1050 try {
1051 conduit_ = new SocketConduit;
1052
1053 // apply Nagle settings
1054 conduit.socket.setNoDelay (parent.noDelay);
1055
1056 // set a 500ms timeout for read operations
1057 conduit_.setTimeout (TimeSpan.millis(500));
1058
1059 // open a connection to this server
1060 // parent.log.trace ("connecting to server");
1061 conduit_.connect (parent.address);
1062 return true;
1063
1064 } catch (Object o)
1065 {
1066 if (! Runtime.isHalting)
1067 parent.log.warn ("server is unavailable :: "~o.toString);
1068 }
1069 return false;
1070 }
1071
1072 /***************************************************************
1073
1074 Return the socket belonging to this connection
1075
1076 ***************************************************************/
1077
1078 final SocketConduit conduit ()
1079 {
1080 return conduit_;
1081 }
1082
1083 /***************************************************************
1084
1085 Close the socket. This will cause any host session
1086 to be terminated.
1087
1088 ***************************************************************/
1089
1090 final void close ()
1091 {
1092 conduit_.detach;
1093 }
1094
1095 /***************************************************************
1096
1097 Return this connection to the free-list. Note that
1098 we have to synchronize on the parent-pool itself.
1099
1100 ***************************************************************/
1101
1102 final void done (Time time)
1103 {
1104 synchronized (parent)
1105 {
1106 next = parent.freelist;
1107 parent.freelist = this;
1108 this.time = time;
1109 }
1110 }
1111 }
1112
1113
1114 /***********************************************************************
1115
1116 Create a connection-pool for the specified address.
1117
1118 ***********************************************************************/
1119
1120 this (InternetAddress address, Logger log, bool noDelay)
1121 {
1122 this.log = log;
1123 this.address = address;
1124 this.noDelay = noDelay;
1125 }
1126
1127 /***********************************************************************
1128
1129 Allocate a Connection from a list rather than creating a
1130 new one. Reap old entries as we go.
1131
1132 ***********************************************************************/
1133
1134 final synchronized Connection borrow (Time time)
1135 {
1136 if (freelist)
1137 do {
1138 auto c = freelist;
1139
1140 freelist = c.next;
1141 if (freelist && (time - c.time > timeout))
1142 c.close;
1143 else
1144 return c;
1145 } while (true);
1146
1147 return new PoolConnection (this);
1148 }
1149
1150 /***********************************************************************
1151
1152 Close this pool and drop all existing connections.
1153
1154 ***********************************************************************/
1155
1156 final synchronized void close ()
1157 {
1158 auto c = freelist;
1159 freelist = null;
1160 while (c)
1161 {
1162 c.close;
1163 c = c.next;
1164 }
1165 }
1166 }
1167
1168
1169 /*******************************************************************************
1170
1171 Class to represent a cluster node. Each node supports both cache
1172 and queue functionality. Note that the set of available nodes is
1173 configured at startup, simplifying the discovery process in some
1174 significant ways, and causing less thrashing of cache-keys.
1175
1176 *******************************************************************************/
1177
1178 private class Node
1179 {
1180 private Logger log;
1181 private char[] name,
1182 addr;
1183 private ConnectionPool pool;
1184 private bool enabled;
1185
1186 alias void delegate (IConduit conduit) Requestor;
1187
1188 /***********************************************************************
1189
1190 Construct a node with the provided name. This name should
1191 be the network name of the hosting device.
1192
1193 ***********************************************************************/
1194
1195 this (Logger log, char[] addr, char[] name)
1196 {
1197 this.log = log;
1198 this.addr = addr;
1199 this.name = name ~ ':' ~ addr;
1200 }
1201
1202 /***********************************************************************
1203
1204 Add a cache/queue reference for the remote node
1205
1206 ***********************************************************************/
1207
1208 final void setPool (InternetAddress address, bool noDelay)
1209 {
1210 this.pool = new ConnectionPool (address, log, noDelay);
1211 }
1212
1213 /***********************************************************************
1214
1215 Return the name of this node
1216
1217 ***********************************************************************/
1218
1219 override char[] toString ()
1220 {
1221 return name;
1222 }
1223
1224 /***********************************************************************
1225
1226 Return the network address of this node
1227
1228 ***********************************************************************/
1229
1230 final char[] address ()
1231 {
1232 return addr;
1233 }
1234
1235 /***********************************************************************
1236
1237 Remove this Node from the cluster. The node is disabled
1238 until it is seen to recover.
1239
1240 ***********************************************************************/
1241
1242 final void fail ()
1243 {
1244 setEnabled (false);
1245 pool.close;
1246 }
1247
1248 /***********************************************************************
1249
1250 Get the current state of this node
1251
1252 ***********************************************************************/
1253
1254 final bool isEnabled ()
1255 {
1256 volatile
1257 return enabled;
1258 }
1259
1260 /***********************************************************************
1261
1262 Set the enabled state of this node
1263
1264 ***********************************************************************/
1265
1266 final void setEnabled (bool enabled)
1267 {
1268 if (enabled)
1269 log.trace ("enabling "~name);
1270 else
1271 log.trace ("disabling "~name);
1272
1273 volatile
1274 this.enabled = enabled;
1275 }
1276
1277 /***********************************************************************
1278
1279 Request data; fail this Node if we can't connect. Note
1280 that we make several attempts to connect before writing
1281 the node off as a failure. We use a delegate to perform
1282 the request output since it may be invoked on more than
1283 one iteration, where the current attempt fails.
1284
1285 We return true if the cluster node responds, and false
1286 otherwise. Exceptions are thrown if they occurred on the
1287 server. Parameter 'message' is set true if a message is
1288 available from the server response
1289
1290 ***********************************************************************/
1291
1292 final bool request (Requestor dg, ProtocolReader reader, out bool message)
1293 {
1294 ProtocolWriter.Command cmd;
1295 Time time;
1296 char[] channel;
1297 char[] element;
1298
1299 // it's possible that the pool may have failed between
1300 // the point of selecting it, and the invocation itself
1301 if (pool is null)
1302 return false;
1303
1304 // get a connection to the server
1305 auto connect = pool.borrow (time = Clock.now);
1306
1307 // talk to the server (try a few times if necessary)
1308 for (int attempts=3; attempts--;)
1309 try {
1310 // attach connection to writer and send request
1311 dg (connect.conduit);
1312
1313 // attach connection to reader
1314 reader.buffer.setConduit (connect.conduit);
1315
1316 // load the returned object. Don't retry on
1317 // failed reads, since the server is either
1318 // really really busy, or near death. We must
1319 // assume it is offline until it tells us
1320 // otherwise (via a heartbeat)
1321 attempts = 0;
1322 message = reader.getHeader (cmd, channel, element);
1323
1324 // return borrowed connection
1325 connect.done (time);
1326
1327 } catch (RegistryException x)
1328 {
1329 connect.done (time);
1330 throw x;
1331 }
1332 catch (IOException x)
1333 {
1334 log.trace ("IOException on server request :: "~x.toString);
1335
1336 // attempt to reconnect?
1337 if (attempts is 0 || !connect.reset)
1338 {
1339 // that server is offline
1340 fail;
1341
1342 // state that we failed
1343 return false;
1344 }
1345 }
1346
1347 // is message an exception?
1348 if (cmd !is ProtocolWriter.Command.OK)
1349 {
1350 // is node full?
1351 if (cmd is ProtocolWriter.Command.Full)
1352 throw new ClusterFullException (channel);
1353
1354 // did node barf?
1355 if (cmd is ProtocolWriter.Command.Exception)
1356 throw new ClusterException (channel);
1357
1358 // bogus response
1359 throw new ClusterException ("invalid response from cluster server");
1360 }
1361
1362 // ok, our server responded
1363 return true;
1364 }
1365 }
1366
1367
1368 /*******************************************************************************
1369
1370 Models a generic set of cluster nodes. This is intended to be
1371 thread-safe, with no locking on a lookup operation
1372
1373 *******************************************************************************/
1374
1375 private class NodeSet
1376 {
1377 private Node[char[]] map;
1378 private Logger log;
1379 private Set set;
1380 private bool noDelay;
1381
1382 /***********************************************************************
1383
1384 ***********************************************************************/
1385
1386 this (Logger log, bool noDelay)
1387 {
1388 this.log = log;
1389 this.set = new Set;
1390 this.noDelay = noDelay;
1391 }
1392
1393 /***********************************************************************
1394
1395 ***********************************************************************/
1396
1397 final Logger logger ()
1398 {
1399 return log;
1400 }
1401
1402 /***********************************************************************
1403
1404 Add a node to the list of servers
1405
1406 ***********************************************************************/
1407
1408 synchronized final Node addNode (Node node)
1409 {
1410 auto addr = node.address;
1411 if (addr in map)
1412 throw new ClusterException ("Attempt to add cluster node '"~addr~"' more than once");
1413
1414 map[addr] = node;
1415
1416 // note that this creates a new Set instance. We do this
1417 // so that selectNode() can avoid synchronization
1418 set = set.add (node);
1419 return node;
1420 }
1421
1422 /***********************************************************************
1423
1424 Select a cluster server based on a starting index. If the
1425 selected server is not currently enabled, we just try the
1426 next one. This behaviour should be consistent across each
1427 cluster client.
1428
1429 ***********************************************************************/
1430
1431 final Node selectNode (uint index)
1432 {
1433 auto hosts = set.nodes;
1434 uint count = hosts.length;
1435
1436 if (count)
1437 {
1438 index %= count;
1439
1440 while (count--)
1441 {
1442 auto node = hosts [index];
1443 if (node.isEnabled)
1444 return node;
1445
1446 if (++index >= hosts.length)
1447 index = 0;
1448 }
1449 }
1450 throw new ClusterEmptyException ("No appropriate cluster nodes are available");
1451 }
1452
1453 /***********************************************************************
1454
1455 Host class for the set of nodes. We utilize this to enable
1456 atomic read/write where it would not be otherwise possible
1457 -- D arrays are organized as ptr+length pairs and are thus
1458 inherently non-atomic for assignment purposes
1459
1460 ***********************************************************************/
1461
1462 private static class Set
1463 {
1464 Node[] nodes,
1465 random;
1466
1467 final Set add (Node node)
1468 {
1469 auto s = new Set;
1470 s.nodes = nodes ~ node;
1471 s.randomize;
1472 return s;
1473 }
1474
1475 private final void randomize ()
1476 {
1477 // copy the node list
1478 random = nodes.dup;
1479
1480 // muddle up the duplicate list. This randomized list
1481 // is used when scanning the cluster for queued entries
1482 foreach (i, n; random)
1483 {
1484 auto j = Random.shared.next (random.length);
1485 auto tmp = random[i];
1486 random[i] = random[j];
1487 random[j] = tmp;
1488 }
1489 }
1490 }
1491 }
1492
1493
1494 /*******************************************************************************
1495
1496 Models a fixed set of cluster nodes. Used for Cache
1497
1498 *******************************************************************************/
1499
1500 private class FixedNodeSet : NodeSet
1501 {
1502 /***********************************************************************
1503
1504 ***********************************************************************/
1505
1506 this (Logger log, bool noDelay)
1507 {
1508 super (log, noDelay);
1509 }
1510
1511 /***********************************************************************
1512
1513 ***********************************************************************/
1514
1515 final synchronized void enable (char[] addr)
1516 {
1517 auto p = addr in map;
1518 if (p)
1519 {
1520 auto node = *p;
1521 if (! node.isEnabled)
1522 {
1523 node.setPool (new InternetAddress(addr), noDelay);
1524 node.setEnabled (true);
1525 }
1526 }
1527 else
1528 // don't throw when no cache hosts have been configured at all
1529 if (set.nodes.length)
1530 throw new ClusterException ("Attempt to enable unregistered cache node '"~addr~"'");
1531 }
1532
1533 /***********************************************************************
1534
1535 Select a cluster server based on the specified key. If the
1536 selected server is not currently enabled, we just try the
1537 next one. This behaviour should be consistent across each
1538 cluster client.
1539
1540 ***********************************************************************/
1541
1542 final bool request (Node.Requestor dg, ProtocolReader reader, char[] key)
1543 {
1544 Node node;
1545 bool message;
1546
1547 do {
1548 node = selectNode (jhash (key.ptr, key.length));
1549 } while (! node.request (dg, reader, message));
1550
1551 return message;
1552 }
1553 }
1554
1555
1556 /*******************************************************************************
1557
1558 Models a flexible set of cluster nodes. Used for queue and task
1559
1560 *******************************************************************************/
1561
1562 private class FlexNodeSet : NodeSet
1563 {
1564 private uint rollover;
1565
1566 /***********************************************************************
1567
1568 ***********************************************************************/
1569
1570 this (Logger log, bool noDelay)
1571 {
1572 super (log, noDelay);
1573 }
1574
1575 /***********************************************************************
1576
1577 ***********************************************************************/
1578
1579 final synchronized void enable (char[] addr, char[] name)
1580 {
1581 auto p = addr in map;
1582 auto node = p ? *p : addNode (new Node (log, addr, name));
1583
1584 if (! node.isEnabled)
1585 {
1586 node.setPool (new InternetAddress(addr), noDelay);
1587 node.setEnabled (true);
1588 }
1589 }
1590
1591 /***********************************************************************
1592
1593 Select a cluster server based on the specified key. If the
1594 selected server is not currently enabled, we just try the
1595 next one. This behaviour should be consistent across each
1596 cluster client.
1597
1598 ***********************************************************************/
1599
1600 final bool request (Node.Requestor dg, ProtocolReader reader)
1601 {
1602 Node node;
1603 bool message;
1604
1605 do {
1606 node = selectNode (++rollover);
1607 } while (! node.request (dg, reader, message));
1608
1609 return message;
1610 }
1611
1612 /***********************************************************************
1613
1614 Sweep the cluster servers, halting as soon as the delegate
1615 returns true; the result indicates whether any node satisfied
1616 the delegate. Note that this scans nodes in
1617 a randomized pattern, which should tend to avoid 'bursty'
1618 activity by a set of clients upon any one cluster server.
1619
1620 ***********************************************************************/
1621
1622 final bool scan (bool delegate(Node) dg)
1623 {
1624 auto hosts = set.random;
1625 auto index = hosts.length;
1626
1627 while (index)
1628 {
1629 // lookup the randomized set of server nodes
1630 auto node = hosts [--index];
1631
1632 // callback on each enabled node
1633 if (node.isEnabled)
1634 if (dg (node))
1635 return true;
1636 }
1637 return false;
1638 }
1639 }
1640
1641
1642 /******************************************************************************
1643
1644 The Bob Jenkins lookup2 algorithm. This should be relocated
1645 to somewhere common
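
A usage sketch, hashing a channel name in the same manner as
getGroup above:
---
uint hash = jhash (channel.ptr, channel.length);
---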
1646
1647 ******************************************************************************/
1648
1649 private static uint jhash (void* k, uint len, uint init = 0)
1650 {
1651 uint a = 0x9e3779b9,
1652 b = 0x9e3779b9,
1653 c = init,
1654 i = len;
1655
1656 // handle most of the key
1657 while (i >= 12)
1658 {
1659 a += *cast(uint*)(k+0);
1660 b += *cast(uint*)(k+4);
1661 c += *cast(uint*)(k+8);
1662
1663 a -= b; a -= c; a ^= (c>>13);
1664 b -= c; b -= a; b ^= (a<<8);
1665 c -= a; c -= b; c ^= (b>>13);
1666 a -= b; a -= c; a ^= (c>>12);
1667 b -= c; b -= a; b ^= (a<<16);
1668 c -= a; c -= b; c ^= (b>>5);
1669 a -= b; a -= c; a ^= (c>>3);
1670 b -= c; b -= a; b ^= (a<<10);
1671 c -= a; c -= b; c ^= (b>>15);
1672 k += 12; i -= 12;
1673 }
1674
1675 // handle the last 11 bytes
1676 c += len;
1677 switch (i)
1678 {
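// each case below deliberately falls through to the next (Jenkins tail handling)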
1679 case 11: c+=(cast(uint)(cast(ubyte*)k)[10]<<24);
1680 case 10: c+=(cast(uint)(cast(ubyte*)k)[9]<<16);
1681 case 9 : c+=(cast(uint)(cast(ubyte*)k)[8]<<8);
1682 case 8 : b+=(cast(uint)(cast(ubyte*)k)[7]<<24);
1683 case 7 : b+=(cast(uint)(cast(ubyte*)k)[6]<<16);
1684 case 6 : b+=(cast(uint)(cast(ubyte*)k)[5]<<8);
1685 case 5 : b+=(cast(uint)(cast(ubyte*)k)[4]);
1686 case 4 : a+=(cast(uint)(cast(ubyte*)k)[3]<<24);
1687 case 3 : a+=(cast(uint)(cast(ubyte*)k)[2]<<16);
1688 case 2 : a+=(cast(uint)(cast(ubyte*)k)[1]<<8);
1689 case 1 : a+=(cast(uint)(cast(ubyte*)k)[0]);
1690 default:
1691 }
1692
1693 a -= b; a -= c; a ^= (c>>13);
1694 b -= c; b -= a; b ^= (a<<8);
1695 c -= a; c -= b; c ^= (b>>13);
1696 a -= b; a -= c; a ^= (c>>12);
1697 b -= c; b -= a; b ^= (a<<16);
1698 c -= a; c -= b; c ^= (b>>5);
1699 a -= b; a -= c; a ^= (c>>3);
1700 b -= c; b -= a; b ^= (a<<10);
1701 c -= a; c -= b; c ^= (b>>15);
1702
1703 return c;
1704 }
1705