diff tango/tango/net/cluster/tina/ClusterQueue.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tango/tango/net/cluster/tina/ClusterQueue.d	Fri Jan 11 17:57:40 2008 +0100
@@ -0,0 +1,442 @@
+/*******************************************************************************
+
+        copyright:      Copyright (c) 2004 Kris Bell. All rights reserved
+
+        license:        BSD style: $(LICENSE)
+        
+        version:        July 2004: Initial release      
+        
+        author:         Kris
+
+*******************************************************************************/
+
+module tango.net.cluster.tina.ClusterQueue;
+
+private import  tango.core.Thread;
+
+private import tango.stdc.stdlib : alloca;
+
+private import  tango.net.cluster.tina.Cluster,
+                tango.net.cluster.tina.QueueFile,
+                tango.net.cluster.tina.ClusterTypes;
+
+/******************************************************************************
+        
+******************************************************************************/
+
+class ClusterQueue
+{
+        private Logger          log;
+        private uint            used, 
+                                limit;
+        private double          sleep;
+        private Thread          thread;
+        private Cluster         cluster;
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        abstract void watchdog ();
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        abstract ClusterContent get (char[] name);
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        abstract bool put (char[] name, ClusterContent content);
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        this (Cluster cluster, uint limit, double sleep)
+        {
+                thread = new Thread (&run);
+                
+                log = cluster.log;
+                this.limit = limit;
+                this.sleep = sleep;
+                this.cluster = cluster;
+                
+                thread.start;
+        }
+        
+        /**********************************************************************
+
+        **********************************************************************/
+
+        final void publish (IChannel channel)
+        {
+                log.info ("publishing queue channel '" ~ channel.name ~ "'");
+                channel.broadcast;
+        }
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        private void run ()
+        {       
+                while (true)
+                      {
+                      Thread.sleep (sleep);
+
+                      try {
+                          watchdog;
+                          } catch (Object x)
+                                   log.error ("queue-publisher: "~x.toString);
+                      }
+        }           
+}
+
+
+
+/******************************************************************************
+        
+
+******************************************************************************/
+
+class PersistQueue  : ClusterQueue
+{
+        private QueueFile[char[]] queueSet;
+        private QueueFile[]       queueList;
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        this (Cluster cluster, uint limit, double sleep)
+        {
+                super (cluster, limit, sleep);
+        }
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        final synchronized QueueFile lookup (char[] name)
+        {
+                auto p = name in queueSet;
+                if (p is null)
+                   {
+                   // name is currently a reference only; copy it
+                   name = name.dup;
+
+                   log.trace ("creating new queue for channel '" ~ name ~ "'");
+                
+                   // place new ChannelQueue into the list
+                   auto queue = new QueueFile (log, cluster.createChannel(name), limit);
+                   queueSet[name] = queue;
+                   queueList ~= queue;
+                   return queue;
+                   }
+                   
+                return *p;
+        }
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        final bool put (char[] name, ClusterContent content)
+        {       
+                // stuff content into the appropriate queue
+                auto queue = lookup (name);
+                auto ret = queue.push (content);
+
+                // notify immediately if we just transitioned from 0
+                if (ret && queue.size is 1)
+                    publish (queue.channel);
+
+                return ret;
+        }       
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        final ClusterContent get (char[] name)
+        {
+                return cast(ClusterContent) lookup(name).pop;
+        }   
+
+        /**********************************************************************
+
+                Workaround for a compiler bug in 0.018
+
+        **********************************************************************/
+
+        private final synchronized void copy (QueueFile[] dst, QueueFile[] src)
+        {
+                dst[] = src;
+        }   
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        final void watchdog ()
+        {       
+                auto len = queueList.length;
+                auto list = (cast(QueueFile*) alloca(len * QueueFile.sizeof))[0..len];
+
+                // clone the list of queues to avoid stalling everything
+                copy (list, queueList);
+
+                // synchronized (this)
+                //               list[] = queueList;
+
+                foreach (q; list)
+                        {
+                        if (q.size)
+                            publish (q.channel);
+
+                        if (q.isDirty)
+                           {
+                           q.flush;
+                           log.info ("flushed "~q.channel.name~" to disk");
+                           }
+                        }
+        }    
+}
+
+
+/+
+
+/******************************************************************************
+        
+******************************************************************************/
+
+class MemoryQueue : ClusterQueue
+{
+        private HashMap queueSet;
+        
+        /**********************************************************************
+
+        **********************************************************************/
+
+        this (Cluster cluster, uint limit, Interval sleep)
+        {
+                queueSet = new HashMap (256);
+                super (cluster, limit, sleep);
+        }
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        final ChannelQueue lookup (char[] channel)
+        {
+                return cast(ChannelQueue) queueSet.get (channel);
+        }
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        bool put (char[] name, ClusterContent content)
+        {       
+                if ((used + content.length) < limit)
+                   {
+                   // select the appropriate queue
+                   auto queue = lookup (name);
+                   if (queue is null)
+                      {
+                      // name is currently a reference only; copy it
+                      name = name.dup;
+
+                      log.trace ("creating new queue for channel '" ~ name ~ "'");
+
+                      // place new ChannelQueue into the list
+                      queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
+                      }
+
+                   queue.put (cast (ClusterContent) content.dup);
+                   used += content.length;
+                   return true;
+                   }
+                return false;
+        }       
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        synchronized ClusterContent get (char[] name)
+        {
+                ClusterContent ret = null;
+                auto queue = lookup (name);
+
+                if (queue)
+                   {
+                   ret = queue.get;
+                   used -= ret.length;
+                   }
+                return ret;
+        }   
+        
+        /**********************************************************************
+
+        **********************************************************************/
+
+        void watchdog ()
+        {       
+                foreach (char[] k, Object o; queueSet)
+                        {
+                        auto q = cast(ChannelQueue) o;
+                        if (q.count)
+                            publish (q.channel);
+                        }
+        }           
+}
+
+
+/******************************************************************************
+        
+******************************************************************************/
+
+private class ChannelQueue
+{
+        private Link            head,           // head of the Queue
+                                tail;           // tail of the Queue
+        private int             count;          // number of items present
+        IChannel                channel;        // Queue channel
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        private static class Link
+        {
+                Link            prev,
+                                next;
+                ClusterContent  data;
+
+                static Link     freeList;
+
+                /**************************************************************
+
+                **************************************************************/
+
+                Link append (Link after)
+                {
+                        if (after)
+                           {
+                           next = after.next;
+
+                           // patch 'next' to point at me
+                           if (next)
+                               next.prev = this;
+
+                           //patch 'after' to point at me
+                           prev = after;
+                           after.next = this;
+                           }
+                        return this;
+                }
+
+                /**************************************************************
+
+                **************************************************************/
+
+                Link unlink ()
+                {
+                        // make 'prev' and 'next' entries see each other
+                        if (prev)
+                            prev.next = next;
+
+                        if (next)
+                            next.prev = prev;
+
+                        // Murphy's law 
+                        next = prev = null;
+                        return this;
+                }
+
+                /**************************************************************
+
+                **************************************************************/
+
+                Link create ()
+                {
+                        Link l;
+
+                        if (freeList)
+                           {
+                           l = freeList;
+                           freeList = l.next;
+                           }
+                        else
+                           l = new Link;
+                        return l;                       
+                }
+
+                /**************************************************************
+
+                **************************************************************/
+
+                void destroy ()
+                {
+                        next = freeList;
+                        freeList = this;
+                        this.data = null;
+                }
+        }
+
+
+        /**********************************************************************
+
+        **********************************************************************/
+
+        this (IChannel channel)
+        {
+                head = tail = new Link;
+                this.channel = channel;
+        }
+
+        /**********************************************************************
+
+                Add the specified content to the queue at the current
+                tail position, and bump tail to the next Link
+
+        **********************************************************************/
+
+        void put (ClusterContent content)
+        {
+                tail.data = content;
+                tail = tail.create.append (tail);
+                ++count;
+        }       
+
+        /**********************************************************************
+
+                Extract from the head, which is the oldest item in the 
+                queue. The removed Link is then appended to the tail, 
+                ready for another put. Head is adjusted to point at the
+                next valid queue entry.
+
+        **********************************************************************/
+
+        ClusterContent get ()
+        {
+                if (head !is tail)
+                   {
+                   auto l = head;
+                   head = head.next;
+                   auto ret = l.data;
+                   l.unlink;
+                   l.destroy;
+                   --count;
+                   return ret;
+                   }
+                return null;                   
+        }       
+}
+
++/