Mercurial > projects > ldc
diff tango/tango/net/cluster/tina/Cluster.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tango/tango/net/cluster/tina/Cluster.d Fri Jan 11 17:57:40 2008 +0100 @@ -0,0 +1,1705 @@ +/******************************************************************************* + + copyright: Copyright (c) 2004 Kris Bell. All rights reserved + + license: BSD style: $(LICENSE) + + version: July 2004: Initial release + + author: Kris + +*******************************************************************************/ + +module tango.net.cluster.tina.Cluster; + +private import tango.math.Random; + +private import tango.core.Thread, + tango.core.Runtime, + tango.core.Exception; + +private import tango.util.log.Log, + tango.util.log.Logger; + +private import tango.time.Clock; + +private import tango.io.Buffer, + tango.io.GrowBuffer; + +private import tango.io.model.IConduit; + +private import tango.net.Socket, + tango.net.SocketConduit, + tango.net.SocketListener, + tango.net.InternetAddress, + tango.net.MulticastConduit; + +private import tango.net.cluster.NetworkClient; + +public import tango.net.cluster.model.ICluster; + +private import tango.net.cluster.tina.RollCall, + tango.net.cluster.tina.ProtocolReader, + tango.net.cluster.tina.ProtocolWriter; + +private import Integer = tango.text.convert.Integer; + +/******************************************************************************* + + QOS implementation for sockets. All cluster-client activity is + gated through here by the higher level classes; NetworkQueue & + NetworkCache for example. You gain access to the cluster by + creating an instance of the QOS (quality of service) you desire + and either mapping client classes onto it, or using it directly. + For example: + --- + import tango.net.cluster.tina.Cluster; + + auto cluster = new Cluster; + cluster.join; + + auto channel = cluster.createChannel (...); + channel.putQueue (...); + channel.getQueue (); + --- + + Please see the cluster clients for additional details. 
Currently + these include CacheInvalidator, CacheInvalidatee, NetworkMessage, + NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the + Client base-class. + +*******************************************************************************/ + +class Cluster : Broadcaster, ICluster +{ + private FlexNodeSet task, + queue; + private FixedNodeSet cache; + private Logger logger; + + /*********************************************************************** + + Create a cluster instance with a default logger and Nagle + caching disabled + + ***********************************************************************/ + + this () + { + this (Log.getLogger ("cluster.generic"), true); + } + + /*********************************************************************** + + Create a cluster instance with the provided logger. Option + noDelay controls the setting of the Nagle algorithm on an + active connection to a server, which should be disabled by + default (noDelay == true) + + ***********************************************************************/ + + this (Logger log, bool noDelay = true) + { + assert (log); + logger = log; + + task = new FlexNodeSet (log, noDelay); + queue = new FlexNodeSet (log, noDelay); + cache = new FixedNodeSet (log, noDelay); + } + + /*********************************************************************** + + Join the cluster as a client, discovering servers. Client + applications should invoke this before making requests so + that there are some servers to address. 
+ + If cache facilities will be used, then the join(cacheHosts) + variation should be used instead + + ***********************************************************************/ + + final Cluster join () + { + // listen for cluster servers + auto channel = createChannel ("cluster.server.advertise"); + channel.createBulletinConsumer (&notify); + + // ask who's currently running + channel.broadcast (new RollCall); + logger.trace ("discovering cluster nodes"); + + // wait for enabled servers to respond ... + Thread.sleep (0.250); + return this; + } + + /*********************************************************************** + + Join the cluster as a client, discovering servers. Client + applications should invoke this before making requests so + that there are some servers to address. + + If cache facilities will be used, use this method to set + the group of valid cache hosts. Each cache host should be + described as an array of machine-name and port pairs e.g. + --- + ["lucy:1234", "daisy:3343", "daisy:3344"] + --- + + This sets up a fixed set of cache hosts, which should be + identical for all cache clients. Cache hosts not included + in this list will be ignored when they come online. + + ***********************************************************************/ + + final Cluster join (char[][] cacheHosts) + { + foreach (addr; cacheHosts) + cache.addNode (new Node (log, addr, "cache")); + return join; + } + + /*********************************************************************** + + Return the logger instance provided during construction. + + ***********************************************************************/ + + final Logger log () + { + return logger; + } + + /*********************************************************************** + + Create a channel instance. Our channel implementation + includes a number of cached IO helpers (ProtocolWriter + and so on) which simplifies and speeds up execution. 
+ + ***********************************************************************/ + + final IChannel createChannel (char[] channel) + { + return new Channel (this, channel); + } + + /*********************************************************************** + + ChannelListener method for listening to RollCall responses. + These are sent out by cluster servers both when they get a + RollCall request, and when they heartbeat. + + ***********************************************************************/ + + private void notify (IEvent event) + { + scope rollcall = new RollCall; + event.get (rollcall); + + switch (rollcall.type) + { + default: + break; + + case RollCall.Task: + task.enable (rollcall.addr, "task"); + break; + + case RollCall.Cache: + cache.enable (rollcall.addr); + break; + + case RollCall.Queue: + queue.enable (rollcall.addr, "queue"); + break; + } + } +} + + +/******************************************************************************* + + Basic multicast support across the cluster. Multicast is used + for broadcasting messages to all nodes in the cluster. We use + it for cache-invalidation, heartbeat, rollcall and notification + of queue activity + +*******************************************************************************/ + +private class Broadcaster +{ + private static InternetAddress[char[]] groups; + private Buffer mBuffer; + private ProtocolWriter mWriter; + private MulticastConduit mSocket; + + private int groupPort = 3333; + private int groupPrefix = 225; + + /*********************************************************************** + + Setup a Cluster instance. Currently the buffer & writer + are shared for all bulletin serialization; this should + probably change at some point such that we can support + multiple threads broadcasting concurrently to different + output ports. 
+ + ***********************************************************************/ + + this () + { + mBuffer = new Buffer (1024 * 4); + mSocket = new MulticastConduit; + mWriter = new ProtocolWriter (mBuffer); + } + + /*********************************************************************** + + Setup the multicast options. Port is used as the sole + address port for multicast usage, prefix is prepended + to each fabricated multicast address (should be a valid + class-D prefix), and ttl is the number of hops + + ***********************************************************************/ + + final MulticastConduit conduit () + { + return mSocket; + } + + /*********************************************************************** + + Setup the multicast options. Port is used as the sole + address port for multicast usage & prefix is prepended + to each fabricated multicast address (should be a valid + class-D prefix: 225 through 239 inclusive) + + ***********************************************************************/ + + final void multicast (int port, int prefix=225) + { + groupPort = port; + groupPrefix = prefix; + } + + /*********************************************************************** + + Broadcast a message on the specified channel. This uses + IP/Multicast to scatter the payload to all registered + listeners (on the same multicast group). Note that the + maximum message size is limited to that of an Ethernet + data frame, minus the IP/UDP header size (1472 bytes). + + Also note that we are synchronized to avoid contention + on the otherwise shared output buffer. 
+ + ***********************************************************************/ + + final synchronized void broadcast (char[] channel, IMessage message=null) + { + // clear buffer and serialize content + mWriter.put (ProtocolWriter.Command.OK, channel, null, message); + + // Ethernet data-frame size minus the 28 byte UDP/IP header: + if (mBuffer.position > 1472) + throw new ClusterException ("message is too large to broadcast"); + + // send it to the appropriate multicast group + mSocket.write (mBuffer.slice, getGroup (channel)); + } + + /*********************************************************************** + + Return an internet address representing the multicast + group for the specified channel. We use three of the + four address segments to represent the channel itself + (via a hash on the channel name), and set the primary + segment to be that of the broadcast prefix (above). + + ***********************************************************************/ + + final synchronized InternetAddress getGroup (char[] channel) + { + auto p = channel in groups; + if (p) + return *p; + + // construct a group address from the prefix & channel-hash, + // where the hash is folded down to 24 bits + uint hash = jhash (channel.ptr, channel.length); + hash = (hash >> 24) ^ (hash & 0x00ffffff); + + auto address = Integer.toString (groupPrefix) ~ "." ~ + Integer.toString ((hash >> 16) & 0xff) ~ "." ~ + Integer.toString ((hash >> 8) & 0xff) ~ "." ~ + Integer.toString (hash & 0xff); + + // insert InternetAddress into hashmap + auto group = new InternetAddress (address, groupPort); + groups [channel] = group; + return group; + } +} + + +/******************************************************************************* + + A channel represents something akin to a publish/subscribe topic, + or a radio station. These are used to segregate cluster operations + into a set of groups, where each group is represented by a channel. 
+ Channel names are whatever you want them to be: use of dot notation + has proved useful in the past. + + Channels maintain internal state in order to avoid heap activity. So + they should not be shared across threads without appropriate synchs + in place. One remedy is to create another channel instance + +*******************************************************************************/ + +private class Channel : IChannel +{ + private char[] name_; + private Buffer buffer; + private ProtocolReader reader; + private ProtocolWriter writer; + private Cluster cluster_; + + /*********************************************************************** + + Construct a channel with the specified name. We cache + a number of session-related constructs here also, in + order to eliminate runtime overhead + + ***********************************************************************/ + + this (Cluster cluster, char[] name) + in { + assert (cluster); + assert (name.length); + } + body + { + name_ = name; + cluster_ = cluster; + + // this buffer will grow as required to house larger messages + buffer = new GrowBuffer (1024 * 2); + writer = new ProtocolWriter (buffer); + + // make the reader slice directly from the buffer content + reader = new ProtocolReader (buffer); + } + + /*********************************************************************** + + Return the name of this channel. This is the name provided + when the channel was constructed. 
+ + ***********************************************************************/ + + final char[] name () + { + return name_; + } + + /*********************************************************************** + + Return the assigned cluster + + ***********************************************************************/ + + final Cluster cluster () + { + return cluster_; + } + + /*********************************************************************** + + Return the assigned logger + + ***********************************************************************/ + + final Logger log () + { + return cluster_.log; + } + + /*********************************************************************** + + Output this channel via the provided IWriter + + ***********************************************************************/ + + final void write (IWriter writer) + { + writer.put (name_); + } + + /*********************************************************************** + + Input this channel via the provided IReader + + ***********************************************************************/ + + final void read (IReader reader) + { + reader.get (name_); + } + + /*********************************************************************** + + deserialize a message into a provided host, or via + the registered instance of the incoming message + + ***********************************************************************/ + + final IMessage thaw (IMessage host = null) + { + return reader.thaw (host); + } + + /*********************************************************************** + + Create a listener of the specified type. Listeners are + run within their own thread, since they spend the vast + majority of their time blocked on a Socket read. Would + be good to support multiplexed reading instead, such + that a thread pool could be applied instead. 
+ + ***********************************************************************/ + + final IConsumer createConsumer (ChannelListener notify) + { + cluster_.log.trace ("creating message consumer for '" ~ name_ ~ "'"); + return new MessageConsumer (this, notify); + } + + /*********************************************************************** + + Create a listener of the specified type. Listeners are + run within their own thread, since they spend the vast + majority of their time blocked on a Socket read. Would + be good to support multiplexed reading instead, such + that a thread pool could be applied instead. + + ***********************************************************************/ + + final IConsumer createBulletinConsumer (ChannelListener notify) + { + cluster_.log.trace ("creating bulletin consumer for '" ~ name_ ~ "'"); + return new BulletinConsumer (this, notify); + } + + /*********************************************************************** + + Return an entry from the network cache, and optionally + remove it. This is a synchronous operation as opposed + to the asynchronous nature of an invalidate broadcast. + + ***********************************************************************/ + + final IMessage getCache (char[] key, bool remove, IMessage host = null) + { + void send (IConduit conduit) + { + buffer.setConduit (conduit); + writer.put (remove ? ProtocolWriter.Command.Remove : + ProtocolWriter.Command.Copy, name_, key).flush; + } + + if (cluster_.cache.request (&send, reader, key)) + return reader.thaw (host); + return null; + } + + /*********************************************************************** + + Place an entry into the network cache, replacing the + entry with the identical key. Where message.time is + set, it will be used to test for newer cache entries + than the one being sent i.e. if someone else placed + a newer entry into the cache, that one will remain. 
+ + Note that this may cause the oldest entry in the cache + to be displaced if the cache is already full. + + ***********************************************************************/ + + final bool putCache (char[] key, IMessage message) + { + void send (IConduit conduit) + { + buffer.setConduit (conduit); + writer.put (ProtocolWriter.Command.Add, name_, key, message).flush; + } + + // return false if the cache server said there's + // already something newer + if (cluster_.cache.request (&send, reader, key)) + return false; + return true; + } + + /*********************************************************************** + + Load a network cache entry remotely. This sends the given + IMessage over a network to the cache host, where it will + be executed locally. The benefit of doing so is that the + host may deny access to the cache entry for the duration + of the load operation. This, in turn, provides a mechanism + for gating/synchronizing multiple network clients over a + given cache entry; quite handy for those entries that are + relatively expensive to construct or access. + + ***********************************************************************/ + + final bool loadCache (char[] key, IMessage message) + { + void send (IConduit conduit) + { + buffer.setConduit (conduit); + writer.put (ProtocolWriter.Command.Load, name_, key, message).flush; + } + + return cluster_.cache.request (&send, reader, key); + } + + /*********************************************************************** + + Query the cluster for queued entries on the corresponding + channel. Returns, and removes, the first matching entry + from the cluster. Note that this sweeps the cluster for + matching entries, and is synchronous in nature. The more + common approach is to setup a queue listener, which will + grab and dispatch queue entries asynchronously. 
+ + ***********************************************************************/ + + final IMessage getQueue (IMessage host = null) + { + if (scanQueue) + return reader.thaw (host); + return null; + } + + /*********************************************************************** + + Query the cluster for queued entries on the corresponding + channel. Returns, and removes, the first matching entry + from the cluster. Note that this sweeps the cluster for + matching entries, and is synchronous in nature. The more + common approach is to setup a queue listener, which will + grab and dispatch queue entries asynchronously. + + ***********************************************************************/ + + private bool scanQueue () + { + void send (IConduit conduit) + { + buffer.setConduit (conduit); + writer.put (ProtocolWriter.Command.RemoveQueue, name_).flush; + } + + bool scan (Node node) + { + bool message; + node.request (&send, reader, message); + return message; + } + + // make a pass over each Node, looking for channel entries + return cluster_.queue.scan (&scan); + } + + /*********************************************************************** + + Add an entry to the specified network queue. May throw a + QueueFullException if there's no room available. 
+ + ***********************************************************************/ + + final IMessage putQueue (IMessage message) + { + void send (IConduit conduit) + { + buffer.setConduit (conduit); + writer.put (ProtocolWriter.Command.AddQueue, name_, null, message).flush; + } + + cluster_.queue.request (&send, reader); + return message; + } + + /*********************************************************************** + + Send a remote call request to a server, and place the result + back into the provided message + + ***********************************************************************/ + + final bool execute (IMessage message) + { + void send (IConduit conduit) + { + buffer.setConduit (conduit); + writer.put (ProtocolWriter.Command.Call, name_, null, message).flush; + } + + if (cluster_.task.request (&send, reader)) + { + // place result back into the provided message + reader.thaw (message); + return true; + } + return false; + } + + /*********************************************************************** + + Broadcast a message on the specified channel. This uses + IP/Multicast to scatter the message to all registered + listeners (on the same multicast group). Note that the + maximum message size is limited to that of an Ethernet + data frame, minus the IP/UDP header size (1472 bytes). + + ***********************************************************************/ + + final void broadcast (IMessage message = null) + { + cluster_.broadcast (name_, message); + } +} + + +/******************************************************************************* + + A listener for multicast channel traffic. These are currently used + for cache coherency, queue publishing, and node discovery activity; + though could be used for direct messaging also. + + Be careful when using the retained channel, since it is shared with + the calling thread. Thus a race condition could arise between the + client and this thread, were both to use the channel for transfers + at the same instant. 
Note that MessageConsumer makes a copy of the + channel for this purpose + +*******************************************************************************/ + +private class BulletinConsumer : SocketListener, IConsumer, IEvent +{ + private bool hasMore; // incoming message? + private Buffer buffer; // input buffer + private ProtocolReader reader; // input decoder + private Channel channel_; // associated channel + private Cluster cluster; // associated cluster + private MulticastConduit consumer; // broadcast listener + private ChannelListener listener; // user-level callback + + /*********************************************************************** + + Construct a multicast consumer for the specified event. The + event handler will be invoked whenever a message arrives for + the associated channel. + + ***********************************************************************/ + + this (Channel channel, ChannelListener listener) + { + this.channel_ = channel; + this.listener = listener; + this.cluster = channel.cluster; + + // buffer doesn't need to be larger than Ethernet data-frame + buffer = new Buffer (1500); + + // make the reader slice directly from the buffer content + reader = new ProtocolReader (buffer); + + // configure a listener socket + consumer = new MulticastConduit (cluster.getGroup (channel_.name), true); + consumer.join; + + super (consumer, buffer); + + // fire up this listener + super.execute; + } + + /*********************************************************************** + + Notification callback invoked when we receive a multicast + packet. Note that we check the packet channel-name against + the one we're consuming, to check for cases where the group + address had a hash collision. 
+ + ***********************************************************************/ + + override void notify (IBuffer buffer) + { + ProtocolWriter.Command cmd; + char[] channel; + char[] element; + + // read the incoming header, along with the object guid + // where available + hasMore = reader.getHeader (cmd, channel, element); + + // check it's really for us first (might be a hash collision) + if (channel == this.channel_.name) + invoke (this); + } + + /*********************************************************************** + + ***********************************************************************/ + + IMessage get (IMessage host = null) + { + if (hasMore) + return reader.thaw (host); + + throw new ClusterException ("attempting to thaw a non-existant message"); + } + + /*********************************************************************** + + Return the assigned logger + + ***********************************************************************/ + + final Logger log () + { + return cluster.log; + } + + /*********************************************************************** + + Handle error conditions from the listener thread. + + ***********************************************************************/ + + override void exception (char [] msg) + { + cluster.log.error ("BulletinConsumer: "~msg); + } + + /*********************************************************************** + + Overridable means of notifying the client code. + + ***********************************************************************/ + + protected void invoke (IEvent event) + { + listener (event); + } + + /*********************************************************************** + + Return the channel instance we're associated with. + + ***********************************************************************/ + + final Channel channel () + { + return channel_; + } + + /*********************************************************************** + + Temporarily halt listening. 
This can be used to ignore + multicast messages while, for example, the consumer is + busy doing other things. + + ***********************************************************************/ + + final void pauseGroup () + { + consumer.leave; + } + + /*********************************************************************** + + Resume listening, post-pause. + + ***********************************************************************/ + + final void resumeGroup () + { + consumer.join; + } + + /*********************************************************************** + + Cancel this consumer. The listener is effectively disabled + from this point forward. The listener thread does not halt + at this point, but waits until the socket-read returns. + Note that the D Interface implementation requires us to + "reimplement and dispatch" trivial things like this ~ it's + a pain in the neck to maintain. + + ***********************************************************************/ + + final void cancel () + { + super.cancel; + } + + /*********************************************************************** + + Send a message back to the producer + + ***********************************************************************/ + + void reply (IChannel channel, IMessage message) + { + assert (channel); + assert (message); + + channel.broadcast (message); + } + + + /*********************************************************************** + + Return an appropriate reply channel for the given message, + or return null if no reply is expected + + ***********************************************************************/ + + IChannel replyChannel (IMessage message) + { + if (message.reply.length) + return cluster.createChannel (message.reply); + return null; + } +} + + +/******************************************************************************* + + A listener for queue events. These events are produced by the + queue host on a periodic basis when it has available entries. 
+ We listen for them (rather than constantly scanning) and then + begin a sweep to process as many as we can. Note that we will + be in competition with other nodes to process these entries. + + Also note that we create a copy of the channel in use, so that + race-conditions with the requesting client are avoided. + +*******************************************************************************/ + +private class MessageConsumer : BulletinConsumer +{ + /*********************************************************************** + + Construct a multicast consumer for the specified event + + ***********************************************************************/ + + this (Channel channel, ChannelListener listener) + { + super (channel, listener); + + // create private channel instance to use in our thread + this.channel_ = new Channel (channel.cluster, channel.name); + } + + /*********************************************************************** + + Handle error conditions from the listener thread. + + ***********************************************************************/ + + override void exception (char [] msg) + { + cluster.log.error ("MessageConsumer: "~msg); + } + + /*********************************************************************** + + Overrides the default processing to sweep the cluster for + queued entries. Each server node is queried until one is + found that contains a message. Note that it is possible + to set things up where we are told exactly which node to + go to; however given that we won't be listening whilst + scanning, and that there's likely to be a group of new + entries in the cluster, it's just as effective to scan. + This will be far from ideal for all environments, so we + should make the strategy pluggable instead. 
+ + Note also that the content is retrieved via a duplicate + channel to avoid potential race-conditions on the original + + ***********************************************************************/ + + override IMessage get (IMessage host = null) + { + if (channel.scanQueue) + return channel.thaw (host); + return null; + } + + /*********************************************************************** + + Send a message back to the producer + + ***********************************************************************/ + + override void reply (IChannel channel, IMessage message) + { + assert (channel); + assert (message); + + channel.putQueue (message); + } + + /*********************************************************************** + + Override the default notification handler in order to + disable multicast receipts while the application does + what it needs to + + ***********************************************************************/ + + override protected void invoke (IEvent event) + { + // temporarily pause listening while processing + pauseGroup; + try { + listener (event); + } finally resumeGroup; + } +} + + +/******************************************************************************* + + An abstraction of a socket connection. Used internally by the + socket-based Cluster. + +*******************************************************************************/ + +private class Connection +{ + abstract bool reset(); + + abstract void done (Time time); + + abstract SocketConduit conduit (); +} + + +/******************************************************************************* + + A pool of socket connections for accessing cluster nodes. Note + that the entries will timeout after a period of inactivity, and + will subsequently cause a connected host to drop the supporting + session. 
+ +*******************************************************************************/ + +private class ConnectionPool +{ + private Logger log; + private int count; + private bool noDelay; + private InternetAddress address; + private PoolConnection freelist; + private TimeSpan timeout = TimeSpan.seconds(60); + + /*********************************************************************** + + Utility class to provide the basic connection facilities + provided by the connection pool. + + ***********************************************************************/ + + static class PoolConnection : Connection + { + Time time; + PoolConnection next; + ConnectionPool parent; + SocketConduit conduit_; + + /*************************************************************** + + Construct a new connection and set its parent + + ***************************************************************/ + + this (ConnectionPool pool) + { + parent = pool; + reset; + } + + /*************************************************************** + + Create a new socket and connect it to the specified + server. This will cause a dedicated thread to start + on the server. Said thread will quit when an error + occurs. + + ***************************************************************/ + + final bool reset () + { + try { + conduit_ = new SocketConduit; + + // apply Nagle settings + conduit.socket.setNoDelay (parent.noDelay); + + // set a 500ms timeout for read operations + conduit_.setTimeout (TimeSpan.millis(500)); + + // open a connection to this server + // parent.log.trace ("connecting to server"); + conduit_.connect (parent.address); + return true; + + } catch (Object o) + { + if (! 
Runtime.isHalting) + parent.log.warn ("server is unavailable :: "~o.toString); + } + return false; + } + + /*************************************************************** + + Return the socket belonging to this connection + + ***************************************************************/ + + final SocketConduit conduit () + { + return conduit_; + } + + /*************************************************************** + + Close the socket. This will cause any host session + to be terminated. + + ***************************************************************/ + + final void close () + { + conduit_.detach; + } + + /*************************************************************** + + Return this connection to the free-list. Note that + we have to synchronize on the parent-pool itself. + + ***************************************************************/ + + final void done (Time time) + { + synchronized (parent) + { + next = parent.freelist; + parent.freelist = this; + this.time = time; + } + } + } + + + /*********************************************************************** + + Create a connection-pool for the specified address. + + ***********************************************************************/ + + this (InternetAddress address, Logger log, bool noDelay) + { + this.log = log; + this.address = address; + this.noDelay = noDelay; + } + + /*********************************************************************** + + Allocate a Connection from a list rather than creating a + new one. Reap old entries as we go. 
***********************************************************************/

        final synchronized Connection borrow (Time time)
        {
                // pop entries off the free-list until one is found that
                // has not been idle for longer than the timeout. Every
                // expired entry is closed, including the last one on the
                // list, so that a stale connection is never handed out
                while (freelist)
                      {
                      auto c = freelist;
                      freelist = c.next;

                      if (time - c.time > timeout)
                          c.close;
                      else
                         return c;
                      }

                // nothing usable on the list
                return new PoolConnection (this);
        }

        /***********************************************************************

                Close this pool and drop all existing connections.

        ***********************************************************************/

        final synchronized void close ()
        {
                // unhook the whole list first, then close each entry
                auto c = freelist;
                freelist = null;
                while (c)
                      {
                      c.close;
                      c = c.next;
                      }
        }
}


/*******************************************************************************

        Class to represent a cluster node. Each node supports both cache
        and queue functionality. Note that the set of available nodes is
        configured at startup, simplifying the discovery process in some
        significant ways, and causing less thrashing of cache-keys.

*******************************************************************************/

private class Node
{
        private Logger          log;
        private char[]          name,
                                addr;
        private ConnectionPool  pool;
        private bool            enabled;

        // callback used by request() to write the outgoing request
        alias void delegate (IConduit conduit) Requestor;

        /***********************************************************************

                Construct a node with the provided name. This name should
                be the network name of the hosting device.
***********************************************************************/

        this (Logger log, char[] addr, char[] name)
        {
                this.addr = addr;
                this.log  = log;

                // the visible name is "name:addr"
                this.name = name ~ ':' ~ addr;
        }

        /***********************************************************************

                Attach a fresh connection pool, for the given address,
                to this node

        ***********************************************************************/

        final void setPool (InternetAddress address, bool noDelay)
        {
                auto fresh = new ConnectionPool (address, log, noDelay);
                this.pool = fresh;
        }

        /***********************************************************************

                The name of this node, as composed by the constructor

        ***********************************************************************/

        override char[] toString ()
        {
                return this.name;
        }

        /***********************************************************************

                The network address this node was constructed with

        ***********************************************************************/

        final char[] address ()
        {
                return this.addr;
        }

        /***********************************************************************

                Remove this Node from the cluster. The node is disabled
                until it is seen to recover.
***********************************************************************/

        final void fail ()
        {
                // flag the node as unavailable, then drop pooled sockets
                setEnabled (false);
                pool.close;
        }

        /***********************************************************************

                Report whether this node is currently enabled

        ***********************************************************************/

        final bool isEnabled ()
        {
                volatile
                         return enabled;
        }

        /***********************************************************************

                Change the enabled state of this node, tracing the
                transition to the log

        ***********************************************************************/

        final void setEnabled (bool enabled)
        {
                char[] verb = "disabling ";
                if (enabled)
                    verb = "enabling ";

                log.trace (verb ~ name);

                volatile
                         this.enabled = enabled;
        }

        /***********************************************************************

                request data; fail this Node if we can't connect. Note
                that we make several attempts to connect before writing
                the node off as a failure. We use a delegate to perform
                the request output since it may be invoked on more than
                one iteration, where the current attempt fails.

                We return true if the cluster node responds, and false
                otherwise. Exceptions are thrown if they occurred on the
                server.
Parameter 'message' is set true if a message is
                available from the server response

                When the request is abandoned after an IOException the
                borrowed connection is detached here, since it is not
                handed back to the pool in that case.

        ***********************************************************************/

        final bool request (Requestor dg, ProtocolReader reader, out bool message)
        {
                ProtocolWriter.Command  cmd;
                Time                    time;
                char[]                  channel;
                char[]                  element;

                // it's possible that the pool may have failed between
                // the point of selecting it, and the invocation itself
                if (pool is null)
                    return false;

                // get a connection to the server
                auto connect = pool.borrow (time = Clock.now);

                // talk to the server (try a few times if necessary)
                for (int attempts=3; attempts--;)
                     try {
                         // attach connection to writer and send request
                         dg (connect.conduit);

                         // attach connection to reader
                         reader.buffer.setConduit (connect.conduit);

                         // load the returned object. Don't retry on
                         // failed reads, since the server is either
                         // really really busy, or near death. We must
                         // assume it is offline until it tells us
                         // otherwise (via a heartbeat)
                         attempts = 0;
                         message = reader.getHeader (cmd, channel, element);

                         // return borrowed connection
                         connect.done (time);

                         } catch (RegistryException x)
                                  {
                                  // the connection itself is still good
                                  connect.done (time);
                                  throw x;
                                  }
                           catch (IOException x)
                                  {
                                  log.trace ("IOException on server request :: "~x.toString);

                                  // attempt to reconnect?
                                  if (attempts is 0 || !connect.reset)
                                     {
                                     // we are giving up on this connection
                                     // and it never goes back to the pool,
                                     // so release its socket right here
                                     // instead of leaking it
                                     auto stale = connect.conduit;
                                     if (stale !is null)
                                         stale.detach;

                                     // that server is offline
                                     fail;

                                     // state that we failed
                                     return false;
                                     }
                                  }

                // is message an exception?
                if (cmd !is ProtocolWriter.Command.OK)
                   {
                   // is node full?
                   if (cmd is ProtocolWriter.Command.Full)
                       throw new ClusterFullException (channel);

                   // did node barf?
                   if (cmd is ProtocolWriter.Command.Exception)
                       throw new ClusterException (channel);

                   // bogus response
                   throw new ClusterException ("invalid response from cluster server");
                   }

                // ok, our server responded
                return true;
        }
}


/*******************************************************************************

        Models a generic set of cluster nodes. This is intended to be
        thread-safe, with no locking on a lookup operation

*******************************************************************************/

private class NodeSet
{
        private Node[char[]]    map;            // nodes keyed by address
        private Logger          log;
        private Set             set;            // replaced wholesale by addNode
        private bool            noDelay;

        /***********************************************************************

                Construct an empty node set with the given logger and
                Nagle (noDelay) setting

        ***********************************************************************/

        this (Logger log, bool noDelay)
        {
                this.log = log;
                this.set = new Set;
                this.noDelay = noDelay;
        }

        /***********************************************************************

                Return the logger this set was constructed with

        ***********************************************************************/

        final Logger logger ()
        {
                return log;
        }

        /***********************************************************************

                Add a node to the list of servers. Throws a
                ClusterException when a node with the same address
                has been added before.

        ***********************************************************************/

        synchronized final Node addNode (Node node)
        {
                auto addr = node.address;
                if (addr in map)
                    throw new ClusterException ("Attempt to add cluster node '"~addr~"' more than once");

                map[addr] = node;

                // note that this creates a new Set instance. We do this
                // so that selectNode() can avoid synchronization
                set = set.add (node);
                return node;
        }

        /***********************************************************************

                Select a cluster server based on a starting index. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client.
***********************************************************************/

        final Node selectNode (uint index)
        {
                // take a snapshot of the current node list
                auto hosts = set.nodes;
                uint count = hosts.length;

                if (count)
                   {
                   index %= count;

                   // visit each node at most once, wrapping around
                   while (count--)
                         {
                         auto node = hosts [index];
                         if (node.isEnabled)
                             return node;

                         if (++index >= hosts.length)
                             index = 0;
                         }
                   }
                throw new ClusterEmptyException ("No appropriate cluster nodes are available");
        }

        /***********************************************************************

                Host class for the set of nodes. We utilize this to enable
                atomic read/write where it would not be otherwise possible
                -- D arrays are organized as ptr+length pairs and are thus
                inherently non-atomic for assignment purposes

        ***********************************************************************/

        private static class Set
        {
                Node[]  nodes,          // in order of addition
                        random;         // shuffled copy of 'nodes'

                /***************************************************************

                        Return a new Set holding the existing nodes plus
                        the one provided. This instance is not modified.

                ***************************************************************/

                final Set add (Node node)
                {
                        auto s = new Set;
                        s.nodes = nodes ~ node;
                        s.randomize;
                        return s;
                }

                /***************************************************************

                        Rebuild 'random' as a shuffled copy of 'nodes'

                ***************************************************************/

                private final void randomize ()
                {
                        // copy the node list
                        random = nodes.dup;

                        // muddle up the duplicate list. This randomized list
                        // is used when scanning the cluster for queued entries.
                        //
                        // Fisher-Yates: walk down from the end, swapping each
                        // slot with one chosen from the slots not yet fixed
                        // (itself included). Swapping with a slot drawn from
                        // the whole array instead skews the distribution of
                        // orderings. Assumes next(limit) yields a value below
                        // 'limit', as the prior next(random.length) indexing
                        // already relied upon
                        for (auto i = random.length; i > 1; --i)
                            {
                            auto j = Random.shared.next (i);
                            auto tmp = random[i-1];
                            random[i-1] = random[j];
                            random[j] = tmp;
                            }
                }
        }
}


/*******************************************************************************

        Models a fixed set of cluster nodes.
Used for Cache

*******************************************************************************/

private class FixedNodeSet : NodeSet
{
        /***********************************************************************

                Construct the set; both arguments are passed straight
                on to NodeSet

        ***********************************************************************/

        this (Logger log, bool noDelay)
        {
                super (log, noDelay);
        }

        /***********************************************************************

                Enable the registered node with the given address,
                giving it a fresh connection pool. Nothing happens when
                the node is already enabled. An unknown address raises
                a ClusterException, except while the set is still empty.

        ***********************************************************************/

        final synchronized void enable (char[] addr)
        {
                auto entry = addr in map;

                if (entry is null)
                   {
                   // don't throw when no cache hosts have been configured at all
                   if (set.nodes.length)
                       throw new ClusterException ("Attempt to enable unregistered cache node '"~addr~"'");
                   return;
                   }

                auto node = *entry;
                if (node.isEnabled)
                    return;

                node.setPool (new InternetAddress(addr), noDelay);
                node.setEnabled (true);
        }

        /***********************************************************************

                Issue a request to the node selected by hashing the given
                key. When that node fails to respond, selection is retried
                (failed nodes are disabled, so the next enabled one is
                chosen). Returns whether the response carries a message;
                selectNode throws once no enabled node remains.

        ***********************************************************************/

        final bool request (Node.Requestor dg, ProtocolReader reader, char[] key)
        {
                bool message;

                for (;;)
                    {
                    auto node = selectNode (jhash (key.ptr, key.length));
                    if (node.request (dg, reader, message))
                        return message;
                    }
        }
}


/*******************************************************************************

        Models a flexible set of cluster nodes.
Used for queue and task

*******************************************************************************/

private class FlexNodeSet : NodeSet
{
        // incremented per request to vary the starting node
        private uint rollover;

        /***********************************************************************

                Construct the set; both arguments are passed straight
                on to NodeSet

        ***********************************************************************/

        this (Logger log, bool noDelay)
        {
                super (log, noDelay);
        }

        /***********************************************************************

                Enable the node with the given address, registering it
                first when the address has not been seen before. Nothing
                further happens when the node is already enabled.

        ***********************************************************************/

        final synchronized void enable (char[] addr, char[] name)
        {
                Node node;

                if (auto known = addr in map)
                    node = *known;
                else
                   node = addNode (new Node (log, addr, name));

                if (node.isEnabled)
                    return;

                node.setPool (new InternetAddress(addr), noDelay);
                node.setEnabled (true);
        }

        /***********************************************************************

                Issue a request to the next node in rotation. When that
                node fails to respond, the rotation simply advances and
                the request is tried again. Returns whether the response
                carries a message; selectNode throws once no enabled
                node remains.

        ***********************************************************************/

        final bool request (Node.Requestor dg, ProtocolReader reader)
        {
                bool message;

                for (;;)
                    {
                    auto node = selectNode (++rollover);
                    if (node.request (dg, reader, message))
                        return message;
                    }
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is halted when
                the delegate returns true. Note that this scans nodes in
                a randomized pattern, which should tend to avoid 'bursty'
                activity by a set of clients upon any one cluster server.
***********************************************************************/

        final bool scan (bool delegate(Node) dg)
        {
                // walk the randomized node list from last to first,
                // stopping as soon as the callback accepts a node
                foreach_reverse (node; set.random)
                                 if (node.isEnabled && dg (node))
                                     return true;

                return false;
        }
}


/******************************************************************************

        The Bob Jenkins lookup2 algorithm. This should be relocated
        to somewhere common

        Hashes 'len' bytes starting at 'k', seeded with 'init'. Whole
        12-byte groups are read as three 32-bit words; the remaining
        bytes (at most 11) are folded in individually.

******************************************************************************/

private static uint jhash (void* k, uint len, uint init = 0)
{
        uint a = 0x9e3779b9,
             b = 0x9e3779b9,
             c = init;

        // one round of the mixing schedule over (a, b, c)
        void mix ()
        {
                a -= b; a -= c; a ^= (c>>13);
                b -= c; b -= a; b ^= (a<<8);
                c -= a; c -= b; c ^= (b>>13);
                a -= b; a -= c; a ^= (c>>12);
                b -= c; b -= a; b ^= (a<<16);
                c -= a; c -= b; c ^= (b>>5);
                a -= b; a -= c; a ^= (c>>3);
                b -= c; b -= a; b ^= (a<<10);
                c -= a; c -= b; c ^= (b>>15);
        }

        // handle most of the key
        uint remaining = len;
        for (; remaining >= 12; remaining -= 12, k += 12)
            {
            a += *cast(uint*)(k+0);
            b += *cast(uint*)(k+4);
            c += *cast(uint*)(k+8);
            mix ();
            }

        // handle the last 11 bytes. Each case deliberately falls
        // through to the ones below it
        c += len;
        auto tail = cast(ubyte*) k;
        switch (remaining)
               {
               case 11: c += (cast(uint) tail[10] << 24);
               case 10: c += (cast(uint) tail[9] << 16);
               case 9 : c += (cast(uint) tail[8] << 8);
               case 8 : b += (cast(uint) tail[7] << 24);
               case 7 : b += (cast(uint) tail[6] << 16);
               case 6 : b += (cast(uint) tail[5] << 8);
               case 5 : b += (cast(uint) tail[4]);
               case 4 : a += (cast(uint) tail[3] << 24);
               case 3 : a += (cast(uint) tail[2] << 16);
               case 2 : a += (cast(uint) tail[1] << 8);
               case 1 : a += (cast(uint) tail[0]);
               default: break;
               }

        // final round
        mix ();

        return c;
}