# HG changeset patch # User rick@minifunk # Date 1212903938 14400 # Node ID 7a315154bf5e2ca07b7fb18ac4fa6bb47844c467 Initial commit diff -r 000000000000 -r 7a315154bf5e .hgignore --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.hgignore Sun Jun 08 01:45:38 2008 -0400 @@ -0,0 +1,8 @@ +syntax: glob + +dsss_imports +dsss_imports/* +dsss_objects +dsss_objects/* +dsss.last +*.swp diff -r 000000000000 -r 7a315154bf5e dreactor/core/ConnectionHandler.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/core/ConnectionHandler.d Sun Jun 08 01:45:38 2008 -0400 @@ -0,0 +1,362 @@ +module dreactor.core.ConnectionHandler; + +import tango.io.selector.model.ISelector; +import tango.net.Socket.Address; +import tango.util.collection.CircularSeq; + +import dreactor.transport.AsyncSocketConduit; + +alias bool delegate(ConnectionHandler) RegisterD; + +alias bool delegate(ConnectionHandler, RegisterD) IncomingHandlerD; +alias bool delegate(ConnectionHandler, RegisterD) OutgoingHandlerD; +alias int delegate(ConnectionHandler, RegisterD) ErrorHandlerD; +alias bool delegate(ConnectionHandler, RegisterD) DisconnectHandlerD; +alias int delegate(Conduit, RegisterD) ConnectHandlerD; + +alias bool function(ConnectionHandler, RegisterD) IncomingHandlerF; +alias bool function(ConnectionHandler, RegisterD) OutgoingHandlerF; +alias int function(ConnectionHandler, RegisterD) ErrorHandlerF; +alias bool function(ConnectionHandler, RegisterD) DisconnectHandlerF; +alias int function(Conduit, RegisterD) ConnectHandlerF; + + +/****************************************************************************** + ConnectionHandler object. To be used by the SelectLoop to manage callbacks + for events. It may also be used to buffer data inbetween requests. + These can be populated passed to a SelectLoop directly by the end user, + or may be managed by a chosen Protocol. +******************************************************************************/ +class ConnectionHandler() +{ + public + enum { init, connected, listening, idle, closing }; + + /************************************************************************** + + Standard Ctor, takes a transport + + **************************************************************************/ + this (Conduit trans, bool islistener = false) + { + state = State.init; + transport = trans; + ibuf_len = 0; + obuf_len = 0; + i_offset = 0; + o_offset = 0; + isListener = islistener; + } + + + /********************************************************************** + + Setters for the handlers. These are set by the Protocols as well + + **********************************************************************/ + + void setIncomingHandler(IncomingHandlerD hand) + { + inD = hand; + inF = null; + } + + void setIncomingHandler(IncomingHandlerF hand) + { + inF = hand; + inD = null; + } + + void setOutgoingHandler(OutgoingHandlerD hand) + { + outD = hand; + outF = null; + } + + void setOutgoingHandler(OutgoingHandlerF hand) + { + outF = hand; + outD = null; + } + + void setErrorHandler(ErrorHandlerD hand) + { + errD = hand; + errF = null; + } + + void setErrorHandler(ErrorHandlerF hand) + { + errF = hand; + errD = null; + } + + void setDisconnectHandler(DisconnectHandlerD hand) + { + disD = hand; + disF = null; + } + + void setDisconnectHandler(DisconnectHandlerF hand) + { + disF = hand; + disD = null; + } + + void setConnectHandler(ConnectHandlerD hand) + { + conD = hand; + conF = null; + } + + void setConnectHandler(ConnectHandlerF hand) + { + conF = hand; + conD = null; + } + + /********************************************************************** + + Handlers to be called by the SelectLoop when events occur + + **********************************************************************/ + void handleIncoming(Conduit cond) + { + if (!inD is null) + return inD(cond); + else if (!inF is null) + return inF(cond); + } + + void handleOutgoing(Conduit cond) + { + if (!outD is null) + return outD(cond); + else if (!outF is null) + return outF(cond); + } + + int handleError(Conduit cond) + { + if (!errD is null) + return errD(cond); + else if (!errF is null) + return errF(cond); + } + + int handleDisconnect(Conduit cond) + { + if (!disD is null) + return disD(addr); + else if (!disF is null) + return disF(addr); + } + + int handleConnection(Address addr) + { + if (!conD is null) + { + return conD(addr); + } + else if (!conF is null) + { + return conF(addr); + } + } + + /************************************************************************** + + Sending / Receiving helpers + + **************************************************************************/ + + /************************************************************************** + + appendOutBuffer + + Adds an outgoing buffer to the list. This returns true if the list + was empty, indicating that the handler should be registered with the + SelectLoop. If it returns false, it was probably already registered. + + **************************************************************************/ + synchronized void appendOutBuffer(char[] outbuf) + { + out_buffers.append(outbuf); + out_buffers_len++; + if (out_buffers_len == 1) + return true; + else + return false; + } + + /************************************************************************** + + addOffset + Use this function to update the offset position after a successful data + send. This not only manages the current offset, but will update the + out buffer chain if necessary. + + Returns: false if there is nothing left to send, true if there is. + + **************************************************************************/ + synchronized bool addOffset(int off) + in + { + assert(out_buffers_len > 0); + } + body + { + char[] hd = out_buffers.head(); + if ((off + o_offset) >= hd.length) + { + out_buffers.removeHead(); + o_offset = 0; + out_buffers_len--; + return (out_buffers_len > 0); + } + else + o_offset += off; + return true; + } + + /************************************************************************** + + char[] nextBuffer + + Returns a slice of the current outbound buffer, returns a char[] pointing + to null if there is no current outbound buffer + + **************************************************************************/ + synchronized char[] nextBuffer() + { + if (out_buffers_len < 1) + { + return null; + } + + return out_buffers.head()[o_offset .. $]; + } + /************************************************************************** + + Configuration functions + + **************************************************************************/ + Event events() + { + return events_; + } + void events(Event e) + { + events_ = e; + } + void addEvent(Event e) + { + events_ |= e; + } + void remEvent(Event e) + { + events_ &= ^e; + } + + State getState() {return state;} + + /* + connection handlers are left out of this because + this method is used by the listener socket to pass + on its handlers to the accepted socket. An accepted + socket will generally do different things onConnection + */ + void setHandlers(ConnectionHandler other) + { + inD = other.inD; + outD = other.outD; + errD = other.errD; + disD = other.disD; + inF = other.inF; + outF = other.outF; + errF = other.errF; + disF = other.disF; + } + + /************************************************************************** + + Freelist allocators and deallocators + + **************************************************************************/ + static synchronized ConnectionHandler New(Conduit tran, ConnectionHandler other = null) + { + ConnectionHandler hand; + if (freelist) + { + hand = freelist; + freelist = hand.next; + hand.transport = tran; + } + else + hand = new ConnectionHandler(tran); + + if (!other is null) + { + hand.setHandlers(other); + } + return hand; + } + + static synchronized void Delete(ConnectionHandler hand) + { + hand.next = freelist; + freelist = hand.init(); + } + +private + + char[SZ] in_buffer; + CircularSeq!(char[]) out_buffers; + int ibuf_len; + int i_offset; + int o_offset; + + package Conduit transport; + State state; + Event events; + IncomingHandlerD inD; + OutgoingHandlerD outD; + ErrorHandlerD errD; + DisconnectHandlerD disD; + ConnectHandlerD conD; + + IncomingHandlerF inF; + OutgoingHandlerF outF; + ErrorHandlerF errF; + DisconnectHandlerF disF; + ConnectHandlerF conF; + + static ConnectionHandler freelist; + ConnectionHandler next; + + /************************************************************************** + Copy ctor, creates a new ConnectionHandler using the settings + of an existing handler. + **************************************************************************/ + void init() + { + transport = null; + state = State.init; + ibuf_len = 0; + obuf_len = 0; + i_offset = 0; + o_offset = 0; + out_buffer = null; + inD = null; + outD = null; + errD = null; + disD = null; + conD = null; + inF = null; + outF = null; + errF = null; + disF = null; + conF = null; + } +} + diff -r 000000000000 -r 7a315154bf5e dreactor/core/SelectLoop.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/core/SelectLoop.d Sun Jun 08 01:45:38 2008 -0400 @@ -0,0 +1,133 @@ +/******************************************************************************* + + copyright: Copyright (c) 2008 Rick Richardson. All rights reserved + + license: BSD style: $(LICENSE) + + version: Initial release v0.1 : May 2008 + + author: Rick Richardson + +*******************************************************************************/ + + + +module dreactor.core.SelectLoop; + +import dreactor.transport.AsyncSocketConduit; +import tango.io.selector.Selector; +import tango.io.selector.model.ISelector; +import tango.core.Exception; +import tango.core.Thread; +import tango.core.Atomic; +import tango.util.collection.LinkSeq; +import tango.util.log.Log; +import tango.util.log.Configurator; + +import dreactor.core.ConnectionHandler; + +Logger log = Log.getLogger("dreactor.core.SelectLoop"); + +static char[] version_string = "SelectLoop.d 0.1 2008-05-31"; + +class SelectLoop +{ +private + Thread thread; + bool running; + Atomic!(int) pending; + + ThreadSafeQueue!(ConnectionHandler) freshList; + //LinkSeq!(ConnectionHandler) connList; probably not necessary + +public + this() + { + freshList = new ThreadSafeQueue!(ConnectionHandler); + //connList = new LinkSeq!(ConnectionHandler); + Configurator(); + } + + void run() + { + running = true; + thread = new Thread(&eventLoop); + thread.start(); + } + + void exit() + { + running = false; + } + + bool addConnection(ConnectionHandler handler) + { + return freshList.push(handler); + } + +private + void eventLoop() + { + auto selector = new Selector(); + selector.open(); + auto format = Log.format; + do + { + auto eventCount = selector.select(10); + + if (eventCount > 0) + { + // process events + foreach (SelectionKey key; selector.selectedSet()) + { + if (key.isReadable()) + { + // incoming data + auto conn = cast(ConnectionHandler) key.attachment; + if ( ConnectionHandler.listening == conn.getState() ) + conn.handleConnection(conn.transport, addConnection); + else if ( ! conn.handleIncoming(conn.transport)) + unregister(conn.transport); + } + else if (key.isWritable()) + { + auto conn = cast(ConnectionHandler) key.attachment; + if ( ! conn.handleOutgoing(conn.transport)) + unregister(conn.transport); + } + else if (key.isHangup()) + { + auto conn = cast(ConnectionHandler) key.attachment; + if ( ! conn.handleDisconnect(conn.transport)) + unregister(conn.transport); + } + else if (key.isError() || key.isInvalidHandle()) + { + // error, close connection + auto conn = cast(ConnectionHandler) key.attachment; + if ( ! conn.handleError(conn.transport, key.isInvalidHandle())) + selector.unregister(conn.transport); + } + } + } + else if (eventCount == 0) + { + /* can't think of anything useful to do here. */ + } + else + { + log.error(format("Selector.select returned {}", eventCount)); + } + + //add Conduits to listener + freshList.processAll( (ConnectionHandler h) + { + selector.register(conn.transport, conn.events()); + //connList.append(conn); + return 1; + }); + //freshList.processAll(reg); + + } while (running) + } +} diff -r 000000000000 -r 7a315154bf5e dreactor/protocol/Http11.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/Http11.d Sun Jun 08 01:45:38 2008 -0400 @@ -0,0 +1,5 @@ +module dreactor.protocol.Http11.d; + +class Http +{ +} diff -r 000000000000 -r 7a315154bf5e dreactor/protocol/Raw.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/protocol/Raw.d Sun Jun 08 01:45:38 2008 -0400 @@ -0,0 +1,125 @@ +module dreactor.protocol.Raw; + +import tango.io.Conduit; +import tango.io.selector.model.ISelector; +import dreactor.core.AsyncConduit; +import dreactor.core.SelectLoop; +import dreactor.core.ConnectionHandler; +import tango.util.collection.CircularSeq; +import tango.util.log.Log; +import tango.util.log.Configurator; + +Logger log = Log.getLogger("dreactor.core.SelectLoop"); + +/****************************************************************************** + + Basic TCP server or client routines for sending raw data. + +******************************************************************************/ +class RawListener +{ +public + + this(ConnectionHandler mgr, SelectLoop sel) + { + manager = mgr; + mgr.events(Event.Read); + sel.addConnection(mgr); + select = sel; + children = CircularSeq!(ConnectionHandler); + Configurator(); + } + + int accept(Conduit cond) + { + AsyncConduit newcond = new AsyncConduit; + cond.socket().accept(newcond.socket); + ConnectionHandler h = ConnectionHandler.New(manager); + mgr.events(Event.Read); + select.addConnection(mgr); + children.append(mgr); + } + + bool broadcast(char[] outbuf) + { + foreach(ConnectionHandler h; children) + { + if (h.appendBuffer(outbuf)) + { + h.addEvent(Event.Write); + select.addConnection(h); + } + } + } + + void close() + { + + } + + /************************************************************************** + + send + OutgoingHandlerD + To be registered as the response to socket writable event. + Sends data, returns amount sent. Unregisters Handler for sending + if there is no more data left to send. + + ***************************************************************************/ + int send(ConnectionHandler h, RegisterD reg) + { + char[] outbuf = h.nextBuffer(); + if (!outbuf is null) + { + int sent = h.transport.write(outbuf); + if (sent > 0) + { + if (! h.addOffset(sent)) + { + h.removeEvent(Event.write); + reg(h); + } + } + else if (sent == EOF) + { + // EAGAIN ? probably shouldn't have happened. + } + else + { + log.error("Socket send return ERR"); + } + return sent; + } + return 0; + } + + /************************************************************************** + + receive + IncomingHandlerD + Default incoming data handler. Should be replaced with something useful. + + **************************************************************************/ + int receive(ConnectionHandler h, RegisterD reg) + { + } + +private + ConnectionHandler manager; + SelectLoop select; + CircularSeq!(ConnectionHandler) children; +} + +class RawClient +{ + ConnectionHandler manager; + SelectLoop select; + + this(ConnectionHandler mgr, SelectLoop sel) + { + manager = mgr; + mgr.events(Event.Read); + sel.addConnection(mgr); + select = sel; + } +} diff -r 000000000000 -r 7a315154bf5e dreactor/transport/AsyncSocketConduit.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/transport/AsyncSocketConduit.d Sun Jun 08 01:45:38 2008 -0400 @@ -0,0 +1,258 @@ +/******************************************************************************* + + copyright: Copyright (c) 2004 Kris Bell. All rights reserved + + license: BSD style: $(LICENSE) + + version: Mar 2004 : Initial release + version: Jan 2005 : RedShodan patch for timeout query + version: Dec 2006 : Outback release + + author: Kris, modified by Rick Richardson (May 2008) + +*******************************************************************************/ + +module dreactor.transport.AsyncSocketConduit; + +private import tango.time.Time; + +public import tango.io.Conduit; + +private import tango.net.Socket; + +/******************************************************************************* + + A wrapper around the bare Socket to implement the IConduit abstraction + and add socket-specific functionality specifically for multiplexing via + poll and the ilk. + + AsyncSocketConduit data-transfer is typically performed in conjunction with + an IBuffer, but can happily be handled directly using void array where + preferred + +*******************************************************************************/ + +class AsyncSocketConduit : Conduit +{ + package Socket socket_; + + /*********************************************************************** + + Create a streaming Internet Socket + + ***********************************************************************/ + /* overriding the enum from the IConduit interface */ + enum : uint + { + Eof = uint.max, /// the End-of-Flow identifer + Err = uint.max -1 // some error ocurred, Should disconnect + } + + this () + { + this (SocketType.STREAM, ProtocolType.TCP); + } + + /*********************************************************************** + + Create an Internet Socket. Used by subclasses and by + ServerSocket; the latter via method allocate() below + + ***********************************************************************/ + + protected this (SocketType type, ProtocolType protocol, bool create=true) + { + socket_ = new Socket (AddressFamily.INET, type, protocol, create); + } + + /*********************************************************************** + + Return the name of this device + + ***********************************************************************/ + + override char[] toString() + { + return socket.toString; + } + + /*********************************************************************** + + Return the socket wrapper + + ***********************************************************************/ + + Socket socket () + { + return socket_; + } + + /*********************************************************************** + + Return a preferred size for buffering conduit I/O + + ***********************************************************************/ + + override uint bufferSize () + { + return 1024 * 8; + } + + /*********************************************************************** + + Models a handle-oriented device. + + TODO: figure out how to avoid exposing this in the general + case + + ***********************************************************************/ + + override Handle fileHandle () + { + return cast(Handle) socket_.fileHandle; + } + + /*********************************************************************** + + Is this socket still alive? + + ***********************************************************************/ + + override bool isAlive () + { + return socket_.isAlive; + } + + /*********************************************************************** + + Connect to the provided endpoint + + ***********************************************************************/ + + AsyncSocketConduit connect (Address addr) + { + socket_.connect (addr); + return this; + } + + /*********************************************************************** + + Bind the socket. This is typically used to configure a + listening socket (such as a server or multicast socket). + The address given should describe a local adapter, or + specify the port alone (ADDR_ANY) to have the OS assign + a local adapter address. + + ***********************************************************************/ + + AsyncSocketConduit bind (Address address) + { + socket_.bind (address); + return this; + } + + /*********************************************************************** + + Inform other end of a connected socket that we're no longer + available. In general, this should be invoked before close() + is invoked + + The shutdown function shuts down the connection of the socket: + + - stops receiving data for this socket. If further data + arrives, it is rejected. + + - stops trying to transmit data from this socket. Also + discards any data waiting to be sent. Stop looking for + acknowledgement of data already sent; don't retransmit + if any data is lost. + + ***********************************************************************/ + + AsyncSocketConduit shutdown () + { + socket_.shutdown (SocketShutdown.BOTH); + return this; + } + + /*********************************************************************** + + Release this AsyncSocketConduit + + Note that one should always disconnect a AsyncSocketConduit + under normal conditions, and generally invoke shutdown + on all connected sockets beforehand + + ***********************************************************************/ + + override void detach () + { + socket_.detach; + + // deallocate if this came from the free-list, + // otherwise just wait for the GC to handle it + if (fromList) + deallocate (this); + } + + /*********************************************************************** + + Read content from the socket. + + Returns the number of bytes read from the socket, or + IConduit.Eof where there's no more content available + + Note that a timeout is equivalent to Eof. Isolating + a timeout condition can be achieved via hadTimeout() + + Note also that a zero return value is not legitimate; + such a value indicates Eof + + ***********************************************************************/ + + override uint read (void[] dst) + { + return read (dst, (void[] dst){return socket_.receive(dst);}); + } + + /*********************************************************************** + + Callback routine to write the provided content to the + socket. This will stall until the socket responds in + some manner. Returns the number of bytes sent to the + output, or IConduit.Eof if the socket cannot write. + + ***********************************************************************/ + + override uint write (void[] src) + { + int count = socket_.send (src); + if (count == 0) + count = Eof; + else if (count < 0) + count = Err; + return count; + } + + /*********************************************************************** + + Internal routine to handle socket read under a timeout. + Note that this is synchronized, in order to serialize + socket access + + ***********************************************************************/ + + package final uint read (void[] dst, int delegate(void[]) dg) + { + // invoke the actual read op + int count = dg (dst); + if (count == 0) + return Eof; + else if (count < 0) + return Err; + + return count; + } + +} + diff -r 000000000000 -r 7a315154bf5e dreactor/util/ThreadSafeQueue.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/util/ThreadSafeQueue.d Sun Jun 08 01:45:38 2008 -0400 @@ -0,0 +1,84 @@ +module dreactor.util.ThreadSafeQueue; + +import tango.util.collection.CircularSeq; + +/****************************************************************************** + + ThreadSafeQueue + Queue that is probably thread safe. It acts as a job queue, in that + you can push or pop off of the queue. Or you can processAll, which will + apply a delegate to each item, then clear the list. + +******************************************************************************/ +class ThreadSafeQueue(TYPE) +{ +public + this(int maxsz = 1000) + { + list = new CircularSeq!(TYPE); + maxsize = maxsz; + size = 0; + } + + synchronized bool pop(ref TYPE t) + { + if (list.size()) + { + TYPE t = list.head(); + list.removeHead(); + --size; + return true; + } + else + return false; + } + + synchronized bool push(TYPE t) + { + if (list.size < maxsize) + { + list.append(t); + ++size; + return true; + } + else + return false; + } + + synchronized int size() + { + return size(); + } + + synchronized int processAll(int delegate(ref T value) dg) + { + int count = 0; + foreach(T t; list) + { + if (dg(t) < 0) + break; + ++count; + } + if (count == size) + clear_(); + else + list.removeRange(0, count); + return count; + } + + synchronized void clear() + { + clear_(); + } + +private + + void clear_() + { + list.removeAll(); + } + + CircularSeq!(TYPE) list; + int maxsize; + int size; +} diff -r 000000000000 -r 7a315154bf5e dsss.conf --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dsss.conf Sun Jun 08 01:45:38 2008 -0400 @@ -0,0 +1,1 @@ +[dreactor] diff -r 000000000000 -r 7a315154bf5e test/dummy.txt