scala.actors: added snapshot/restart for Scheduler.

git-svn-id: http://lampsvn.epfl.ch/svn-repos/scala/scala/trunk@10369 5e8d7ff9-d8ef-0310-90f0-a4852d11357a
This commit is contained in:
phaller 2007-03-16 15:25:00 +00:00
parent 3814f511bd
commit a2dd17491a
7 changed files with 134 additions and 42 deletions

View File

@ -378,6 +378,12 @@ public class FJTaskRunner extends Thread {
/* -------- Suspending -------- */
protected boolean suspending = false;
synchronized void setSuspending(boolean susp) {
suspending = susp;
}
/* ------------ DEQ operations ------------------- */
@ -793,7 +799,6 @@ public class FJTaskRunner extends Thread {
public void run() {
try{
while (!interrupted()) {
FJTask task = pop();
if (task != null) {
if (!task.isDone()) {
@ -806,6 +811,23 @@ public class FJTaskRunner extends Thread {
else
scanWhileIdling();
}
// check for suspending
if (suspending) {
synchronized(this) {
// move all local tasks to group-wide entry queue
for (int i = 0; i < deq.length; ++i) {
synchronized(group) {
try {
FJTask task = (FJTask)deq[i].take();
if (task != null)
group.getEntryQueue().put(task);
} catch (InterruptedException ie) {
System.err.println("Suspend: when transferring task to entryQueue: "+ie);
}
}
}
}
}
}
finally {
group.setInactive(this);

View File

@ -125,7 +125,11 @@ public class FJTaskRunnerGroup implements IFJTaskRunnerGroup {
protected /*final*/ FJTaskRunner[] threads;
/** Group-wide queue for tasks entered via execute() **/
protected final LinkedQueue entryQueue = new LinkedQueue();
/*protected*/ final LinkedQueue entryQueue = new LinkedQueue();
public LinkedQueue getEntryQueue() {
return entryQueue;
}
/** Number of threads that are not waiting for work **/
protected int activeCount = 0;
@ -155,6 +159,29 @@ public class FJTaskRunnerGroup implements IFJTaskRunnerGroup {
static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
/* -------- Suspending -------- */
void snapshot() throws InterruptedException {
// set flag in all task runners to suspend
for (int i = 0; i < threads.length; ++i) {
FJTaskRunner t = threads[i];
t.setSuspending(true);
}
// interrupt all task runners
// assume: current thread not in threads (scheduler)
for (int i = 0; i < threads.length; ++i) {
Thread t = threads[i];
t.interrupt();
}
// wait until all of them have terminated
for (int i = 0; i < threads.length; ++i) {
Thread t = threads[i];
t.join();
}
}
/**
* Create a FJTaskRunnerGroup with the indicated number
* of FJTaskRunner threads. Normally, the best size to use is

View File

@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
/**
* FJTaskScheduler2
*
* @version 0.9.4
* @version 0.9.5
* @author Philipp Haller
*/
class FJTaskScheduler2 extends Thread with IScheduler {
@ -36,6 +36,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
new FJTaskRunnerGroup(coreSize)
private var terminating = false
private var suspending = false
private var lastActivity = Platform.currentTime
@ -76,24 +77,26 @@ class FJTaskScheduler2 extends Thread with IScheduler {
if (terminating) throw new QuitException
}
// check if we need more threads
if (Platform.currentTime - lastActivity >= TICK_FREQ
&& coreSize < maxSize
&& executor.checkPoolSize()) {
// do nothing
}
else {
if (pendingReactions == 0) {
// if all worker threads idle terminate
if (executor.getActiveCount() == 0) {
// Note that we don't have to shutdown
// the FJTaskRunnerGroup since there is
// no separate thread associated with it,
// and FJTaskRunner threads have daemon status.
if (!suspending) {
// check if we need more threads
if (Platform.currentTime - lastActivity >= TICK_FREQ
&& coreSize < maxSize
&& executor.checkPoolSize()) {
// do nothing
}
else {
if (pendingReactions <= 0) {
// if all worker threads idle terminate
if (executor.getActiveCount() == 0) {
// Note that we don't have to shutdown
// the FJTaskRunnerGroup since there is
// no separate thread associated with it,
// and FJTaskRunner threads have daemon status.
// terminate timer thread
TimerThread.t.interrupt()
throw new QuitException
// terminate timer thread
TimerThread.t.interrupt()
throw new QuitException
}
}
}
}
@ -114,6 +117,10 @@ class FJTaskScheduler2 extends Thread with IScheduler {
executor.execute(task)
}
def execute(task: FJTask) {
executor.execute(task)
}
def start(task: Reaction) {
this.synchronized {
pendingReactions = pendingReactions + 1
@ -141,4 +148,13 @@ class FJTaskScheduler2 extends Thread with IScheduler {
// terminate timer thread
TimerThread.t.interrupt()
}
def snapshot(): LinkedQueue = synchronized {
suspending = true
executor.snapshot()
// grab tasks from executor
executor.entryQueue
}
}

View File

@ -1,6 +1,12 @@
package scala.actors;
/**
* IFJTaskRunnerGroup
*
* @version 0.9.5
* @author Philipp Haller
*/
interface IFJTaskRunnerGroup {
public void executeTask(FJTask t);
public FJTaskRunner[] getArray();
@ -8,5 +14,5 @@ interface IFJTaskRunnerGroup {
public void setActive(FJTaskRunner t);
public void checkActive(FJTaskRunner t, long scans);
public void setInactive(FJTaskRunner t);
public LinkedQueue getEntryQueue();
}

View File

@ -22,29 +22,13 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* The <code>Scheduler</code> object is used by
* <code>Actor</code> to execute tasks of an execution of an actor.
*
* @version 0.9.4
* @version 0.9.5
* @author Philipp Haller
*/
object Scheduler {
private var sched: IScheduler =
{
var s: IScheduler = new FJTaskScheduler2
/*
// Check for JDK version >= 1.5
var olderThanJDK5 = false
try {
java.lang.Class.forName("java.util.concurrent.ThreadPoolExecutor")
} catch {
case _: ClassNotFoundException =>
olderThanJDK5 = true
}
s = if (olderThanJDK5)
new TickedScheduler
else
Class.forName("scala.actors.ThreadPoolScheduler").newInstance().asInstanceOf[IScheduler]
*/
s.start()
s
}
@ -54,6 +38,25 @@ object Scheduler {
sched = scheduler
}
var tasks: LinkedQueue = null
def snapshot(): unit = synchronized {
tasks = sched.snapshot()
sched.shutdown()
}
def restart(): unit = synchronized {
sched = {
var s: IScheduler = new FJTaskScheduler2
s.start()
s
}
while (!tasks.isEmpty()) {
sched.execute(tasks.take().asInstanceOf[FJTask])
}
tasks = null
}
def start(task: Reaction) = sched.start(task)
def execute(task: Reaction) = {
val t = currentThread
@ -84,19 +87,23 @@ object Scheduler {
* This abstract class provides a common interface for all
* schedulers used to execute actor tasks.
*
* @version 0.9.4
* @version 0.9.5
* @author Philipp Haller
*/
trait IScheduler {
def start(): unit
def start(task: Reaction): unit
def execute(task: Reaction): unit
def execute(task: FJTask): unit
def getTask(worker: WorkerThread): Runnable
def tick(a: Actor): unit
def terminated(a: Actor): unit
def pendReaction: unit
def unPendReaction: unit
def snapshot(): LinkedQueue
def shutdown(): unit
def onLockup(handler: () => unit): unit
@ -114,7 +121,7 @@ trait IScheduler {
* This scheduler executes the tasks of an actor on a single
* thread (the current thread).
*
* @version 0.9.4
* @version 0.9.5
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
@ -130,6 +137,11 @@ class SingleThreadedScheduler extends IScheduler {
task.run()
}
def execute(task: FJTask) {
// execute task immediately on same thread
task.run()
}
def getTask(worker: WorkerThread): Runnable = null
def tick(a: Actor): Unit = {}
def terminated(a: Actor): unit = {}
@ -137,6 +149,7 @@ class SingleThreadedScheduler extends IScheduler {
def unPendReaction: unit = {}
def shutdown(): Unit = {}
def snapshot(): LinkedQueue = { null }
def onLockup(handler: () => unit): unit = {}
def onLockup(millis: int)(handler: () => unit): unit = {}

View File

@ -21,7 +21,7 @@ import java.util.concurrent.{ThreadPoolExecutor,
* This handler executes rejected tasks on the thread of
* the scheduler.
*
* @version 0.9.4
* @version 0.9.5
* @author Philipp Haller
*/
private class TaskRejectedHandler(sched: ThreadPoolScheduler) extends RejectedExecutionHandler {
@ -156,6 +156,10 @@ class ThreadPoolScheduler extends Thread with IScheduler {
executor.execute(item)
}
def execute(task: FJTask) { }
def snapshot(): LinkedQueue = null
/**
* @param worker the worker thread executing tasks
* @return the executed task

View File

@ -13,7 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* by the execution of actors. Unlike <code>ThreadPoolScheduler</code>, this
* scheduler is available on all Java versions >= 1.4.</p>
*
* @version 0.9.4
* @version 0.9.5
* @author Philipp Haller
*/
class TickedScheduler extends Thread with IScheduler {
@ -127,6 +127,10 @@ class TickedScheduler extends Thread with IScheduler {
}
}
def execute(task: FJTask) { }
def snapshot(): LinkedQueue = null
/**
* @param worker the worker thread executing tasks
* @return the executed task