Use Thread.getState() instead of timestamps to detect blocked worker threads.

git-svn-id: http://lampsvn.epfl.ch/svn-repos/scala/scala/trunk@16767 5e8d7ff9-d8ef-0310-90f0-a4852d11357a
This commit is contained in:
phaller 2008-12-16 11:11:09 +00:00
parent 07cc921b85
commit af7410ac54
5 changed files with 10 additions and 150 deletions

View File

@ -389,7 +389,6 @@ trait Actor extends AbstractActor {
* @param replyTo the reply destination
*/
def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
tick()
if (waitingFor(msg)) {
received = Some(msg)
@ -424,7 +423,6 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "receive from channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
waitingFor = f.isDefinedAt
@ -454,7 +452,6 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "receive from channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
tick()
// first, remove spurious TIMEOUT message from mailbox if any
val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
@ -506,7 +503,6 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
waitingFor = f.isDefinedAt
@ -534,7 +530,6 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
tick()
// first, remove spurious TIMEOUT message from mailbox if any
val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
@ -723,9 +718,6 @@ trait Actor extends AbstractActor {
scheduler execute task
}
private def tick(): Unit =
scheduler tick this
private[actors] var kill: () => Unit = () => {}
private def suspendActor() {

View File

@ -122,7 +122,7 @@ package scala.actors;
public class FJTaskRunnerGroup implements IFJTaskRunnerGroup {
/** The threads in this group **/
protected /*final*/ FJTaskRunner[] threads;
/*protected*/ /*final*/ FJTaskRunner[] threads;
/** Group-wide queue for tasks entered via execute() **/
/*protected*/ final LinkedQueue entryQueue = new LinkedQueue();

View File

@ -13,6 +13,7 @@ package scala.actors
import compat.Platform
import java.lang.{Runnable, Thread, InterruptedException, System, Runtime}
import java.lang.Thread.State
import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
@ -68,13 +69,10 @@ class FJTaskScheduler2 extends Thread with IScheduler {
private var terminating = false
private var suspending = false
private var lastActivity = Platform.currentTime
private var submittedTasks = 0
def printActorDump {}
private val TICK_FREQ = 50
private val CHECK_FREQ = 100
def onLockup(handler: () => Unit) =
@ -87,6 +85,12 @@ class FJTaskScheduler2 extends Thread with IScheduler {
private var lockupHandler: () => Unit = null
private def allWorkersBlocked: Boolean =
executor.threads.forall(t => {
val s = t.getState()
s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING
})
override def run() {
try {
while (!terminating) {
@ -103,12 +107,11 @@ class FJTaskScheduler2 extends Thread with IScheduler {
ActorGC.gc()
// check if we need more threads
if (Platform.currentTime - lastActivity >= TICK_FREQ
&& coreSize < maxSize
if (coreSize < maxSize
&& allWorkersBlocked
&& executor.checkPoolSize()) {
//Debug.info(this+": increasing thread pool size")
coreSize += 1
lastActivity = Platform.currentTime
}
else {
if (ActorGC.allTerminated) {
@ -149,13 +152,6 @@ class FJTaskScheduler2 extends Thread with IScheduler {
def run() { fun }
})
/**
* @param a the actor
*/
def tick(a: Actor) {
lastActivity = Platform.currentTime
}
/** Shuts down all idle worker threads.
*/
def shutdown(): Unit = synchronized {

View File

@ -91,11 +91,6 @@ object Scheduler extends IScheduler {
sched execute { fun }
}
/* This method is used to notify the scheduler
* of library activity by the argument Actor.
*/
def tick(a: Actor) = sched tick a
def shutdown() = sched.shutdown()
def onLockup(handler: () => Unit) = sched.onLockup(handler)
@ -130,13 +125,6 @@ trait IScheduler {
*/
def execute(task: Runnable): Unit
/** Notifies the scheduler about activity of the
* executing actor.
*
* @param a the active actor
*/
def tick(a: Actor): Unit
/** Shuts down the scheduler.
*/
def shutdown(): Unit
@ -152,15 +140,6 @@ trait IScheduler {
}
trait WorkerThreadScheduler extends IScheduler {
/**
* @param worker the worker thread executing tasks
* @return the task to be executed
*/
def getTask(worker: WorkerThread): Runnable
}
/**
* This scheduler executes the tasks of an actor on a single
* thread (the current thread).
@ -179,8 +158,6 @@ class SingleThreadedScheduler extends IScheduler {
def run() { fun }
})
def tick(a: Actor) {}
def shutdown() {}
def onLockup(handler: () => Unit) {}
@ -203,100 +180,3 @@ private[actors] class QuitException extends Throwable {
*/
override def fillInStackTrace(): Throwable = this
}
/**
* <p>
* The class <code>WorkerThread</code> is used by schedulers to execute
* actor tasks on multiple threads.
* </p>
* <p>
* !!ACHTUNG: If you change this, make sure you understand the following
* proof of deadlock-freedom!!
* </p>
* <p>
* We proof that there is no deadlock between the scheduler and
* any worker thread possible. For this, note that the scheduler
* only acquires the lock of a worker thread by calling
* <code>execute</code>. This method is only called when the worker thread
* is in the idle queue of the scheduler. On the other hand, a
* worker thread only acquires the lock of the scheduler when it
* calls <code>getTask</code>. At the only callsite of <code>getTask</code>,
* the worker thread holds its own lock.
* </p>
* <p>
* Thus, deadlock can only occur when a worker thread calls
* <code>getTask</code> while it is in the idle queue of the scheduler,
* because then the scheduler might call (at any time!) <code>execute</code>
* which tries to acquire the lock of the worker thread. In such
* a situation the worker thread would be waiting to acquire the
* lock of the scheduler and vice versa.
* </p>
* <p>
* Therefore, to prove deadlock-freedom, it suffices to ensure
* that a worker thread will never call <code>getTask</code> when
* it is in the idle queue of the scheduler.
* </p>
* <p>
* A worker thread enters the idle queue of the scheduler when
* <code>getTask</code> returns <code>null</code>. Then it will also stay
* in the while-loop W (<code>while (task eq null)</code>) until
* <code>task</code> becomes non-null. The only way this can happen is
* through a call of <code>execute</code> by the scheduler. Before every
* call of <code>execute</code> the worker thread is removed from the idle
* queue of the scheduler. Only then--after executing its task--
* the worker thread may call <code>getTask</code>. However, the scheduler
* is unable to call <code>execute</code> as the worker thread is not in
* the idle queue any more. In fact, the scheduler made sure
* that this is the case even _before_ calling <code>execute</code> and
* thus releasing the worker thread from the while-loop W. Thus,
* the property holds for every possible interleaving of thread
* execution. QED
* </p>
*
* @version 0.9.18
* @author Philipp Haller
*/
class WorkerThread(sched: WorkerThreadScheduler) extends Thread {
private var task: Runnable = null
private[actors] var running = true
def execute(r: Runnable) = synchronized {
task = r
notify()
}
override def run(): Unit =
try {
while (running) {
if (task ne null) {
try {
task.run()
} catch {
case consumed: InterruptedException =>
if (!running) throw new QuitException
}
}
this.synchronized {
task = sched getTask this
while (task eq null) {
try {
wait()
} catch {
case consumed: InterruptedException =>
if (!running) throw new QuitException
}
}
if (task == sched.QUIT_TASK) {
running = false
}
}
}
} catch {
case consumed: QuitException =>
// allow thread to quit
}
}

View File

@ -27,14 +27,6 @@ trait SchedulerAdapter extends IScheduler {
def execute(task: Runnable): Unit =
execute { task.run() }
/** Notifies the scheduler about activity of the
* executing actor.
*
* @param a the active actor
*/
def tick(a: Actor): Unit =
Scheduler tick a
/** Shuts down the scheduler.
*/
def shutdown(): Unit =