Mercurial > projects > ldc
diff tango/tango/net/cluster/tina/ClusterServer.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tango/tango/net/cluster/tina/ClusterServer.d Fri Jan 11 17:57:40 2008 +0100 @@ -0,0 +1,189 @@ +/******************************************************************************* + + copyright: Copyright (c) 2004 Kris Bell. All rights reserved + + license: BSD style: $(LICENSE) + + version: July 2004: Initial release + + author: Kris + +*******************************************************************************/ + +module tango.net.cluster.tina.ClusterServer; + +private import tango.core.Thread; + +private import tango.util.ArgParser; + +private import tango.net.Socket, + tango.net.InternetAddress; + +private import tango.net.cluster.tina.Cluster, + tango.net.cluster.tina.RollCall; + +private import tango.net.cluster.NetworkRegistry; + +private import tango.net.cluster.tina.util.ServerThread; + +package import tango.net.cluster.tina.util.AbstractServer; + +/****************************************************************************** + + Extends the AbstractServer to glue cluster support together. + +******************************************************************************/ + +abstract class ClusterServer : AbstractServer +{ + package char[] name; + package Cluster cluster; + package IChannel channel; + package RollCall rollcall; + + /********************************************************************** + + Concrete server must expose a service handler + + **********************************************************************/ + + abstract void service (IConduit conduit); + + /********************************************************************** + + Construct this server with the requisite attributes. The + 'bind' address is the local address we'll be listening on, + 'threads' represents the number of socket-accept threads, + and backlog is the number of "simultaneous" connection + requests that a socket layer will buffer on our behalf. + + We also set up a listener for client discovery-requests, + and lastly, we tell active clients that we're available + for work. Clients should be listening on the appropriate + channel for an instance of the RollCall payload. + + **********************************************************************/ + + this (char[] name, InternetAddress bind, Logger logger) + { + this.name = name; + + super (bind, 1, 50, logger); + + // hook into the cluster as a server + cluster = new Cluster (logger); + } + + /********************************************************************** + + **********************************************************************/ + + void enroll (IMessage task) + { + NetworkRegistry.shared.enroll (task); + } + + /********************************************************************** + + Start the server + + Note that we hijack the calling thread, and use it to + generate a hearbeat. The hearbeat has two functions: it + tells all clients when this server starts, and it tells + them we're still alive. The latter is important if, for + example, a client request to this server had timed-out + due to the server being too busy. In such a case, the + client will mark the server as being unavailable, and + the heartbeat will presumably revert that. + + It would also be useful to monitor the GC from here. + + **********************************************************************/ + + void start (RollCall id, bool reuse=false) + { + super.start (reuse); + + // configure an identity for ourselves + id.addr = Socket.hostName ~ ':' ~ localAddress.toPortString; + this.rollcall = id; + + // clients are listening on this channel ... + channel = cluster.createChannel ("cluster.server.advertise"); + + // ... and listen for subsequent server.advertise requests + channel.createBulletinConsumer (¬ify); + + while (true) + { + getLogger.trace ("heartbeat"); + channel.broadcast (rollcall); + Thread.sleep (30.0); + } + } + + /********************************************************************** + + Return a text string identifying this server + + **********************************************************************/ + + char[] getProtocol () + { + return name; + } + + /********************************************************************** + + Return a text string identifying this server + + **********************************************************************/ + + override char[] toString () + { + return "cluster::" ~ name; + } + + /********************************************************************** + + Create a ServerSocket instance. + + **********************************************************************/ + + override ServerSocket createSocket (InternetAddress bind, int backlog, bool reuse=false) + { + return new ServerSocket (bind, backlog, reuse); + } + + /********************************************************************** + + Create a ServerThread instance. This can be overridden to + create other thread-types, perhaps with additional thread- + level data attached. + + **********************************************************************/ + + override void createThread (ServerSocket socket) + { + new ServerThread (this, socket); + } + + /********************************************************************** + + Interface method that's invoked when a client is making + discovery requests. We just send back our identity in a + reply + + **********************************************************************/ + + private void notify (IEvent event) + { + scope input = new RollCall; + event.get (input); + + // if this is a request, reply with our identity + if (input.type is input.Request) + channel.broadcast (rollcall); + } +} +