Mercurial > projects > ldc
view tango/tango/net/cluster/tina/ClusterServer.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
line wrap: on
line source
/******************************************************************************* copyright: Copyright (c) 2004 Kris Bell. All rights reserved license: BSD style: $(LICENSE) version: July 2004: Initial release author: Kris *******************************************************************************/ module tango.net.cluster.tina.ClusterServer; private import tango.core.Thread; private import tango.util.ArgParser; private import tango.net.Socket, tango.net.InternetAddress; private import tango.net.cluster.tina.Cluster, tango.net.cluster.tina.RollCall; private import tango.net.cluster.NetworkRegistry; private import tango.net.cluster.tina.util.ServerThread; package import tango.net.cluster.tina.util.AbstractServer; /****************************************************************************** Extends the AbstractServer to glue cluster support together. ******************************************************************************/ abstract class ClusterServer : AbstractServer { package char[] name; package Cluster cluster; package IChannel channel; package RollCall rollcall; /********************************************************************** Concrete server must expose a service handler **********************************************************************/ abstract void service (IConduit conduit); /********************************************************************** Construct this server with the requisite attributes. The 'bind' address is the local address we'll be listening on, 'threads' represents the number of socket-accept threads, and backlog is the number of "simultaneous" connection requests that a socket layer will buffer on our behalf. We also set up a listener for client discovery-requests, and lastly, we tell active clients that we're available for work. Clients should be listening on the appropriate channel for an instance of the RollCall payload. **********************************************************************/ this (char[] name, InternetAddress bind, Logger logger) { this.name = name; super (bind, 1, 50, logger); // hook into the cluster as a server cluster = new Cluster (logger); } /********************************************************************** **********************************************************************/ void enroll (IMessage task) { NetworkRegistry.shared.enroll (task); } /********************************************************************** Start the server Note that we hijack the calling thread, and use it to generate a hearbeat. The hearbeat has two functions: it tells all clients when this server starts, and it tells them we're still alive. The latter is important if, for example, a client request to this server had timed-out due to the server being too busy. In such a case, the client will mark the server as being unavailable, and the heartbeat will presumably revert that. It would also be useful to monitor the GC from here. **********************************************************************/ void start (RollCall id, bool reuse=false) { super.start (reuse); // configure an identity for ourselves id.addr = Socket.hostName ~ ':' ~ localAddress.toPortString; this.rollcall = id; // clients are listening on this channel ... channel = cluster.createChannel ("cluster.server.advertise"); // ... and listen for subsequent server.advertise requests channel.createBulletinConsumer (¬ify); while (true) { getLogger.trace ("heartbeat"); channel.broadcast (rollcall); Thread.sleep (30.0); } } /********************************************************************** Return a text string identifying this server **********************************************************************/ char[] getProtocol () { return name; } /********************************************************************** Return a text string identifying this server **********************************************************************/ override char[] toString () { return "cluster::" ~ name; } /********************************************************************** Create a ServerSocket instance. **********************************************************************/ override ServerSocket createSocket (InternetAddress bind, int backlog, bool reuse=false) { return new ServerSocket (bind, backlog, reuse); } /********************************************************************** Create a ServerThread instance. This can be overridden to create other thread-types, perhaps with additional thread- level data attached. **********************************************************************/ override void createThread (ServerSocket socket) { new ServerThread (this, socket); } /********************************************************************** Interface method that's invoked when a client is making discovery requests. We just send back our identity in a reply **********************************************************************/ private void notify (IEvent event) { scope input = new RollCall; event.get (input); // if this is a request, reply with our identity if (input.type is input.Request) channel.broadcast (rollcall); } }