Clean-ups in scheduler hierarchy. Restricted visibility of several traits. Added tests exercising cleaned-up interface.

git-svn-id: http://lampsvn.epfl.ch/svn-repos/scala/scala/trunk@21060 5e8d7ff9-d8ef-0310-90f0-a4852d11357a
This commit is contained in:
phaller 2010-03-04 11:40:16 +00:00
parent 082334e7c1
commit 139d6f686b
18 changed files with 416 additions and 28 deletions

View File

@ -14,7 +14,10 @@ package object actors {
@deprecated("this class is going to be removed in a future release")
type WorkerThread = java.lang.Thread
@deprecated("use scala.actors.scheduler.SingleThreadedScheduler instead")
type SingleThreadedScheduler = scala.actors.scheduler.SingleThreadedScheduler
@deprecated("this value is going to be removed in a future release")
val ActorGC = scala.actors.Scheduler.impl.asInstanceOf[scala.actors.scheduler.ThreadPoolScheduler]
val ActorGC = scala.actors.Scheduler.impl.asInstanceOf[scala.actors.scheduler.ActorGC]
}

View File

@ -24,9 +24,12 @@ import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue,
*
* @author Philipp Haller
*/
private[actors] class DefaultThreadPoolScheduler(daemon: Boolean) extends ThreadPoolScheduler(daemon) {
private[actors] class DefaultThreadPoolScheduler(daemon: Boolean)
extends ExecutorScheduler {
executor = {
setDaemon(daemon)
def executor = {
val workQueue = new LinkedBlockingQueue[Runnable]
val threadFactory = new ThreadFactory {

View File

@ -11,16 +11,57 @@
package scala.actors
package scheduler
import java.util.concurrent.Callable
import java.util.concurrent.{Callable, ExecutorService}
import scala.concurrent.ThreadPoolRunner
/**
* The <code>ExecutorScheduler</code> object is used to create
* <code>ExecutorScheduler</code> instances.
*
* @author Philipp Haller
*/
object ExecutorScheduler {
private def start(sched: ExecutorScheduler): ExecutorScheduler = {
sched.start()
sched
}
/** Creates an <code>ExecutorScheduler</code> using the provided
* <code>ExecutorService</code>.
*
* @param exec the executor to use
* @return the scheduler
*/
def apply(exec: ExecutorService): ExecutorScheduler =
start(new ExecutorScheduler {
def executor: ExecutorService = exec
})
/** Creates an <code>ExecutorScheduler</code> using the provided
* <code>ExecutorService</code>.
*
* @param exec the executor to use
* @param term whether the scheduler should automatically terminate
* @return the scheduler
*/
def apply(exec: ExecutorService, term: Boolean): ExecutorScheduler =
start(new ExecutorScheduler {
def executor: ExecutorService = exec
override val terminate = term
})
}
/**
* The <code>ExecutorScheduler</code> class uses an
* <code>ExecutorService</code> to execute <code>Actor</code>s.
*
* @author Philipp Haller
*/
private[scheduler] trait ExecutorScheduler extends IScheduler with ThreadPoolRunner {
trait ExecutorScheduler extends Thread
with IScheduler with TerminationService
with ThreadPoolRunner {
def execute(task: Runnable) {
super[ThreadPoolRunner].execute(task.asInstanceOf[Task[Unit]])

View File

@ -0,0 +1,19 @@
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2005-2010, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
package scala.actors.scheduler
import scala.util.control.ControlThrowable
/**
* The <code>QuitControl</code> class is used to manage control flow
* of certain schedulers.
*
* @author Philipp Haller
*/
private[scheduler] class QuitControl extends ControlThrowable

View File

@ -11,7 +11,6 @@
package scala.actors
package scheduler
import scala.util.control.ControlThrowable
import java.lang.{Runnable, Thread, InterruptedException}
/**
@ -70,11 +69,3 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler
}
}
/**
* The <code>QuitControl</code> class is used to manage control flow
* of certain schedulers and worker threads.
*
* @version 0.9.8
* @author Philipp Haller
*/
private[actors] class QuitControl extends ControlThrowable

View File

@ -28,9 +28,9 @@ import java.util.concurrent.ExecutorService
*
* @author Philipp Haller
*/
class SimpleExecutorScheduler(protected var executor: ExecutorService,
protected var terminate: Boolean)
extends TerminationService(terminate) with ExecutorScheduler {
class SimpleExecutorScheduler(protected override val executor: ExecutorService,
protected override val terminate: Boolean)
extends ExecutorScheduler {
/* This constructor (and the var above) is currently only used to work
* around a bug in scaladoc, which cannot deal with early initializers

View File

@ -13,7 +13,8 @@ package scheduler
import scala.collection.mutable.HashMap
trait TerminationMonitor {
private[scheduler] trait TerminationMonitor {
_: IScheduler =>
protected var activeActors = 0
protected val terminationHandlers = new HashMap[Reactor, () => Unit]

View File

@ -11,7 +11,7 @@
package scala.actors
package scheduler
import java.lang.{Runnable, Thread, InterruptedException}
import java.lang.{Thread, InterruptedException}
/**
* The <code>TerminationService</code> class starts a new thread
@ -21,11 +21,16 @@ import java.lang.{Runnable, Thread, InterruptedException}
*
* @author Philipp Haller
*/
abstract class TerminationService(terminate: Boolean)
extends Thread with IScheduler with TerminationMonitor {
private[scheduler] trait TerminationService extends TerminationMonitor {
_: Thread with IScheduler =>
private var terminating = false
/** Indicates whether the scheduler should terminate when all
* actors have terminated.
*/
protected val terminate = true
protected val CHECK_FREQ = 50
def onShutdown(): Unit
@ -39,11 +44,11 @@ abstract class TerminationService(terminate: Boolean)
} catch {
case _: InterruptedException =>
}
if (terminating)
if (terminating || (terminate && allActorsTerminated))
throw new QuitControl
if (terminate && allActorsTerminated)
throw new QuitControl
gc()
}
}
} catch {
@ -60,4 +65,5 @@ abstract class TerminationService(terminate: Boolean)
def shutdown(): Unit = synchronized {
terminating = true
}
}

View File

@ -29,14 +29,14 @@ import scala.concurrent.ManagedBlocker
* @author Philipp Haller
*/
class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
protected val terminate: Boolean,
protected override val terminate: Boolean,
protected val daemon: Boolean)
extends Thread with ExecutorScheduler with TerminationMonitor {
setDaemon(daemon)
private var terminating = false // guarded by this
protected val CHECK_FREQ = 10
protected override val CHECK_FREQ = 10
/* This constructor (and the var above) is currently only used to work
* around a bug in scaladoc, which cannot deal with early initializers
@ -74,7 +74,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
/** Shuts down the scheduler.
*/
def shutdown(): Unit = synchronized {
override def shutdown(): Unit = synchronized {
terminating = true
}

View File

@ -0,0 +1,20 @@
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK

View File

@ -0,0 +1,59 @@
import java.util.concurrent.Executors
import scala.actors.{Actor, SchedulerAdapter}
import Actor._
trait AdaptedActor extends Actor {
override def scheduler =
Test.scheduler
}
object One extends AdaptedActor {
def act() {
Two.start()
var i = 0
loopWhile (i < 10000) {
i += 1
Two ! 'MsgForTwo
react {
case 'MsgForOne =>
if (i % 1000 == 0)
println("One: OK")
if (i == 10000)
Test.executor.shutdown()
}
}
}
}
object Two extends AdaptedActor {
def act() {
var i = 0
loopWhile (i < 10000) {
i += 1
react {
case 'MsgForTwo =>
if (i % 1000 == 0)
println("Two: OK")
One ! 'MsgForOne
}
}
}
}
object Test {
val executor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
val scheduler =
new SchedulerAdapter {
def execute(block: => Unit) {
executor.execute(new Runnable {
def run() { block }
})
}
}
def main(args: Array[String]) {
One.start()
}
}

View File

@ -0,0 +1,21 @@
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
One exited

View File

@ -0,0 +1,67 @@
import scala.actors.{Actor, SchedulerAdapter, Exit}
import Actor._
import java.util.concurrent.Executors
object One extends AdaptedActor {
def act() {
Two.start()
var i = 0
loopWhile (i < Test.NUM_MSG) {
i += 1
Two ! 'MsgForTwo
react {
case 'MsgForOne =>
if (i % (Test.NUM_MSG/10) == 0)
println("One: OK")
}
}
}
}
object Two extends AdaptedActor {
def act() {
var i = 0
loopWhile (i < Test.NUM_MSG) {
i += 1
react {
case 'MsgForTwo =>
if (i % (Test.NUM_MSG/10) == 0)
println("Two: OK")
One ! 'MsgForOne
}
}
}
}
trait AdaptedActor extends Actor {
override def scheduler =
Test.scheduler
}
object Test {
val NUM_MSG = 100000
val executor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
val scheduler =
new SchedulerAdapter {
def execute(block: => Unit) {
executor.execute(new Runnable {
def run() { block }
})
}
}
def main(args: Array[String]) {
self.trapExit = true
link(One)
One.start()
receive {
case Exit(from, reason) =>
println("One exited")
Test.executor.shutdown()
}
}
}

View File

@ -0,0 +1,20 @@
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK

View File

@ -0,0 +1,52 @@
import scala.actors.Actor
import scala.actors.scheduler.ExecutorScheduler
import java.util.concurrent.Executors
object One extends AdaptedActor {
def act() {
Two.start()
var i = 0
loopWhile (i < Test.NUM_MSG) {
i += 1
Two ! 'MsgForTwo
react {
case 'MsgForOne =>
if (i % (Test.NUM_MSG/10) == 0)
println("One: OK")
}
}
}
}
object Two extends AdaptedActor {
def act() {
var i = 0
loopWhile (i < Test.NUM_MSG) {
i += 1
react {
case 'MsgForTwo =>
if (i % (Test.NUM_MSG/10) == 0)
println("Two: OK")
One ! 'MsgForOne
}
}
}
}
trait AdaptedActor extends Actor {
override def scheduler =
Test.scheduler
}
object Test {
val NUM_MSG = 100000
val executor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
val scheduler = ExecutorScheduler(executor)
def main(args: Array[String]) {
One.start()
}
}

View File

@ -0,0 +1,21 @@
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
Two: OK
One: OK
One exited

View File

@ -0,0 +1,64 @@
import scala.actors.{Actor, Exit}
import scala.actors.scheduler.ExecutorScheduler
import java.util.concurrent.Executors
object One extends AdaptedActor {
def act() {
Two.start()
var i = 0
loopWhile (i < Test.NUM_MSG) {
i += 1
Two ! 'MsgForTwo
react {
case 'MsgForOne =>
if (i % (Test.NUM_MSG/10) == 0)
println("One: OK")
}
}
}
}
object Two extends AdaptedActor {
def act() {
var i = 0
loopWhile (i < Test.NUM_MSG) {
i += 1
react {
case 'MsgForTwo =>
if (i % (Test.NUM_MSG/10) == 0)
println("Two: OK")
One ! 'MsgForOne
}
}
}
}
trait AdaptedActor extends Actor {
override def scheduler =
Test.scheduler
}
object Test {
val NUM_MSG = 100000
val scheduler =
ExecutorScheduler(
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()),
false)
def main(args: Array[String]) {
(new AdaptedActor {
def act() {
trapExit = true
link(One)
One.start()
receive {
case Exit(from, reason) =>
println("One exited")
Test.scheduler.shutdown()
}
}
}).start()
}
}

View File

@ -6,7 +6,7 @@ case object Pong
case object Stop
/**
* Ping pong example for OutputChannelActor.
* Ping pong example for Reactor.
*
* @author Philipp Haller
*/