tango/tango/net/cluster/tina/CacheThread.d @ 132:1700239cab2e (trunk)
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author: lindquist
date:   Fri, 11 Jan 2008 17:57:40 +0100
/*******************************************************************************

        copyright:      Copyright (c) 2004 Kris Bell. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        July 2004: Initial release

        author:         Kris

*******************************************************************************/

module tango.net.cluster.tina.CacheThread;

private import tango.core.Exception;

private import tango.net.cluster.NetworkRegistry;

private import tango.net.cluster.tina.ClusterCache,
               tango.net.cluster.tina.ClusterTypes,
               tango.net.cluster.tina.ClusterThread;

/******************************************************************************

        Thread for handling cache requests

******************************************************************************/

class CacheThread : ClusterThread
{
        private ClusterCache    cache;
        private NetworkRegistry registry;

        /**********************************************************************

                Note that the conduit stays open until the client kills it

        **********************************************************************/

        this (AbstractServer server, IConduit conduit, Cluster cluster, ClusterCache cache)
        {
                super (server, conduit, cluster);

                // clone the registry so that we have our own set of
                // message templates to act as hosts. This eliminates
                // allocating hosts on the fly for load() requests
                registry = NetworkRegistry.shared.dup;

                // retain the cache instance
                this.cache = cache;
        }

        /**********************************************************************

                process client requests

        **********************************************************************/

        void dispatch ()
        {
                ProtocolWriter.Command  cmd;
                long                    time;
                char[]                  channel;
                char[]                  element;

                // wait for request to arrive
                auto content = reader.getPacket (cmd, channel, element, time);

                switch (cmd)
                {
                case ProtocolWriter.Command.Add:
                     logger.trace (sprint ("{} add cache entry '{}' on channel '{}'",
                                           client, element, channel));

                     // return the content if we can't put it in the cache
                     if (cache.put (channel, element, content, Time(time)))
                         writer.success ("success");
                     else
                         writer.reply (content);
                     break;

                case ProtocolWriter.Command.Copy:
                     logger.trace (sprint ("{} copy cache entry '{}' on channel '{}'",
                                           client, element, channel));
                     writer.reply (cache.get (channel, element));
                     break;

                case ProtocolWriter.Command.Remove:
                     logger.trace (sprint ("{} remove cache entry '{}' on channel '{}'",
                                           client, element, channel));
                     writer.reply (cache.extract (channel, element));
                     break;

                case ProtocolWriter.Command.Load:
                     logger.trace (sprint ("{} loading cache entry '{}' on channel '{}'",
                                           client, element, channel));
                     load (cmd, channel, element);
                     break;

                default:
                     throw new IllegalArgumentException ("invalid command");
                }
        }

        /**********************************************************************

                Manages the loading of cache entries remotely, upon the
                host that actually contains the cache entry.

                The benefit of this approach lies in the ability to
                'gate' access to specific resources across the entire
                network. That is, where particular cache entries are
                prohibitively costly to construct, it is worthwhile
                ensuring that cost is reduced to a bare minimum. These
                remote loaders allow the cache host to block multiple
                network clients until there's a new entry available.
                Without this mechanism, it would become possible for
                multiple network clients to request the same entry
                simultaneously, therefore increasing the overall cost.
                The end result is similar to that of a distributed
                transaction.

        **********************************************************************/

        void load (ProtocolWriter.Command cmd, char[] channel, char[] element)
        {
                // convert to a message instance. Note that we use a private
                // set of msg templates, so we don't collide with other threads
                auto msg = reader.thaw (registry);

                // check to see if it has already been updated or is
                // currently locked; go home if so, otherwise lock it
                if (cache.lock (channel, element, msg.time))
                    try {
                        // ensure this is the right object
                        auto loader = cast(IMessageLoader) msg;
                        if (loader)
                           {
                           // acknowledge the request. Do NOT wait for completion!
                           writer.success.flush;

                           // get the new cache entry. The 'time' attribute should
                           // be set appropriately before return
                           if (auto e = loader.load)
                              {
                              long time;

                              // serialize new entry and stuff it into cache
                              writer.put (writer.Command.OK, channel, element, e);
                              cache.put (channel, element,
                                         reader.getPacket (cmd, channel, element, time),
                                         e.time);
                              }
                           }
                        else
                           writer.exception (sprint ("invalid remote cache-loader '{}'",
                                                     msg.toString)).flush;
                        } finally
                                 // ensure we unlock this one!
                                 cache.unlock (channel, element);
                else
                   writer.success.flush;
        }
}
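
For context, here is a minimal sketch (not part of the original file) of the kind of loader message the gated load() path above is built for. The interface shapes below are stand-ins that mirror only what load() actually touches, a thawed message castable to IMessageLoader whose load() yields an entry carrying a time attribute; the class and member names are hypothetical, and the real Tango declarations differ in detail.

// Stand-in shapes, mirroring only what CacheThread.load() uses;
// the actual interfaces ship with tango.net.cluster
interface IMessage
{
        long time ();
}

interface IMessageLoader
{
        IMessage load ();
}

// Hypothetical loader message: the cache host thaws it, casts it to
// IMessageLoader and, while holding the entry lock, invokes load()
// exactly once. Other clients asking for the same entry block inside
// cache.lock() until the fresh entry is published.
class CostlyEntry : IMessage, IMessageLoader
{
        private long stamp;

        long time ()
        {
                return stamp;
        }

        IMessage load ()
        {
                // ... construct the expensive content here ...
                stamp = 1;  // would be set to the entry's creation time
                return this;
        }
}

Under this scheme the expensive construction happens once, on the cache host; a stampede of simultaneous requests collapses to a single load plus delivery of the finished entry.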