diff --git a/src/actors/scala/actors/package.scala b/src/actors/scala/actors/package.scala index 7a7ef5899..eadc75c43 100644 --- a/src/actors/scala/actors/package.scala +++ b/src/actors/scala/actors/package.scala @@ -14,7 +14,10 @@ package object actors { @deprecated("this class is going to be removed in a future release") type WorkerThread = java.lang.Thread + @deprecated("use scala.actors.scheduler.SingleThreadedScheduler instead") + type SingleThreadedScheduler = scala.actors.scheduler.SingleThreadedScheduler + @deprecated("this value is going to be removed in a future release") - val ActorGC = scala.actors.Scheduler.impl.asInstanceOf[scala.actors.scheduler.ThreadPoolScheduler] + val ActorGC = scala.actors.Scheduler.impl.asInstanceOf[scala.actors.scheduler.ActorGC] } diff --git a/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala index 4fed00ba2..42942f934 100644 --- a/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala +++ b/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala @@ -24,9 +24,12 @@ import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue, * * @author Philipp Haller */ -private[actors] class DefaultThreadPoolScheduler(daemon: Boolean) extends ThreadPoolScheduler(daemon) { +private[actors] class DefaultThreadPoolScheduler(daemon: Boolean) + extends ExecutorScheduler { - executor = { + setDaemon(daemon) + + def executor = { val workQueue = new LinkedBlockingQueue[Runnable] val threadFactory = new ThreadFactory { diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala index bcd524f34..4b5eec21d 100644 --- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala @@ -11,16 +11,57 @@ package scala.actors package scheduler -import java.util.concurrent.Callable +import java.util.concurrent.{Callable, ExecutorService} import scala.concurrent.ThreadPoolRunner +/** + * The ExecutorScheduler object is used to create + * ExecutorScheduler instances. + * + * @author Philipp Haller + */ +object ExecutorScheduler { + + private def start(sched: ExecutorScheduler): ExecutorScheduler = { + sched.start() + sched + } + + /** Creates an ExecutorScheduler using the provided + * ExecutorService. + * + * @param exec the executor to use + * @return the scheduler + */ + def apply(exec: ExecutorService): ExecutorScheduler = + start(new ExecutorScheduler { + def executor: ExecutorService = exec + }) + + /** Creates an ExecutorScheduler using the provided + * ExecutorService. + * + * @param exec the executor to use + * @param term whether the scheduler should automatically terminate + * @return the scheduler + */ + def apply(exec: ExecutorService, term: Boolean): ExecutorScheduler = + start(new ExecutorScheduler { + def executor: ExecutorService = exec + override val terminate = term + }) + +} + /** * The ExecutorScheduler class uses an * ExecutorService to execute Actors. * * @author Philipp Haller */ -private[scheduler] trait ExecutorScheduler extends IScheduler with ThreadPoolRunner { +trait ExecutorScheduler extends Thread + with IScheduler with TerminationService + with ThreadPoolRunner { def execute(task: Runnable) { super[ThreadPoolRunner].execute(task.asInstanceOf[Task[Unit]]) diff --git a/src/actors/scala/actors/scheduler/QuitControl.scala b/src/actors/scala/actors/scheduler/QuitControl.scala new file mode 100644 index 000000000..b217094c1 --- /dev/null +++ b/src/actors/scala/actors/scheduler/QuitControl.scala @@ -0,0 +1,19 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2010, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.actors.scheduler + +import scala.util.control.ControlThrowable + +/** + * The QuitControl class is used to manage control flow + * of certain schedulers. + * + * @author Philipp Haller + */ +private[scheduler] class QuitControl extends ControlThrowable diff --git a/src/actors/scala/actors/scheduler/SchedulerService.scala b/src/actors/scala/actors/scheduler/SchedulerService.scala index 1f886dbae..9fbee3d0b 100644 --- a/src/actors/scala/actors/scheduler/SchedulerService.scala +++ b/src/actors/scala/actors/scheduler/SchedulerService.scala @@ -11,7 +11,6 @@ package scala.actors package scheduler -import scala.util.control.ControlThrowable import java.lang.{Runnable, Thread, InterruptedException} /** @@ -70,11 +69,3 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler } } -/** - * The QuitControl class is used to manage control flow - * of certain schedulers and worker threads. - * - * @version 0.9.8 - * @author Philipp Haller - */ -private[actors] class QuitControl extends ControlThrowable diff --git a/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala b/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala index 324a60174..fafba06c8 100644 --- a/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/SimpleExecutorScheduler.scala @@ -28,9 +28,9 @@ import java.util.concurrent.ExecutorService * * @author Philipp Haller */ -class SimpleExecutorScheduler(protected var executor: ExecutorService, - protected var terminate: Boolean) - extends TerminationService(terminate) with ExecutorScheduler { +class SimpleExecutorScheduler(protected override val executor: ExecutorService, + protected override val terminate: Boolean) + extends ExecutorScheduler { /* This constructor (and the var above) is currently only used to work * around a bug in scaladoc, which cannot deal with early initializers diff --git a/src/actors/scala/actors/scheduler/TerminationMonitor.scala b/src/actors/scala/actors/scheduler/TerminationMonitor.scala index ccee007d4..d676cc2b1 100644 --- a/src/actors/scala/actors/scheduler/TerminationMonitor.scala +++ b/src/actors/scala/actors/scheduler/TerminationMonitor.scala @@ -13,7 +13,8 @@ package scheduler import scala.collection.mutable.HashMap -trait TerminationMonitor { +private[scheduler] trait TerminationMonitor { + _: IScheduler => protected var activeActors = 0 protected val terminationHandlers = new HashMap[Reactor, () => Unit] diff --git a/src/actors/scala/actors/scheduler/TerminationService.scala b/src/actors/scala/actors/scheduler/TerminationService.scala index da71b6ac2..aa047eedf 100644 --- a/src/actors/scala/actors/scheduler/TerminationService.scala +++ b/src/actors/scala/actors/scheduler/TerminationService.scala @@ -11,7 +11,7 @@ package scala.actors package scheduler -import java.lang.{Runnable, Thread, InterruptedException} +import java.lang.{Thread, InterruptedException} /** * The TerminationService class starts a new thread @@ -21,11 +21,16 @@ import java.lang.{Runnable, Thread, InterruptedException} * * @author Philipp Haller */ -abstract class TerminationService(terminate: Boolean) - extends Thread with IScheduler with TerminationMonitor { +private[scheduler] trait TerminationService extends TerminationMonitor { + _: Thread with IScheduler => private var terminating = false + /** Indicates whether the scheduler should terminate when all + * actors have terminated. + */ + protected val terminate = true + protected val CHECK_FREQ = 50 def onShutdown(): Unit @@ -39,11 +44,11 @@ abstract class TerminationService(terminate: Boolean) } catch { case _: InterruptedException => } - if (terminating) + + if (terminating || (terminate && allActorsTerminated)) throw new QuitControl - if (terminate && allActorsTerminated) - throw new QuitControl + gc() } } } catch { @@ -60,4 +65,5 @@ abstract class TerminationService(terminate: Boolean) def shutdown(): Unit = synchronized { terminating = true } + } diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala index beb0878b4..b7a40ca93 100644 --- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala @@ -29,14 +29,14 @@ import scala.concurrent.ManagedBlocker * @author Philipp Haller */ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, - protected val terminate: Boolean, + protected override val terminate: Boolean, protected val daemon: Boolean) extends Thread with ExecutorScheduler with TerminationMonitor { setDaemon(daemon) private var terminating = false // guarded by this - protected val CHECK_FREQ = 10 + protected override val CHECK_FREQ = 10 /* This constructor (and the var above) is currently only used to work * around a bug in scaladoc, which cannot deal with early initializers @@ -74,7 +74,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, /** Shuts down the scheduler. */ - def shutdown(): Unit = synchronized { + override def shutdown(): Unit = synchronized { terminating = true } diff --git a/test/files/jvm/actor-executor.check b/test/files/jvm/actor-executor.check new file mode 100644 index 000000000..bdbdb5c6a --- /dev/null +++ b/test/files/jvm/actor-executor.check @@ -0,0 +1,20 @@ +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK diff --git a/test/files/jvm/actor-executor.scala b/test/files/jvm/actor-executor.scala new file mode 100644 index 000000000..e65004338 --- /dev/null +++ b/test/files/jvm/actor-executor.scala @@ -0,0 +1,59 @@ +import java.util.concurrent.Executors +import scala.actors.{Actor, SchedulerAdapter} +import Actor._ + +trait AdaptedActor extends Actor { + override def scheduler = + Test.scheduler +} + +object One extends AdaptedActor { + def act() { + Two.start() + var i = 0 + loopWhile (i < 10000) { + i += 1 + Two ! 'MsgForTwo + react { + case 'MsgForOne => + if (i % 1000 == 0) + println("One: OK") + if (i == 10000) + Test.executor.shutdown() + } + } + } +} + +object Two extends AdaptedActor { + def act() { + var i = 0 + loopWhile (i < 10000) { + i += 1 + react { + case 'MsgForTwo => + if (i % 1000 == 0) + println("Two: OK") + One ! 'MsgForOne + } + } + } +} + +object Test { + val executor = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) + + val scheduler = + new SchedulerAdapter { + def execute(block: => Unit) { + executor.execute(new Runnable { + def run() { block } + }) + } + } + + def main(args: Array[String]) { + One.start() + } +} diff --git a/test/files/jvm/actor-executor2.check b/test/files/jvm/actor-executor2.check new file mode 100644 index 000000000..da78f4583 --- /dev/null +++ b/test/files/jvm/actor-executor2.check @@ -0,0 +1,21 @@ +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +One exited diff --git a/test/files/jvm/actor-executor2.scala b/test/files/jvm/actor-executor2.scala new file mode 100644 index 000000000..9e99e167a --- /dev/null +++ b/test/files/jvm/actor-executor2.scala @@ -0,0 +1,67 @@ +import scala.actors.{Actor, SchedulerAdapter, Exit} +import Actor._ +import java.util.concurrent.Executors + +object One extends AdaptedActor { + def act() { + Two.start() + var i = 0 + loopWhile (i < Test.NUM_MSG) { + i += 1 + Two ! 'MsgForTwo + react { + case 'MsgForOne => + if (i % (Test.NUM_MSG/10) == 0) + println("One: OK") + } + } + } +} + +object Two extends AdaptedActor { + def act() { + var i = 0 + loopWhile (i < Test.NUM_MSG) { + i += 1 + react { + case 'MsgForTwo => + if (i % (Test.NUM_MSG/10) == 0) + println("Two: OK") + One ! 'MsgForOne + } + } + } +} + +trait AdaptedActor extends Actor { + override def scheduler = + Test.scheduler +} + +object Test { + val NUM_MSG = 100000 + + val executor = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) + + val scheduler = + new SchedulerAdapter { + def execute(block: => Unit) { + executor.execute(new Runnable { + def run() { block } + }) + } + } + + def main(args: Array[String]) { + self.trapExit = true + link(One) + One.start() + + receive { + case Exit(from, reason) => + println("One exited") + Test.executor.shutdown() + } + } +} diff --git a/test/files/jvm/actor-executor3.check b/test/files/jvm/actor-executor3.check new file mode 100644 index 000000000..bdbdb5c6a --- /dev/null +++ b/test/files/jvm/actor-executor3.check @@ -0,0 +1,20 @@ +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK diff --git a/test/files/jvm/actor-executor3.scala b/test/files/jvm/actor-executor3.scala new file mode 100644 index 000000000..bf060b8ac --- /dev/null +++ b/test/files/jvm/actor-executor3.scala @@ -0,0 +1,52 @@ +import scala.actors.Actor +import scala.actors.scheduler.ExecutorScheduler +import java.util.concurrent.Executors + +object One extends AdaptedActor { + def act() { + Two.start() + var i = 0 + loopWhile (i < Test.NUM_MSG) { + i += 1 + Two ! 'MsgForTwo + react { + case 'MsgForOne => + if (i % (Test.NUM_MSG/10) == 0) + println("One: OK") + } + } + } +} + +object Two extends AdaptedActor { + def act() { + var i = 0 + loopWhile (i < Test.NUM_MSG) { + i += 1 + react { + case 'MsgForTwo => + if (i % (Test.NUM_MSG/10) == 0) + println("Two: OK") + One ! 'MsgForOne + } + } + } +} + +trait AdaptedActor extends Actor { + override def scheduler = + Test.scheduler +} + +object Test { + val NUM_MSG = 100000 + + val executor = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) + + val scheduler = ExecutorScheduler(executor) + + def main(args: Array[String]) { + One.start() + } +} diff --git a/test/files/jvm/actor-executor4.check b/test/files/jvm/actor-executor4.check new file mode 100644 index 000000000..da78f4583 --- /dev/null +++ b/test/files/jvm/actor-executor4.check @@ -0,0 +1,21 @@ +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +Two: OK +One: OK +One exited diff --git a/test/files/jvm/actor-executor4.scala b/test/files/jvm/actor-executor4.scala new file mode 100644 index 000000000..a912d7609 --- /dev/null +++ b/test/files/jvm/actor-executor4.scala @@ -0,0 +1,64 @@ +import scala.actors.{Actor, Exit} +import scala.actors.scheduler.ExecutorScheduler +import java.util.concurrent.Executors + +object One extends AdaptedActor { + def act() { + Two.start() + var i = 0 + loopWhile (i < Test.NUM_MSG) { + i += 1 + Two ! 'MsgForTwo + react { + case 'MsgForOne => + if (i % (Test.NUM_MSG/10) == 0) + println("One: OK") + } + } + } +} + +object Two extends AdaptedActor { + def act() { + var i = 0 + loopWhile (i < Test.NUM_MSG) { + i += 1 + react { + case 'MsgForTwo => + if (i % (Test.NUM_MSG/10) == 0) + println("Two: OK") + One ! 'MsgForOne + } + } + } +} + +trait AdaptedActor extends Actor { + override def scheduler = + Test.scheduler +} + +object Test { + val NUM_MSG = 100000 + + val scheduler = + ExecutorScheduler( + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()), + false) + + def main(args: Array[String]) { + (new AdaptedActor { + def act() { + trapExit = true + link(One) + One.start() + + receive { + case Exit(from, reason) => + println("One exited") + Test.scheduler.shutdown() + } + } + }).start() + } +} diff --git a/test/files/jvm/reactor.scala b/test/files/jvm/reactor.scala index 2fc54d834..919263b65 100644 --- a/test/files/jvm/reactor.scala +++ b/test/files/jvm/reactor.scala @@ -6,7 +6,7 @@ case object Pong case object Stop /** - * Ping pong example for OutputChannelActor. + * Ping pong example for Reactor. * * @author Philipp Haller */