scala.actors: fixed build on JDK 1.4

git-svn-id: http://lampsvn.epfl.ch/svn-repos/scala/scala/trunk@9835 5e8d7ff9-d8ef-0310-90f0-a4852d11357a
This commit is contained in:
phaller 2007-01-31 16:50:32 +00:00
parent 960646dba0
commit 25d37e2d4d
4 changed files with 70 additions and 23 deletions

View File

@ -134,7 +134,7 @@ INITIALISATION
<else><fail>System environment could not be determined</fail></else> <else><fail>System environment could not be determined</fail></else>
</if> </if>
<!-- Setting flag for Java versions 1.4.x --> <!-- Setting flag for Java versions 1.4.x -->
<condition property="java14" else="false"> <condition property="java14">
<contains string="${java.version}" substring="1.4"/> <contains string="${java.version}" substring="1.4"/>
</condition> </condition>
<!-- Finding out SVN revision --> <!-- Finding out SVN revision -->
@ -441,7 +441,7 @@ BUILD QUICK-TEST LAYER
<pathelement location="${quick.dir}/lib/actors"/> <pathelement location="${quick.dir}/lib/actors"/>
</classpath> </classpath>
<include name="scala/actors/**/*.scala"/> <include name="scala/actors/**/*.scala"/>
<exclude name="scala/actors/**/JDK5Scheduler.scala" if="java14"/> <exclude name="scala/actors/ThreadPoolScheduler.scala" if="java14"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/> <excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</locker> </locker>
<!-- Build compiler --> <!-- Build compiler -->
@ -587,6 +587,7 @@ TEST
<pathelement location="${strap.dir}/lib/actors"/> <pathelement location="${strap.dir}/lib/actors"/>
</classpath> </classpath>
<include name="scala/actors/**/*.scala"/> <include name="scala/actors/**/*.scala"/>
<exclude name="scala/actors/ThreadPoolScheduler.scala" if="java14"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/> <excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</quick> </quick>
<!-- Build compiler --> <!-- Build compiler -->

View File

@ -43,7 +43,9 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
} }
/** /**
* Sends <code>msg</code> to this <code>Channel</code>. * Sends a message to this <code>Channel</code>.
*
* @param msg the message to be sent
*/ */
def !(msg: Msg): unit = { def !(msg: Msg): unit = {
receiver ! scala.actors.!(this, msg) receiver ! scala.actors.!(this, msg)
@ -57,34 +59,62 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
receiver forward scala.actors.!(this, msg) receiver forward scala.actors.!(this, msg)
} }
/**
* Receives a message from this <code>Channel</code>.
*
* @param f a partial function with message patterns and actions
* @return result of processing the received value
*/
def receive[R](f: PartialFunction[Any, R]): R = { def receive[R](f: PartialFunction[Any, R]): R = {
val C = this.asInstanceOf[Channel[Any]] val C = this.asInstanceOf[Channel[Any]]
// Martin: had to do this to get it to compiler after bug909 fix
receiver.receive { receiver.receive {
case C ! msg if (f.isDefinedAt(msg)) => f(msg) case C ! msg if (f.isDefinedAt(msg)) => f(msg)
} }
} }
/**
* Receives a message from this <code>Channel</code> within a certain
* time span.
*
* @param msec the time span before timeout
* @param f a partial function with message patterns and actions
* @return result of processing the received value
*/
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = { def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
val C = this.asInstanceOf[Channel[Any]] val C = this.asInstanceOf[Channel[Any]]
// Martin: had to do this to get it to compiler after bug909 fix
receiver.receiveWithin(msec) { receiver.receiveWithin(msec) {
case C ! msg if (f.isDefinedAt(msg)) => f(msg) case C ! msg if (f.isDefinedAt(msg)) => f(msg)
case TIMEOUT => f(TIMEOUT) case TIMEOUT => f(TIMEOUT)
} }
} }
/**
* Receives a message from this <code>Channel</code>.
* <p>
* This method never returns. Therefore, the rest of the computation
* has to be contained in the actions of the partial function.
*
* @param f a partial function with message patterns and actions
*/
def react(f: PartialFunction[Any, Unit]): Nothing = { def react(f: PartialFunction[Any, Unit]): Nothing = {
val C = this.asInstanceOf[Channel[Any]] val C = this.asInstanceOf[Channel[Any]]
// Martin: had to do this to get it to compiler after bug909 fix
receiver.react { receiver.react {
case C ! msg if (f.isDefinedAt(msg)) => f(msg) case C ! msg if (f.isDefinedAt(msg)) => f(msg)
} }
} }
/**
* Receives a message from this <code>Channel</code> within a certain
* time span.
* <p>
* This method never returns. Therefore, the rest of the computation
* has to be contained in the actions of the partial function.
*
* @param msec the time span before timeout
* @param f a partial function with message patterns and actions
*/
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
val C = this.asInstanceOf[Channel[Any]] val C = this.asInstanceOf[Channel[Any]]
// Martin: had to do this to get it to compiler after bug909 fix
receiver.reactWithin(msec) { receiver.reactWithin(msec) {
case C ! msg if (f.isDefinedAt(msg)) => f(msg) case C ! msg if (f.isDefinedAt(msg)) => f(msg)
case TIMEOUT => f(TIMEOUT) case TIMEOUT => f(TIMEOUT)
@ -92,8 +122,11 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
} }
/** /**
* Sends <code>msg</code> to this <code>Channel</code> and * Sends a message to this <code>Channel</code> and
* awaits reply. * awaits reply.
*
* @param msg the message to be sent
* @return the reply
*/ */
def !?(msg: Msg): Any = { def !?(msg: Msg): Any = {
val replyChannel = Actor.self.freshReply() val replyChannel = Actor.self.freshReply()
@ -103,6 +136,15 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
} }
} }
/**
* Sends a message to this <code>Channel</code> and
* awaits reply within a certain time span.
*
* @param msec the time span before timeout
* @param msg the message to be sent
* @return <code>None</code> in case of timeout, otherwise
* <code>Some(x)</code> where <code>x</code> is the reply
*/
def !?(msec: long, msg: Msg): Option[Any] = { def !?(msec: long, msg: Msg): Option[Any] = {
val replyChannel = Actor.self.freshReply() val replyChannel = Actor.self.freshReply()
receiver ! scala.actors.!(this, msg) receiver ! scala.actors.!(this, msg)

View File

@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
object Scheduler { object Scheduler {
private var sched: IScheduler = private var sched: IScheduler =
{ {
var s: Thread with IScheduler = null var s: IScheduler = null
// Check for JDK version >= 1.5 // Check for JDK version >= 1.5
var olderThanJDK5 = false var olderThanJDK5 = false
@ -42,18 +42,8 @@ object Scheduler {
s = if (olderThanJDK5) s = if (olderThanJDK5)
new TickedScheduler new TickedScheduler
else { else
var corePoolSize = 4 Class.forName("scala.actors.ThreadPoolScheduler").newInstance().asInstanceOf[IScheduler]
var maxPoolSize = 16
val prop = java.lang.System.getProperty("actors.corePoolSize")
if (null ne prop) {
corePoolSize =
Integer.parseInt(java.lang.System.getProperty("actors.corePoolSize"))
maxPoolSize =
Integer.parseInt(java.lang.System.getProperty("actors.maxPoolSize"))
}
new JDK5Scheduler(corePoolSize, maxPoolSize)
}
s.start() s.start()
s s
} }
@ -86,6 +76,7 @@ object Scheduler {
* @author Philipp Haller * @author Philipp Haller
*/ */
trait IScheduler { trait IScheduler {
def start(): unit
def start(task: Reaction): unit def start(task: Reaction): unit
def execute(task: Reaction): unit def execute(task: Reaction): unit
def getTask(worker: WorkerThread): Runnable def getTask(worker: WorkerThread): Runnable
@ -115,6 +106,8 @@ trait IScheduler {
* @author Philipp Haller * @author Philipp Haller
*/ */
class SingleThreadedScheduler extends IScheduler { class SingleThreadedScheduler extends IScheduler {
def start() {}
def start(task: Reaction) { def start(task: Reaction) {
// execute task immediately on same thread // execute task immediately on same thread
task.run() task.run()

View File

@ -13,7 +13,7 @@ import java.util.concurrent.{ThreadPoolExecutor,
TimeUnit, TimeUnit,
RejectedExecutionHandler} RejectedExecutionHandler}
class TaskRejectedHandler(sched: JDK5Scheduler) extends RejectedExecutionHandler { class TaskRejectedHandler(sched: ThreadPoolScheduler) extends RejectedExecutionHandler {
def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) { def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
sched.pendReaction sched.pendReaction
r.run() r.run()
@ -26,7 +26,18 @@ class TaskRejectedHandler(sched: JDK5Scheduler) extends RejectedExecutionHandler
* @version 0.9.2 * @version 0.9.2
* @author Philipp Haller * @author Philipp Haller
*/ */
class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with IScheduler { class ThreadPoolScheduler extends Thread with IScheduler {
var initCoreSize = 4
var maxSize = 16
val prop = java.lang.System.getProperty("actors.corePoolSize")
if (null ne prop) {
initCoreSize =
Integer.parseInt(java.lang.System.getProperty("actors.corePoolSize"))
maxSize =
Integer.parseInt(java.lang.System.getProperty("actors.maxPoolSize"))
}
/* Note: /* Note:
* When using an unbounded queue such as a * When using an unbounded queue such as a