diff dwtx/core/internal/jobs/WorkerPool.d @ 122:9d0585bcb7aa

Add core.jobs package
author Frank Benoit <benoit@tionex.de>
date Tue, 12 Aug 2008 02:34:21 +0200
parents
children 862b05e0334a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dwtx/core/internal/jobs/WorkerPool.d	Tue Aug 12 02:34:21 2008 +0200
@@ -0,0 +1,296 @@
+/*******************************************************************************
+ * Copyright (c) 2003, 2007 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     IBM - Initial API and implementation
+ * Port to the D programming language:
+ *     Frank Benoit <benoit@tionex.de>
+ *******************************************************************************/
+module dwtx.core.internal.jobs.WorkerPool;
+
+import tango.core.Thread;
+import tango.core.sync.Mutex;
+import tango.core.sync.Condition;
+import tango.text.convert.Format;
+import dwt.dwthelper.utils;
+
+import dwtx.core.runtime.Assert;
+import dwtx.core.runtime.IStatus;
+import dwtx.core.runtime.jobs.Job;
+import dwtx.core.internal.jobs.JobManager;
+import dwtx.core.internal.jobs.Worker;
+
+import dwtx.core.internal.jobs.InternalJob;
+import dwtx.core.internal.jobs.ThreadJob;
+
+/**
+ * Maintains a pool of worker threads. Threads are constructed lazily as
+ * required, and are eventually discarded if not in use for awhile. This class
+ * maintains the thread creation/destruction policies for the job manager.
+ *
+ * Implementation note: all the data structures of this class are protected
+ * by the instance's object monitor.  To avoid deadlock with third party code,
+ * this lock is never held when calling methods outside this class that may in
+ * turn use locks.
+ */
+class WorkerPool {
+
+    protected Mutex mutex;
+    protected Condition condition;
+
+
+    /**
+     * Threads not used by their best before timestamp are destroyed.
+     */
+    private static const int BEST_BEFORE = 60000;
+    /**
+     * There will always be at least MIN_THREADS workers in the pool.
+     */
+    private static const int MIN_THREADS = 1;
+    /**
+     * Use the busy thread count to avoid starting new threads when a living
+     * thread is just doing house cleaning (notifying listeners, etc).
+     */
+    private int busyThreads = 0;
+
+    /**
+     * The default context class loader to use when creating worker threads.
+     */
+//     protected const ClassLoader defaultContextLoader;
+
+    /**
+     * Records whether new worker threads should be daemon threads.
+     */
+    private bool isDaemon = false;
+
+    private JobManager manager;
+    /**
+     * The number of workers in the threads array
+     */
+    private int numThreads = 0;
+    /**
+     * The number of threads that are currently sleeping
+     */
+    private int sleepingThreads = 0;
+    /**
+     * The living set of workers in this pool.
+     */
+    private Worker[] threads;
+
+    protected package this(JobManager manager) {
+        threads = new Worker[10];
+        this.manager = manager;
+        mutex = new Mutex;
+        condition = new Condition(mutex);
+//         this.defaultContextLoader = Thread.currentThread().getContextClassLoader();
+    }
+
+    /**
+     * Adds a worker to the list of workers.
+     */
+    private void add(Worker worker) {
+        synchronized(mutex){
+            int size = threads.length;
+            if (numThreads + 1 > size) {
+                Worker[] newThreads = new Worker[2 * size];
+                System.arraycopy(threads, 0, newThreads, 0, size);
+                threads = newThreads;
+            }
+            threads[numThreads++] = worker;
+        }
+    }
+
+    private void decrementBusyThreads() {
+        synchronized(mutex){
+            //impossible to have less than zero busy threads
+            if (--busyThreads < 0) {
+                if (JobManager.DEBUG)
+                    Assert.isTrue(false, Integer.toString(busyThreads));
+                busyThreads = 0;
+            }
+        }
+    }
+
+    /**
+     * Signals the end of a job.  Note that this method can be called under
+     * OutOfMemoryError conditions and thus must be paranoid about allocating objects.
+     */
+    protected void endJob(InternalJob job, IStatus result) {
+        decrementBusyThreads();
+        //need to end rule in graph before ending job so that 2 threads
+        //do not become the owners of the same rule in the graph
+        if ((job.getRule_package() !is null) && !(cast(ThreadJob)job )) {
+            //remove any locks this thread may be owning on that rule
+            manager.getLockManager().removeLockCompletely(Thread.getThis(), job.getRule_package());
+        }
+        manager.endJob_package(job, result, true);
+        //ensure this thread no longer owns any scheduling rules
+        manager.implicitJobs.endJob(job);
+    }
+    package void endJob_package(InternalJob job, IStatus result) {
+        endJob(job, result);
+    }
+
+    /**
+     * Signals the death of a worker thread.  Note that this method can be called under
+     * OutOfMemoryError conditions and thus must be paranoid about allocating objects.
+     */
+    protected void endWorker(Worker worker) {
+        synchronized(mutex){
+            if (remove(worker) && JobManager.DEBUG)
+                JobManager.debug_(Format("worker removed from pool: {}", worker)); //$NON-NLS-1$
+        }
+    }
+    package void endWorker_package(Worker worker) {
+        endWorker(worker);
+    }
+
+    private void incrementBusyThreads() {
+        synchronized(mutex){
+            //impossible to have more busy threads than there are threads
+            if (++busyThreads > numThreads) {
+                if (JobManager.DEBUG)
+                    Assert.isTrue(false, Format( "{},{}", busyThreads, numThreads));
+                busyThreads = numThreads;
+            }
+        }
+    }
+
+    /**
+     * Notification that a job has been added to the queue. Wake a worker,
+     * creating a new worker if necessary. The provided job may be null.
+     */
+    protected package void jobQueued() {
+        synchronized(mutex){
+            //if there is a sleeping thread, wake it up
+            if (sleepingThreads > 0) {
+                condition.notify();
+                return;
+            }
+            //create a thread if all threads are busy
+            if (busyThreads >= numThreads) {
+                Worker worker = new Worker(this);
+                worker.isDaemon(isDaemon);
+                add(worker);
+                if (JobManager.DEBUG)
+                    JobManager.debug_(Format("worker added to pool: {}", worker)); //$NON-NLS-1$
+                worker.start();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Remove a worker thread from our list.
+     * @return true if a worker was removed, and false otherwise.
+     */
+    private bool remove(Worker worker) {
+        synchronized(mutex){
+            for (int i = 0; i < threads.length; i++) {
+                if (threads[i] is worker) {
+                    System.arraycopy(threads, i + 1, threads, i, numThreads - i - 1);
+                    threads[--numThreads] = null;
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Sets whether threads created in the worker pool should be daemon threads.
+     */
+    void setDaemon(bool value) {
+        this.isDaemon = value;
+    }
+
+    protected void shutdown() {
+        synchronized(mutex){
+            condition.notifyAll();
+        }
+    }
+    package void shutdown_package() {
+        shutdown();
+    }
+
+    /**
+     * Sleep for the given duration or until woken.
+     */
+    private void sleep(long duration) {
+        synchronized(mutex){
+            sleepingThreads++;
+            busyThreads--;
+            if (JobManager.DEBUG)
+                JobManager.debug_(Format("worker sleeping for: {}ms", duration)); //$NON-NLS-1$ //$NON-NLS-2$
+            try {
+                condition.wait(duration/1000.0f);
+            } catch (InterruptedException e) {
+                if (JobManager.DEBUG)
+                    JobManager.debug_("worker interrupted while waiting... :-|"); //$NON-NLS-1$
+            } finally {
+                sleepingThreads--;
+                busyThreads++;
+            }
+        }
+    }
+
+    /**
+     * Returns a new job to run. Returns null if the thread should die.
+     */
+    protected InternalJob startJob(Worker worker) {
+        //if we're above capacity, kill the thread
+        synchronized (mutex) {
+            if (!manager.isActive_package()) {
+                //must remove the worker immediately to prevent all threads from expiring
+                endWorker(worker);
+                return null;
+            }
+            //set the thread to be busy now in case of reentrant scheduling
+            incrementBusyThreads();
+        }
+        Job job = null;
+        try {
+            job = manager.startJob_package();
+            //spin until a job is found or until we have been idle for too long
+            long idleStart = System.currentTimeMillis();
+            while (manager.isActive_package() && job is null) {
+                long hint = manager.sleepHint_package();
+                if (hint > 0)
+                    sleep(Math.min(hint, BEST_BEFORE));
+                job = manager.startJob_package();
+                //if we were already idle, and there are still no new jobs, then
+                // the thread can expire
+                synchronized (mutex) {
+                    if (job is null && (System.currentTimeMillis() - idleStart > BEST_BEFORE) && (numThreads - busyThreads) > MIN_THREADS) {
+                        //must remove the worker immediately to prevent all threads from expiring
+                        endWorker(worker);
+                        return null;
+                    }
+                }
+            }
+            if (job !is null) {
+                //if this job has a rule, then we are essentially acquiring a lock
+                if ((job.getRule() !is null) && !(cast(ThreadJob)job )) {
+                    //don't need to re-aquire locks because it was not recorded in the graph
+                    //that this thread waited to get this rule
+                    manager.getLockManager().addLockThread(Thread.getThis(), job.getRule());
+                }
+                //see if we need to wake another worker
+                if (manager.sleepHint_package() <= 0)
+                    jobQueued();
+            }
+        } finally {
+            //decrement busy thread count if we're not running a job
+            if (job is null)
+                decrementBusyThreads();
+        }
+        return job;
+    }
+    package InternalJob startJob_package(Worker worker) {
+        return startJob(worker);
+    }
+}