Mercurial > projects > ldc
diff tango/tango/net/cluster/tina/CacheThread.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tango/tango/net/cluster/tina/CacheThread.d Fri Jan 11 17:57:40 2008 +0100 @@ -0,0 +1,162 @@ +/******************************************************************************* + + copyright: Copyright (c) 2004 Kris Bell. All rights reserved + + license: BSD style: $(LICENSE) + + version: July 2004: Initial release + + author: Kris + +*******************************************************************************/ + +module tango.net.cluster.tina.CacheThread; + +private import tango.core.Exception; + +private import tango.net.cluster.NetworkRegistry; + +private import tango.net.cluster.tina.ClusterCache, + tango.net.cluster.tina.ClusterTypes, + tango.net.cluster.tina.ClusterThread; + +/****************************************************************************** + + Thread for handling cache requests + +******************************************************************************/ + +class CacheThread : ClusterThread +{ + private ClusterCache cache; + private NetworkRegistry registry; + + /********************************************************************** + + Note that the conduit stays open until the client kills it + + **********************************************************************/ + + this (AbstractServer server, IConduit conduit, Cluster cluster, ClusterCache cache) + { + super (server, conduit, cluster); + + // clone the registry so that we have our own set of + // message templates to act as hosts. This eliminates + // allocating hosts on the fly for load() requests + registry = NetworkRegistry.shared.dup; + + // retain the cache instance + this.cache = cache; + } + + /********************************************************************** + + process client requests + + **********************************************************************/ + + void dispatch () + { + ProtocolWriter.Command cmd; + long time; + char[] channel; + char[] element; + + // wait for request to arrive + auto content = reader.getPacket (cmd, channel, element, time); + + switch (cmd) + { + case ProtocolWriter.Command.Add: + logger.trace (sprint ("{} add cache entry '{}' on channel '{}'", client, element, channel)); + + // return the content if we can't put it in the cache + if (cache.put (channel, element, content, Time(time))) + writer.success ("success"); + else + writer.reply (content); + break; + + case ProtocolWriter.Command.Copy: + logger.trace (sprint ("{} copy cache entry '{}' on channel '{}'", client, element, channel)); + + writer.reply (cache.get (channel, element)); + break; + + case ProtocolWriter.Command.Remove: + logger.trace (sprint ("{} remove cache entry '{}' on channel '{}'", client, element, channel)); + + writer.reply (cache.extract (channel, element)); + break; + + case ProtocolWriter.Command.Load: + logger.trace (sprint ("{} loading cache entry '{}' on channel '{}'", client, element, channel)); + + load (cmd, channel, element); + break; + + default: + throw new IllegalArgumentException ("invalid command"); + } + } + + + /********************************************************************** + + Manages the loading of cache entries remotely, upon + the host that actually contains the cache entry. + + The benefit of this approach lies in the ability to + 'gate' access to specific resources across the entire + network. That is; where particular cache entries are + prohibitively costly to construct, it is worthwhile + ensuring that cost is reduced to a bare minimum. These + remote loaders allow the cache host to block multiple + network clients until there's a new entry available. + Without this mechanism, it would become possible for + multiple network clients to request the same entry + simultaneously, therefore increasing the overall cost. + The end result is similar to that of a distributed + transaction. + + **********************************************************************/ + + void load (ProtocolWriter.Command cmd, char[] channel, char[] element) + { + // convert to a message instance. Note that we use a private + // set of msg templates, so we don't collide with other threads + auto msg = reader.thaw (registry); + + // check to see if it has already been updated or is + // currently locked; go home if so, otherwise lock it + if (cache.lock (channel, element, msg.time)) + try { + // ensure this is the right object + auto loader = cast(IMessageLoader) msg; + if (loader) + { + // acknowledge the request. Do NOT wait for completion! + writer.success.flush; + + // get the new cache entry. The 'time' attribute should + // be set appropriately before return + if (auto e = loader.load) + { + long time; + // serialize new entry and stuff it into cache + writer.put (writer.Command.OK, channel, element, e); + cache.put (channel, element, reader.getPacket (cmd, channel, element, time), e.time); + } + } + else + writer.exception (sprint ("invalid remote cache-loader '{}'", msg.toString)).flush; + + } finally + // ensure we unlock this one! + cache.unlock (channel, element); + else + writer.success.flush; + } +} +