Added actors library.

git-svn-id: http://lampsvn.epfl.ch/svn-repos/scala/scala/trunk@7941 5e8d7ff9-d8ef-0310-90f0-a4852d11357a
This commit is contained in:
phaller 2006-06-21 12:35:21 +00:00
parent 151adeb972
commit 8997e512ef
73 changed files with 4916 additions and 1 deletions

View File

@ -53,6 +53,7 @@ PROPERTIES
<property name="dist.dir" value="${basedir}/dists"/>
<property name="lib.jar.name" value="scala-library.jar"/>
<property name="dbc.jar.name" value="scala-dbc.jar"/>
<property name="actors.jar.name" value="scala-actors.jar"/>
<property name="comp.jar.name" value="scala-compiler.jar"/>
<property name="scala.exec.name" value="scala"/>
<property name="scalac.exec.name" value="scalac"/>
@ -289,6 +290,7 @@ BUILD LOCAL REFERENCE (LOCKER) LAYER
<include name="**/*.scala"/>
<exclude name="scala/Predef.scala"/>
<exclude name="scala/dbc/**"/>
<exclude name="scala/actors/**"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</starr>
<!-- Build compiler -->
@ -405,6 +407,7 @@ BUILD QUICK-TEST LAYER
<include name="**/*.scala"/>
<exclude name="scala/Predef.scala"/>
<exclude name="scala/dbc/**"/>
<exclude name="scala/actors/**"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</locker>
<!-- Build DBC -->
@ -421,6 +424,20 @@ BUILD QUICK-TEST LAYER
<include name="scala/dbc/**/*.scala"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</locker>
<!-- Build actors library -->
<mkdir dir="${quick.dir}/lib/actors"/>
<locker
srcdir="${src.dir}/actors"
destdir="${quick.dir}/lib/actors"
addparams="${nsc.params}"
scalacdebugging="${nsc.log-files}">
<classpath>
<pathelement location="${quick.dir}/lib/library"/>
<pathelement location="${quick.dir}/lib/actors"/>
</classpath>
<include name="scala/actors/**/*.scala"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</locker>
<!-- Build compiler -->
<mkdir dir="${quick.dir}/lib/compiler"/>
<locker
@ -540,6 +557,7 @@ TEST
<include name="**/*.scala"/>
<exclude name="scala/Predef.scala"/>
<exclude name="scala/dbc/**"/>
<exclude name="scala/actors/**"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</quick>
<!-- Build DBC -->
@ -555,6 +573,19 @@ TEST
<include name="scala/dbc/**/*.scala"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</quick>
<!-- Build actors library -->
<mkdir dir="${strap.dir}/lib/actors"/>
<quick
srcdir="${src.dir}/actors"
destdir="${strap.dir}/lib/actors"
addparams="${nsc.params}">
<classpath>
<pathelement location="${strap.dir}/lib/library"/>
<pathelement location="${strap.dir}/lib/actors"/>
</classpath>
<include name="scala/actors/**/*.scala"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</quick>
<!-- Build compiler -->
<mkdir dir="${strap.dir}/lib/compiler"/>
<quick
@ -683,6 +714,7 @@ DOCUMENTATION
documenttitle="&lt;div&gt;Scala ${version.number}&lt;/div&gt;"
classpath="${quick.dir}/lib/library">
<include name="dbc/**/*.scala"/>
<include name="actors/**/*.scala"/>
<include name="library/**/*.scala"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</quickdoc>
@ -824,6 +856,12 @@ GENERATES A DISTRIBUTION
<attribute name="Signature-Version" value="${version.number}"/>
</manifest>
</jar>
<jar destfile="${dist.current.dir}/lib/${actors.jar.name}">
<fileset dir="${strap.dir}/lib/actors"/>
<manifest>
<attribute name="Signature-Version" value="${version.number}"/>
</manifest>
</jar>
<!-- Copy executable files -->
<mkdir dir="${dist.current.dir}/bin"/>
<copy todir="${dist.current.dir}/bin">
@ -886,7 +924,7 @@ GENERATES A DISTRIBUTION
version="${version.number}"
desc="The Scala library. This is the minimal requirement to run any Scala program."
link="${sbaz.universe}/scala-library-${version.number}.sbp">
<libset dir="${dist.current.dir}/lib" includes="${lib.jar.name},${dbc.jar.name}"/>
<libset dir="${dist.current.dir}/lib" includes="${lib.jar.name},${dbc.jar.name},${actors.jar.name}"/>
</quicksbaz>
<!-- Create the Scala developper package -->
<quicksbaz

View File

@ -0,0 +1,60 @@
import scala.actors.multi.Pid
import actors.distributed.RemoteActor
import actors.distributed.TCP
import actors.distributed.TcpNode
import actors.distributed.TcpService
abstract class CounterMessage
case class Incr() extends CounterMessage
case class Value(p: Pid) extends CounterMessage
case class Result(v: int) extends CounterMessage
class Counter extends RemoteActor {
override def run(): unit =
loop(0)
def loop(value: int): unit = {
Console.println("Value: " + value)
receive {
case Incr() =>
loop(value + 1)
case Value(p) =>
p ! Result(value)
loop(value)
case other =>
loop(value)
}
}
}
class CounterUser extends RemoteActor {
override def run(): unit = {
alive(TCP())
spawn(TcpNode("127.0.0.1", 9090), "Counter")
receive {
case p: Pid =>
// communicate with counter
Console.println("" + node + ": Sending Incr() to remote Counter (" + p + ")...")
p ! Incr()
p ! Incr()
p ! Value(self)
receive {
case Result(v) =>
Console.println("Received result: " + v)
}
}
}
}
object CounterTest {
def main(args: Array[String]): unit = {
val serv = new TcpService(9090)
serv.start()
val cu = new CounterUser
cu.start()
}
}

View File

@ -0,0 +1,66 @@
/**
@author Philipp Haller <philipp.haller@epfl.ch>
This shows "customer passing" for implementing
recursive algorithms using actors.
*/
import scala.actors._;
import scala.actors.single._;
abstract class FactorialMessage;
case class Factorial(n: int, resTo: Actor) extends FactorialMessage;
case class Value(n: int) extends FactorialMessage;
class FactorialProcess extends Actor {
override def run: unit = {
receive {
case Factorial(n, resTo) =>
if (n == 0) {
Debug.info("Sending Value(1) to " + resTo)
resTo send Value(1)
}
else {
// spawn process that multiplies
/*val m = spawnReceive({
case Value(value) => resTo send Value(n * value)
});*/
val m = new MultiplyActor(n, resTo)
m.start
Debug.info("Sending Factorial(" + (n-1) + ", " + m + ") to " + this)
this send Factorial(n-1, m)
}
run
}
}
}
class MultiplyActor(factor: int, resTo: Actor) extends Actor {
override def run: unit =
receive {
case Value(value) =>
Debug.info("Sending Value(" + factor * value + ") to " + resTo)
resTo send Value(factor * value)
Debug.info("Done sending.")
}
}
object CustomerPassing {
def main(args: Array[String]): unit = {
val fac = new FactorialProcess
fac.start
val c = new Actor {
override def run: unit = {
fac send Factorial(3, this)
receive {
case Value(value) =>
System.out.println("Result: " + value)
}
}
}
c.start
}
}

View File

@ -0,0 +1,38 @@
package scala.actors;
/**
* @author Philipp Haller
*/
object Debug {
var lev = 2
def level = lev
def level_= (lev: int) = {
//Console.println("Setting debug level to " + lev)
this.lev = lev
}
def info(s: String) =
if (lev > 2) System.out.println("Info: " + s)
def warning(s: String) =
if (lev > 1) System.err.println("Warning: " + s)
def error(s: String) =
if (lev > 0) System.err.println("Error: " + s)
}
class Debug(tag: String) {
var lev = 2
def level = lev
def level_= (lev: int) = {
//Console.println("Setting debug level (" + tag + ") to " + lev)
this.lev = lev
}
def info(s: String) =
if (lev > 2) System.out.println(tag + " (info): " + s)
def warning(s: String) =
if (lev > 1) System.err.println(tag + " (warn): " + s)
def error(s: String) =
if (lev > 0) System.err.println(tag + " (erro): " + s)
}

View File

@ -0,0 +1,19 @@
package scala.actors;
/**
* @author Philipp Haller
*/
class Done extends Throwable {
override def fillInStackTrace(): Throwable =
this;
}
class ContinueException extends Throwable {
override def fillInStackTrace(): Throwable =
this;
}
class AbortException extends Throwable {
override def fillInStackTrace(): Throwable =
this;
}

View File

@ -0,0 +1,74 @@
package scala.actors;
/*
* SPanel.scala
* GUI for simple texts
*
*/
import java.awt._;
import java.awt.event._;
import javax.swing.event._;
import javax.swing._;
import java.io._;
class SPanel (WIDTH:int, HEIGHT:int, title:String) extends JFrame{
private var textArea:JTextArea = null;
private var resetButton:JButton = null;
private var scrollPane:JScrollPane = null;
private var panel:JPanel = null;
private var contentPane:Container = null;
private var levelChoice:JComboBox = null;
private var formatChoice:JComboBox = null;
//init
setTitle(title);
setSize(WIDTH, HEIGHT);
this.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
contentPane = getContentPane();
panel = new JPanel();
contentPane.add(panel, BorderLayout.SOUTH);
// Add a text area with scroll bars
textArea = new JTextArea(8, 40);
scrollPane = new JScrollPane(textArea);
contentPane.add(scrollPane, BorderLayout.CENTER);
// Add a "reset textarea" button
resetButton = new JButton("Clear");
panel.add(resetButton);
resetButton.addActionListener(
new ActionListener() {
def actionPerformed(evt:ActionEvent):Unit= {
textArea.setText("");
}
}
);
this.repaint();
this.show();
def addText(text:String):Unit = {
textArea.append(text+'\n');
if ( textArea.getHeight() > scrollPane.getHeight() ) {
scrollPane.getVerticalScrollBar().setValue(scrollPane.getVerticalScrollBar().getMaximum());
}
repaint();
}
/*
def actionPerformed(ActionEvent e):Unit {
}
*/
override def paint(g:Graphics ): unit = {
super.paint( g );
}
}

View File

@ -0,0 +1,236 @@
package scala.actors
import scala.collection.mutable._
import scala.actors.multi._
/**
* @author Philipp Haller
*/
abstract class IScheduler /*extends java.util.concurrent.Executor*/ {
def execute(item: ReceiverTask): unit;
def getTask(worker: WorkerThread): Runnable;
def tick(a: MailBox): unit;
val QUIT_TASK = new Runnable() {
def run(): unit = {};
override def toString() = "QUIT_TASK";
}
}
object Scheduler /*extends java.util.concurrent.Executor*/ {
private var sched: /*java.util.concurrent.Executor*/ IScheduler =
//java.util.concurrent.Executors.newFixedThreadPool(2);
//new FixedWorkersScheduler(2);
new SpareWorkerScheduler2
//new SpareWorkerScheduler
def impl = sched
def impl_= (scheduler: /*java.util.concurrent.Executor*/ IScheduler) = {
Debug.info("Using scheduler " + scheduler)
sched = scheduler
}
def execute(item: ReceiverTask) =
sched.execute(item)
def tick(a: MailBox) =
sched.tick(a)
}
class SpareWorkerScheduler2 extends IScheduler {
private val tasks = new Queue[ReceiverTask];
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread];
val idle = new Queue[WorkerThread];
val ticks = new HashMap[WorkerThread, long]
val executing = new HashMap[MailBox, WorkerThread]
var TICKFREQ = 50
def init = {
for (val i <- List.range(0, 2)) {
val worker = new WorkerThread(this)
workers += worker
worker.start()
}
}
init;
var maxWorkers = 0;
var ticksCnt = 0;
def tick(a: MailBox): unit = synchronized {
ticksCnt = ticksCnt + 1
executing.get(a) match {
case None => // thread outside of scheduler; error("No worker thread associated with actor " + a)
case Some(wt) =>
ticks.update(wt, System.currentTimeMillis)
}
}
def execute(item: ReceiverTask): unit = synchronized {
if (idle.length > 0) {
val wt = idle.dequeue
executing.update(item.actor, wt)
wt.execute(item)
}
else {
/* only create new worker thread
when all are blocked according to heuristic
we check time stamps of latest send/receive ops
of ALL workers
we stop if there is one which is not blocked */
val iter = workers.elements
var foundBusy = false
while (iter.hasNext && !foundBusy) {
val wt = iter.next
ticks.get(wt) match {
case None => foundBusy = true // assume not blocked
case Some(ts) => {
val currTime = System.currentTimeMillis
if (currTime - ts < TICKFREQ)
foundBusy = true
}
}
}
if (!foundBusy) {
val newWorker = new WorkerThread(this)
workers += newWorker
maxWorkers = workers.length // statistics
executing.update(item.actor, newWorker)
newWorker.execute(item)
newWorker.start()
}
else {
// wait assuming busy thread will be finished soon
// and ask for more work
tasks += item
}
}
}
def getTask(worker: WorkerThread) = synchronized {
if (tasks.length > 0) {
val item = tasks.dequeue
executing.update(item.actor, worker)
item
}
else {
idle += worker
null
}
}
}
/**
* @author Philipp Haller
*/
class SpareWorkerScheduler extends IScheduler {
private var canQuit = false;
private val tasks = new Queue[Runnable];
private val idle = new Queue[WorkerThread];
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread];
def init = {
for (val i <- List.range(0, 2)) {
val worker = new WorkerThread(this)
workers += worker
worker.start()
}
}
init;
var maxWorkers = 0;
def execute(item: ReceiverTask): unit = synchronized {
if (idle.length == 0) {
tasks += item
// create new worker
val newWorker = new WorkerThread(this)
workers += newWorker
maxWorkers = workers.length
newWorker.start()
//newWorker.execute(item)
}
else {
canQuit = true
idle.dequeue.execute(item)
}
}
def getTask(worker: WorkerThread) = synchronized {
if (tasks.length > 0) tasks.dequeue
else {
idle += worker
null
//if ((idle.length == workers.length) && canQuit) haltExcept(worker)
//else null
}
}
def tick(a: MailBox): unit = {}
def haltExcept(w: WorkerThread) = {
for (val i <- List.range(0, workers.length))
if (workers(i) != w) workers(i).halt
QUIT_TASK
}
}
/**
* @author Philipp Haller
*/
class FixedWorkersScheduler(workercnt: int) extends IScheduler {
private var canQuit = false;
private val tasks = new Queue[Runnable];
private val idle = new Queue[WorkerThread];
//Console.println("Running with " + workercnt + " workers")
private var workers = new Array[WorkerThread](workercnt);
def init = {
for (val i <- List.range(0, workers.length)) {
workers(i) = new WorkerThread(this)
workers(i).start()
}
}
init;
def execute(item: ReceiverTask): unit = synchronized {
if (workers.length == 0) item.run
else {
canQuit = true
if (idle.length > 0) idle.dequeue.execute(item)
else tasks += item
}
}
def getTask(worker: WorkerThread) = synchronized {
if (tasks.length > 0) tasks.dequeue
else {
idle += worker
null
//if ((idle.length == workers.length) && canQuit) haltExcept(worker)
//else null
}
}
def tick(a: MailBox): unit = {}
def haltExcept(w: WorkerThread) = {
for (val i <- List.range(0, workers.length))
if (workers(i) != w) workers(i).halt
QUIT_TASK
}
}

View File

@ -0,0 +1,156 @@
/**
@author Sebastien Noir
This class allows the (locl) sending of a message to an actor after a timeout. Used by the library to build receiveWithin(time:long). Note that the library deletes non received TIMEOUT() message if a messsage is received before the time-out occurs.
*/
package scala.actors
import scala.collection.mutable.PriorityQueue
import scala.actors.multi.Actor
import scala.actors.multi.MailBox
case class Signal()
object TimerThread extends AnyRef with Runnable {
case class WakedActor(actor: MailBox, time: long, reason: String) extends Ordered[WakedActor] {
var valid = true
def compare [b >: WakedActor <% Ordered[b]](that: b): int = that match {
case that2: WakedActor => -(this.time compare that2.time)
case _ => error("not comparable")
}
}
var queue = new PriorityQueue[WakedActor]
val t = new Thread(this); t.start
var lateList: List[WakedActor] = Nil
def trashRequest(a: MailBox) = synchronized {
// keep in mind: killing dead people is a bad idea!
queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match {
case Some(b) =>
b.valid = false
case None =>
lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match {
case Some(b2) =>
b2.valid = false
case None =>
}
}
}
override def run = while (true) {
this.synchronized {
try {
val sleepTime = dequeueLateAndGetSleepTime
if (lateList.isEmpty) {
wait(sleepTime)
}
} catch {
case t: Throwable => { t.printStackTrace(); throw t }
}
}
// process guys waiting for signal and empty list
for (val wa <- lateList) {
if (wa.valid) {
wa.actor send Signal()
}
}
lateList = Nil
}
def requestSignal(a: Actor, waitMillis: long, reason: String): unit = this.synchronized {
Console.println("TTTT Actor "+a+" requests Signal in "+waitMillis +" ms for :"+reason)
val wakeTime = now + waitMillis
if (waitMillis < 0) {
a send Signal()
return
}
if (queue.isEmpty) { // add to queue and restart sleeping
queue += WakedActor(a, wakeTime, reason)
notify()
} else { //queue full
if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping
queue += WakedActor (a, wakeTime, reason)
notify()
} else { // simply add to queue
queue += WakedActor (a, wakeTime, reason)
}
}
}
def requestTimeout(a: MailBox, waitMillis: long): unit = synchronized {
val wakeTime = now + waitMillis
if (waitMillis < 0) {
a send Signal()
return
}
if (queue.isEmpty) { // add to queue and restart sleeping
queue += WakedActor(a, wakeTime, "")
notify()
} else
if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping
queue += WakedActor (a, wakeTime, "")
notify()
}
else // simply add to queue
queue += WakedActor (a, wakeTime, "")
}
private def dequeueLateAndGetSleepTime: long = {
val FOREVER: long = 0
var waitingList: List[WakedActor] = Nil
while (!queue.isEmpty) {
val next = queue.max.time
val amount = next - now
if (amount > 0) { // guy in queue is not late
lateList = waitingList // give back the list of waiting guys for signaling
return amount
}
else // we're late: dequeue and examine next guy
waitingList = queue.dequeue :: waitingList
}
// empty queue => sleep forever
lateList = waitingList
return FOREVER
}
def now = new java.util.Date().getTime()
}
//================================================================================
object TimerThreadTest {
def main (args:Array[String]) = {
new Tester (1000, "ONE").start
new Tester (500, "TWO").start
}
class Tester (duration : int, name:String) extends Actor {
var i = 0
def loop:unit = {
receive {
case Signal() =>
Console.println(name + i)
i = i+1;
loop
}
}
override def run = {
for (val i <-List.range(1,10)) {
TimerThread.requestSignal(this, (duration * i).asInstanceOf[long], ""+duration*i)
}
loop
}
}
}

View File

@ -0,0 +1,34 @@
package scala.actors;
/**
* @author Philipp Haller
*/
class WorkerThread(sched: IScheduler) extends Thread {
private var task: Runnable = null
private var running = true
def halt = synchronized {
running = false
notify()
}
def execute(r: Runnable) = synchronized {
//Debug.info("WORK: " + this + ": Executing task " + r)
task = r
notify()
}
override def run(): unit = synchronized {
while (running) {
if (task != null) {
task.run()
//Debug.info("WORK: " + this + " has finished.")
}
//Debug.info("WORK: " + this + ": Getting new task...")
task = sched.getTask(this)
//Debug.info("WORK (" + this + "): got task " + task)
if (task == sched.QUIT_TASK) running = false
else if (task == null) wait()
}
}
}

View File

@ -0,0 +1,9 @@
package scala.actors.distributed;
abstract class JXTAServiceBase(nodename: String) extends Thread with Service {
val serializer = new JavaSerializer(this);
private val internalNode = new JXTANode(nodename);
def node: Node = internalNode;
def createPid(actor: RemoteActor): RemotePid =
new JXTAPid(internalNode, makeUid, kernel, actor)
}

View File

@ -0,0 +1,34 @@
package scala.actors.distributed;
import java.io._;
import scala.collection.mutable._;
import scala.actors.distributed.picklers.BytePickle.SPU;
import scala.actors.distributed.picklers._;
import scala.actors.multi._;
[serializable]
class JavaSerializer(serv: Service) extends Serializer(serv) {
val debug = true;
def log (s:String) = {
if (debug) Console.println("JAVASerializer: "+s)
}
def serialize(o: AnyRef): Array[byte] = {
val bos = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bos)
out.writeObject(o)
out.flush()
bos.toByteArray()
}
def deserialize(bytes: Array[byte]): AnyRef = {
val bis = new ByteArrayInputStream(bytes);
val in = new ObjectInputStream(bis);
in.readObject();
}
def pid: SPU[Pid] = null;
def addRep(name: String, repCons: Serializer => AnyRef): unit = {};
}

View File

@ -0,0 +1,40 @@
package scala.actors.distributed
import scala.actors.multi.Pid
import scala.actors.multi.ExcHandlerDesc
abstract class MessageTyper {
type DataType=Array[byte];
}
abstract class SystemMessage;
case class Send(rec: Pid, data: MessageTyper#DataType) extends SystemMessage;
case class Spawn(replyto: Pid, p: String) extends SystemMessage;
case class PidReply(res: Pid) extends SystemMessage;
case class Disconnect() extends SystemMessage;
case class NodeDown() extends SystemMessage;
// ACHTUNG: Tells "from" to create a _uni-directional_ link!
case class Link(from: Pid, to: Pid) extends SystemMessage;
case class UnLink(from: Pid, to: Pid) extends SystemMessage;
case class Exit1(from: Pid, to: Pid, reason: Symbol) extends SystemMessage;
case class SpawnObject(replyto: Pid, data: MessageTyper#DataType) extends SystemMessage;
case class NamedSend(sym: Symbol, data: MessageTyper#DataType) extends SystemMessage;
case class ForwardExc(destDesc: ExcHandlerDesc, e: Throwable) extends SystemMessage;
/*
case class NamedSendRep (ser:Serializer) extends TypeRep[NamedSend](ser) {
def serialize(content: NamedSend, w: java.io.Writer): unit = {
StringRep(ser).serialize(content.sym.name, w)
StringRep(ser).serialize(content.data, w)
}
def deserialize(r:java.io.Reader): NamedSend = {
NamedSend(Symbol(StringRep(ser).deserialize(r)),
StringRep(ser).deserialize(r))
}
}
*/

View File

@ -0,0 +1,36 @@
package scala.actors.distributed;
import scala.actors.distributed.picklers.BytePickle._;
import scala.actors.multi.Pid;
object MessagesComb {
def sendPU(ser: Serializer): SPU[Send] =
wrap((p: Pair[Pid,Array[byte]]) => Send(p._1, p._2),
(s: Send) => Pair(s.rec, s.data),
pair(ser.pid, bytearray));
def spawnPU(ser: Serializer): SPU[Spawn] =
wrap((p: Pair[Pid,String]) => Spawn(p._1, p._2),
(s: Spawn) => Pair(s.replyto, s.p),
pair(ser.pid, string));
def symbolPU: SPU[Symbol] =
wrap((s: String) => Symbol(s),
(sym: Symbol) => sym.name,
string);
def exitPU(ser: Serializer): SPU[Exit1] =
wrap((p: Triple[Pid,Pid,Symbol]) => Exit1(p._1, p._2, p._3),
(e: Exit1) => Triple(e.from, e.to, e.reason),
triple(ser.pid, ser.pid, symbolPU));
def spawnObjectPU(ser: Serializer): SPU[SpawnObject] =
wrap((p: Pair[Pid,Array[byte]]) => SpawnObject(p._1, p._2),
(s: SpawnObject) => Pair(s.replyto, s.data),
pair(ser.pid, bytearray));
def namedSendPU: SPU[NamedSend] =
wrap((p: Pair[Symbol,Array[byte]]) => NamedSend(p._1, p._2),
(ns: NamedSend) => Pair(ns.sym, ns.data),
pair(symbolPU, bytearray));
}

View File

@ -0,0 +1,7 @@
package scala.actors.distributed;
case class Name(node: Node, sym: Symbol, kernel: NetKernel) {
def !(msg: AnyRef): unit = {
kernel.namedSend(this, msg)
}
}

View File

@ -0,0 +1,591 @@
package scala.actors.distributed
import java.io.StringReader
import java.io.StringWriter
import java.util.logging._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import java.net.UnknownHostException
import java.io.IOException
import java.lang.SecurityException
import scala.actors.multi.Actor
import scala.actors.multi.ExcHandlerDesc
case class RA(a: RemoteActor);
object NetKernel {
var kernel: NetKernel = null;
}
class NetKernel(service: Service) {
NetKernel.kernel = this
// contains constructors
private val ptable =
new HashMap[String, () => RemoteActor];
// maps local ids to scala.actors
private val rtable =
new HashMap[int, RemoteActor];
// maps scala.actors to their RemotePid
private val pidTable =
new HashMap[RemoteActor, RemotePid];
private var running = true;
val logLevel = Level.FINE
val l = Logger.getLogger("NetKernel")
l.setLevel(logLevel)
val consHand = new ConsoleHandler
consHand.setLevel(logLevel)
l.addHandler(consHand)
//start // start NetKernel
/** only called if destDesc is local. */
def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = {
destDesc.pid match {
case rpid: RemotePid =>
(rtable get rpid.localId) match {
case Some(actor) =>
actor.handleExc(destDesc, e)
case None =>
error("exc desc refers to non-registered actor")
}
}
}
def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
// locality check (handler local to this node?)
destDesc.pid match {
case rpid: RemotePid =>
if (rpid.node == this.node)
handleExc(destDesc, e)
else
sendToNode(rpid.node, ForwardExc(destDesc, e))
}
}
def sendToNode(node: Node, msg: AnyRef) = {
//val sw = new StringWriter
val bytes = service.serializer.serialize(msg /*, sw*/)
service.send(node, bytes)
//service.send(node, sw.toString())
}
def addConstructor(key: String, value: () => RemoteActor) =
ptable.update(key, value);
def node: Node = service.node;
def nodes: List[Node] = service.nodes;
def pidOf(actor: RemoteActor): RemotePid = synchronized {
pidTable.get(actor) match {
case None => error("malformed pid table in " + this)
case Some(pid) => pid
}
}
def disconnectNode(n: Node) = synchronized {
service.disconnectNode(n)
}
def getLocalRef(locId: int): RemoteActor =
rtable.get(locId) match {
case None =>
error("" + locId + " is not registered at " + this)
case Some(remoteActor: RemoteActor) =>
remoteActor
}
def localSend(localId: int, msg: AnyRef): unit = synchronized {
rtable.get(localId) match {
case None =>
error("" + localId + " is not registered at " + this)
case Some(remoteActor: RemoteActor) =>
//Console.println("local send to " + remoteActor)
remoteActor send msg
}
}
def localSend(pid: RemotePid, msg: AnyRef): unit =
localSend(pid.localId, msg);
def remoteSend(pid: RemotePid, msg: AnyRef) = synchronized {
//Console.println("NetKernel: Remote msg delivery to " + pid)
service.remoteSend(pid, msg)
}
def namedSend(name: Name, msg: AnyRef): unit = {
if (name.node == this.node) {
// look-up name
nameTable.get(name.sym) match {
case None =>
// message is lost
//Console.println("lost message " + msg + " because " + name + " not registered.")
case Some(localId) => localSend(localId, msg)
}
}
else {
// remote send
// serialize msg
///val sw = new StringWriter
val bytes = service.serializer.serialize(msg/*, sw*/)
sendToNode(name.node, NamedSend(name.sym, bytes))
//sendToNode(name.node, NamedSend(name.sym, sw.toString()))
}
}
val nameTable = new HashMap[Symbol, int]
def registerName(name: Symbol, pid: RemotePid): unit = synchronized {
nameTable += name -> pid.localId
}
def registerName(name: Symbol, a: RemoteActor): unit = synchronized {
val pid = register(a)
registerName(name, pid)
a.start
}
/*override def run: unit = receive {
case ForwardExc(destDesc, e) =>
// TODO
case Spawn(reply: RemotePid, pname) =>
val newPid = spawn(pname)
// need to send back the Pid
remoteSend(reply, newPid)
run
case SpawnObject(reply: RemotePid, data: Array[byte]) =>
//val sr = new StringReader(data)
//service.serializer.deserialize(sr) match {
service.serializer.deserialize(data) match {
case RA(actor) =>
val newPid = register(actor)
//Console.println("Spawned a new " + newProc + " (" + newPid + ")")
actor.start
// need to send back the Pid
remoteSend(reply, newPid)
}
run
case NamedSend(sym: Symbol, data) =>
// look-up name
nameTable.get(sym) match {
case None =>
// message is lost
Console.println("lost message " + data + " because " + sym + " not registered.")
case Some(localId) =>
// deserialize data
//val sr = new StringReader(data)
//val msg = service.serializer.deserialize(sr)
val msg = service.serializer.deserialize(data)
localSend(localId, msg)
}
run
case Send(pid: RemotePid, data) =>
// deserialize data
//val sr = new StringReader(data)
//val msg = service.serializer.deserialize(sr)
val msg = service.serializer.deserialize(data)
Console.println("locally send " + msg + " to " + pid)
localSend(pid, msg)
run
case Link(from:RemotePid, to:RemotePid) =>
// assume from is local
linkFromLocal(from, to)
run
case UnLink(from:RemotePid, to:RemotePid) =>
// assume from is local
unlinkFromLocal(from, to)
run
case Exit1(from:RemotePid, to:RemotePid, reason) =>
// check if "to" traps exit signals
// if so send a message
if (trapExits.contains(to.localId))
// convert signal into message
localSend(to, Exit1(from, to, reason))
else
if (reason.name.equals("normal")) {
// ignore signal
}
else
exit(from, to, reason)
run
}*/
// TODO
/*def isReachable(remoteNode: Node): boolean = {
val pingMsg = new Ping(node)
val sw = new StringWriter
service.serializer.serialize(pingMsg, sw)
service.send(remoteNode, sw.toString())
}*/
def processMsg(msg: AnyRef): unit = synchronized {
msg match {
case Spawn(reply: RemotePid, pname) =>
val newPid = spawn(pname)
// need to send back the Pid
remoteSend(reply, newPid)
case SpawnObject(reply: RemotePid, data) =>
//val sr = new StringReader(data)
//service.serializer.deserialize(sr) match {
service.serializer.deserialize(data) match {
case RA(actor) =>
val newPid = register(actor)
//Console.println("Spawned a new " + newProc + " (" + newPid + ")")
actor.start
// need to send back the Pid
remoteSend(reply, newPid)
}
case NamedSend(sym: Symbol, data) =>
// look-up name
nameTable.get(sym) match {
case None =>
// message is lost
//Console.println("lost message " + msg + " because " + sym + " not registered.")
case Some(localId) =>
// deserialize data
//val sr = new StringReader(data)
//val msg = service.serializer.deserialize(sr)
val msg = service.serializer.deserialize(data)
localSend(localId, msg)
}
case Send(pid: RemotePid, data) =>
// deserialize data
//val sr = new StringReader(data)
//val msg = service.serializer.deserialize(sr)
val msg = service.serializer.deserialize(data)
//Console.println("locally send " + msg + " to " + pid)
localSend(pid, msg)
case Link(from:RemotePid, to:RemotePid) =>
// assume from is local
linkFromLocal(from, to)
case UnLink(from:RemotePid, to:RemotePid) =>
// assume from is local
unlinkFromLocal(from, to)
case Exit1(from:RemotePid, to:RemotePid, reason) =>
// check if "to" traps exit signals
// if so send a message
if (trapExits.contains(to.localId)) {
// convert signal into message
// TODO: simpler message (w/o to) for actor!
localSend(to, Exit1(from, to, reason))
}
else {
if (reason.name.equals("normal")) {
// ignore signal
}
else
exit(from, to, reason)
}
}
}
/* Registers an instance of a remote actor inside this NetKernel.
*/
def register(newProc: RemoteActor): RemotePid = synchronized {
newProc.kernel = this
val newPid = service.createPid(newProc)
rtable += newPid.localId -> newProc
pidTable += newProc -> newPid
newPid
}
// local spawn
def spawn(pname: String): RemotePid = synchronized {
// get constructor out of table
(ptable.get(pname)) match {
case None =>
//error("No constructor found. Cannot start process.")
//null
val newProc = Class.forName(pname).newInstance().asInstanceOf[RemoteActor]
// create Pid for remote communication and register process
val newPid = register(newProc)
//Console.println("Spawned a new " + newProc + " (" + newPid + ")")
newProc.start
newPid
case Some(cons) =>
val newProc = cons()
// create Pid for remote communication and register process
val newPid = register(newProc)
//Console.println("Spawned a new " + newProc + " (" + newPid + ")")
newProc.start
newPid
}
}
// local spawn
def spawn(name: String, arg: RemotePid): RemotePid = synchronized {
val newPid = spawn(name)
localSend(newPid, arg)
newPid
}
// assume this.node != node
def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): unit = {
val ra = RA(a)
//val rsw = new StringWriter
//service.serializer.serialize(ra, rsw)
val bytes = service.serializer.serialize(ra)
//sendToNode(node, SpawnObject(replyTo, rsw.toString()))
sendToNode(node, SpawnObject(replyTo, bytes))
}
def registerSerializer(index: String, rep: Serializer => AnyRef) = {
service.serializer.addRep(index, rep)
// send registering requests to remote nodes
}
// remote spawn
def spawn(replyTo: RemotePid, node: Node, name: String): RemotePid = synchronized {
// check if actor is to be spawned locally
if (node == this.node) {
val newPid = spawn(name)
newPid
}
else {
sendToNode(node, Spawn(replyTo, name))
null // pid needs to be sent back
}
}
/* Spawns a new actor (locally), executing "fun".
*/
def spawn(fun: RemoteActor => unit): RemotePid = synchronized {
val newProc = new RemoteActor {
override def run: unit =
fun(this);
}
// create Pid for remote communication and register process
val newPid = register(newProc)
//Console.println("Spawned a new " + newProc + " (" + newPid + ")")
newProc.start
newPid
}
/* Spawns a new actor (locally), executing "fun".
*/
def spawnLink(pid: RemotePid, fun: RemoteActor => unit): RemotePid = synchronized {
val newProc = new RemoteActor {
override def run: unit =
fun(this);
}
// create Pid for remote communication and register process
val newPid = register(newProc)
//Console.println("Spawned a new " + newProc + " (" + newPid + ")")
// link new process to pid (assume pid is local)
link(pid, newPid)
newProc.start
newPid
}
// maps local ids to their linked pids
private val links = new HashMap[int,HashSet[RemotePid]];
// which of the local processes traps exit signals?
private val trapExits = new HashSet[int];
def processFlag(pid: RemotePid, flag: Symbol, set: boolean) = synchronized {
if (flag.name.equals("trapExit")) {
if (trapExits.contains(pid.localId) && !set)
trapExits -= pid.localId
else if (!trapExits.contains(pid.localId) && set)
trapExits += pid.localId
}
}
// assume from.node == this.node
private def unlinkFromLocal(from: RemotePid, to: RemotePid): unit =
links.get(from.localId) match {
case None =>
// has no links -> ignore
case Some(set) =>
set -= to
if (set.size == 0) links -= from.localId
};
/*
unlinks bi-directional link
assume from.node == this.node
*/
def unlink(from: RemotePid, to: RemotePid): unit = synchronized {
unlinkFromLocal(from, to)
if (to.node == this.node)
unlinkFromLocal(to, from)
else
// (2) send message to NetKernel of "to" to unlink a
// uni-directional link from "to" to "from"
sendToNode(to.node, UnLink(to, from))
}
// assume from.node == this.node
private def linkFromLocal(from: RemotePid, to: RemotePid): unit =
// TODO: send Exit to from if to is invalid
links.get(from.localId) match {
case None =>
// from has no links, yet
val linksTo = new HashSet[RemotePid]
linksTo += to
links += from.localId -> linksTo
case Some(set) =>
set += to
};
/*
creates bi-directional link
assume from.node == this.node
*/
def link(from: RemotePid, to: RemotePid): unit = synchronized {
// (1) create locally a uni-directional link
linkFromLocal(from, to)
if (to.node == this.node)
linkFromLocal(to, from)
else
// (2) send message to NetKernel of "to" to create a
// uni-directional link from "to" to "from"
sendToNode(to.node, Link(to, from))
}
// Assume "to" is local.
def exit(from: RemotePid, to: RemotePid, reason: Symbol): unit = {
// remove link
unlinkFromLocal(to, from)
exit(to, reason)
}
val exitMarks = new HashSet[RemotePid]
/*
If reason is unequal to 'normal then
this will cause all linked processes to
(transitively) terminate abnormally.
Assume pid is local.
*/
def exit(pid: RemotePid, reason: Symbol): unit = synchronized {
if (!(exitMarks contains pid)) {
exitMarks += pid // mark pid as exiting
//Console.println("" + pid + " is exiting (" + reason + ").")
// first look-up remote actor in rtable
val actor = rtable(pid.localId)
// remove from table of running processes
rtable -= pid.localId
// remove from pid table
pidTable -= actor
// send exit signals to linked processes
links.get(pid.localId) match {
case None =>
//Console.println("no linked processes")
case Some(set) => // set of remote pids that we want to terminate
//Console.println("sending exit signals to linked processes")
val iter = set.elements
while (iter.hasNext) {
val linkedPid = iter.next
unlinkFromLocal(pid, linkedPid)
if (linkedPid.node == this.node) {
unlinkFromLocal(linkedPid, pid)
if (trapExits.contains(linkedPid.localId))
localSend(linkedPid, Exit1(pid, linkedPid, reason))
else if (!reason.name.equals("normal"))
exit(linkedPid, reason)
}
else
sendToNode(linkedPid.node,
Exit1(pid, linkedPid, reason))
}
exitMarks -= pid
}
}
}
private val monNodes =
new HashMap[Node,HashMap[RemotePid,int]];
def monitorNode(client: RemotePid, mnode: Node, cond: boolean) = synchronized {
monNodes.get(mnode) match {
case None =>
// nobody is monitoring this node
if (cond) {
val map = new HashMap[RemotePid,int]
map += client -> 1
monNodes += mnode -> map
}
case Some(map) =>
map.update(client, map(client) + (if (cond) 1 else -1))
}
// if no connection exists:
// try connecting, if it fails deliver nodedown msg
if (cond && !service.isConnected(mnode)) {
try {
service.connect(mnode)
}
catch {
case uhe: UnknownHostException =>
nodeDown(mnode)
case ioe: IOException =>
nodeDown(mnode)
case se: SecurityException =>
nodeDown(mnode)
}
}
}
def nodeDown(mnode: Node) = {
// send NodeDown msg to registered RemotePids
monNodes.get(mnode) match {
case None =>
// nobody is monitoring this node
case Some(map) =>
// iterate over keys (RemotePids of interested clients)
val iter = map.keys
while (iter.hasNext) {
val client = iter.next
for (val i <- List.range(0, map(client))) {
// send nodedown msg
client ! Pair(NodeDown(), mnode)
}
}
}
}
}

View File

@ -0,0 +1,6 @@
package scala.actors.distributed;
[serializable] abstract class Node;
[serializable] case class TcpNode(address: String, port: int) extends Node;
[serializable] case class JXTANode(name: String) extends Node;

View File

@ -0,0 +1,12 @@
package scala.actors.distributed;
import scala.actors.distributed.picklers.BytePickle._;
object NodeComb {
def tcpNodePU: SPU[TcpNode] =
wrap((p: Pair[String,int]) => TcpNode(p._1, p._2),
(n: TcpNode) => Pair(n.address, n.port), pair(string, nat));
def jxtaNodePU: SPU[JXTANode] =
wrap((s: String) => JXTANode(s),
(n: JXTANode) => n.name, string);
}

View File

@ -0,0 +1,168 @@
package scala.actors.distributed
import scala.actors.multi.MailBox
import scala.actors.multi.Actor
import scala.actors.multi.Pid
import scala.actors.multi.LocalPid
import scala.actors.multi.ExcHandlerDesc
import scala.collection.mutable.HashMap
import scala.collection.mutable.Stack
abstract class ServiceName
case class JXTA(groupName: String) extends ServiceName
case class TCP() extends ServiceName
class RemoteActor extends Actor {
override def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
// locality check (handler local to this actor?)
if (destDesc.pid == self)
handleExc(destDesc, e)
else
kernel.forwardExc(destDesc, e)
}
override def receive(f: PartialFunction[Message,unit]): scala.All = {
if (isAlive) {
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
case Some(msg) =>
try {
f(msg)
}
catch {
case d: Done =>
throw new Done
case t: Throwable =>
if (!excHandlerDescs.isEmpty)
forwardExc(excHandlerDescs.top, t)
else
die(Symbol(t.toString()))
}
die()
case None =>
continuation = f
Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
}
}
throw new Done
}
var kernel: NetKernel = null;
def node =
self.node
def nodes = kernel.nodes
private var selfCached: RemotePid = null
override def self: RemotePid = {
if (selfCached == null)
selfCached = kernel pidOf this
selfCached
}
def serialize(index: String, rep: Serializer => AnyRef) =
kernel.registerSerializer(index, rep);
def alive(s: ServiceName): unit = {
var service: Service = null
s match {
case TCP() =>
val serv = new TcpService(TcpService.generatePort)
service = serv
serv.start()
/*case JXTA(groupName) =>
val serv = new ch.epfl.lamp.scala.actors.jxta.JXTAService("AliveActor"
+ new java.util.Date().getTime() + "-"
+ new java.util.Random().nextInt(1000),
java.util.logging.Level.FINEST) {
//millis before we give up group creation and create the group.
override def TIME_BEFORE_AUTO_GROUP_CREATE: long = 30000
val PIPE_ID:String="1"
//val ADV_LIFETIME:long = 1 * 60 * 60 * 1000 //millis to keep advertisements ...
override def MY_GROUP_NAME:String = groupName
/*val SENDER_MESSAGE = "PalcomDemo"; //used to identify the message element in jxta messages
val PIPE_BASE_ID:String = "PIPE4Pal4"+MY_GROUP_NAME;
val MESSAGE_THRESHOLD = 5;*/
}
service = serv
serv.start()*/
case _ => throw new Exception ("Unknown Service in RemoteActor")
}
// create RemotePid
selfCached = service.kernel.register(this)
}
def node(pid: Pid): Node = pid match {
case rpid: RemotePid => rpid.node
case lpid: LocalPid => null
}
def disconnectNode(node: Node) =
kernel.disconnectNode(node);
//does not call start def of Actor
def register(name: Symbol, pid: RemotePid): unit =
kernel.registerName(name, pid);
//calls start def of Actor
def register(name: Symbol, a: RemoteActor): unit =
kernel.registerName(name, a);
def name(node: Node, sym: Symbol): Name =
Name(node, sym, kernel)
def spawn(node: Node, name: String): RemotePid =
kernel.spawn(self, node, name);
def spawn(node: Node, a: RemoteActor): unit =
kernel.spawn(self, node, a);
def spawn(fun: RemoteActor => unit): RemotePid =
kernel.spawn(fun);
def spawn(a: RemoteActor): RemotePid = {
val pid = kernel.register(a)
a.start
pid
}
def spawnLink(fun: RemoteActor => unit): RemotePid =
kernel.spawnLink(self, fun);
def monitorNode(node: Node, cond: boolean) =
kernel.monitorNode(self, node, cond);
// this should be:
// self.link(pid)
// if self is RemotePid it will invoke NetKernel
def link(pid: RemotePid): unit =
kernel.link(self, pid);
def unlink(pid: RemotePid): unit =
kernel.unlink(self, pid);
override def exit(reason: Symbol): unit =
kernel.exit(self, reason);
override def processFlag(flag: Symbol, set: boolean) =
kernel.processFlag(self, flag, set);
override def die(reason: Symbol) = {
if (isAlive) {
isAlive = false
Debug.info("" + this + " died.")
kernel.exit(self, reason)
}
}
override def die() = {
if (isAlive) {
isAlive = false
Debug.info("" + this + " died.")
kernel.exit(self, 'normal)
}
}
}

View File

@ -0,0 +1,184 @@
package scala.actors.distributed
import scala.actors.multi.Pid
import scala.actors.multi.MailBox
import scala.actors.multi.ExcHandlerDesc
import java.io._
[serializable]
abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extends Pid {
def this() = this(0, null, null) // for serialization
private var _locId = locId
override def equals(that: Any) = that match {
case rpid: RemotePid =>
(this.node == rpid.node && this.localId == rpid.localId)
case _ => false
}
//[throws(classOf[IOException])]
private def writeObject(out: ObjectOutputStream): Unit = {
//Console.println("writing locID"+locId)
out.writeInt(locId)
}
//[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])]
private def readObject(in: ObjectInputStream): Unit = {
_locId = in.readInt()
//Console.println("read _locID"+_locId)
}
//[throws(classOf[ObjectStreamException])]
private def readResolve(): AnyRef = {
Console.println("readResolve")
null
//build nothing. Subclasses will do...
}
def node: Node;
def localId: int = locId;
def kernel = kern;
def !(msg: MailBox#Message): unit = {
//Console.println("! " + msg)
if (actor != null)
actor send msg
else
kernel.remoteSend(this, msg)
}
def link(other: Pid): unit =
other match {
case rpid: RemotePid =>
kernel.link(this, rpid)
};
//TODO (dont know if this is local to kernel.node)
def linkTo(other: Pid): unit =
other match {
case rpid: RemotePid =>
// do nothing
};
def unlink(other: Pid): unit =
other match {
case rpid: RemotePid =>
kernel.unlink(this, rpid)
};
//TODO (dont know if this is local to kernel.node)
def unlinkFrom(other: Pid): unit =
other match {
case rpid: RemotePid =>
// do nothing
};
def exit(reason: Symbol): unit = kernel.exit(this, reason);
def exit(from: Pid, reason: Symbol): unit = {
from match {
case rpid: RemotePid =>
kernel.exit(rpid, reason);
}
}
def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = {}
}
[serializable] case class TcpPid(n: TcpNode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) {
def node: TcpNode = n;
private var _locId = locId
private var _node = n
override def equals(that: Any) =
super.equals(that)
//[throws(classOf[IOException])]
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeInt(locId)
out.writeObject(n)
}
//[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])]
private def readObject(in: ObjectInputStream): Unit = {
_locId = in.readInt()
_node = in.readObject().asInstanceOf[TcpNode]
}
//[throws(classOf[ObjectStreamException])]
private def readResolve(): AnyRef = {
val kernel = NetKernel.kernel;
//TODO val actor = kernel.getLocalRef(_locId)
TcpPid(_node, _locId, kernel, actor)
}
}
[serializable] case class JXTAPid(n: JXTANode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) {
def node: JXTANode = n;
private var _locId = locId
private var _node = n
override def equals(that: Any) =
super.equals(that)
//[throws(classOf[IOException])]
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeInt(locId)
out.writeObject(n)
}
//[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])]
private def readObject(in: ObjectInputStream): Unit = {
_locId = in.readInt()
_node = in.readObject().asInstanceOf[JXTANode]
}
//[throws(classOf[ObjectStreamException])]
private def readResolve(): AnyRef = {
val kernel = NetKernel.kernel;
//TODO val actor = kernel.getLocalRef(_locId)
JXTAPid(_node, _locId, kernel, actor)
}
}
//================================================================================
object CaseTest {
def getBytes(obj: AnyRef): Array[byte] = {
val bos = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bos)
out.writeObject(obj)
out.flush()
bos.toByteArray()
}
def getObject(a: Array[byte]): AnyRef = {
val bis = new ByteArrayInputStream(a)
val in = new ObjectInputStream(bis)
val obj = in.readObject()
obj
}
def main(args: Array[String]): Unit = {
val node = JXTANode ("test node");
val pid1 = JXTAPid (node, 4, null, null);
val pid2 = JXTAPid (node, 4, new NetKernel(null), null);
Console.println("node Before: " + node)
Console.println("node After : " + getObject(getBytes(node)))
Console.println("pid1 Before: " + pid1)
Console.println("pid1 After : " + getObject(getBytes(pid1)))
Console.println("pid2 Before: " + pid2)
Console.println("pid2 After : " + getObject(getBytes(pid2)))
Console.println("pid2 After : " + getObject((new String (getBytes(pid2))).getBytes))
}
}

View File

@ -0,0 +1,53 @@
package scala.actors.distributed;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import scala.actors.distributed.picklers.BytePickle.SPU;
import scala.actors.multi.Pid;
abstract class Serializer(s: Service) {
def serialize(o: AnyRef/*, w: Writer*/): Array[byte];
def deserialize(a: Array[byte]/*r: Reader*/): AnyRef;
// throws IOException
def readBytes(inputStream: DataInputStream): Array[byte] = {
try {
val length = inputStream.readInt();
val bytes = new Array[byte](length);
inputStream.readFully(bytes, 0, length);
return bytes;
}
catch {
case npe: NullPointerException => {
throw new EOFException("Connection closed.");
}
}
}
// throws IOException, ClassNotFoundException
def readObject(inputStream: DataInputStream): AnyRef = {
val bytes = readBytes(inputStream);
deserialize(bytes);
}
// throws IOException
def writeBytes(outputStream: DataOutputStream, bytes: Array[byte]): unit = {
val length = bytes.length;
// original length
outputStream.writeInt(length);
outputStream.write(bytes, 0, length);
outputStream.flush();
}
// throws IOException
def writeObject(outputStream: DataOutputStream, obj: AnyRef) = {
val bytes = serialize(obj);
writeBytes(outputStream, bytes);
}
def pid: SPU[Pid];
def service = s;
def addRep(name: String, repCons: Serializer => AnyRef): unit;
}

View File

@ -0,0 +1,72 @@
package scala.actors.distributed;
import java.io.StringWriter;
trait Service {
val serializer: Serializer;
def node: Node;
def createPid(actor: RemoteActor): RemotePid;
def send(node: Node, data: Array[byte]): unit;
def connect(node: Node): unit; // non blocking.
def disconnectNode(node: Node): unit;
def isConnected(node: Node): boolean;
//blocking. timeout depends on Implementation.
def isReachable(node: Node): boolean;
def getRoundTripTimeMillis(node:Node): long; //blocking
def nodes:List[Node]
// implemented parts:
private val kern = new NetKernel(this);
def kernel = kern;
def spawn(name: String): RemotePid =
kern spawn name;
def spawn(name: String, arg: RemotePid): RemotePid =
kern.spawn(name, arg);
//suggested addition by seb
def spawn(fun: RemoteActor => unit): RemotePid =
kernel.spawn(fun);
def spawn(a:RemoteActor):RemotePid = {
//Console.println("Service:spawn(RemoteActor)")
val pid = kernel.register(a)
//Console.println("RemoteActor("+a+") registered in kernel")
a.start
//Console.println("RemoteActor("+a+") started")
pid
}
def send(pid: RemotePid, msg: AnyRef): unit = synchronized {
if (pid.node == this.node)
kernel.localSend(pid, msg)
else
kernel.remoteSend(pid, msg)
}
def remoteSend(pid: RemotePid, msg: AnyRef): unit = synchronized {
//Console.println("Service: Sending " + msg + " to " + pid)
// lets try to serialize the message
//val sw = new StringWriter
//serializer.serialize(msg, sw)
val bytes = serializer.serialize(msg)
//val sendMsg = Send(pid, sw.toString())
val sendMsg = Send(pid, bytes)
//val sw2 = new StringWriter
//serializer.serialize(sendMsg, sw2)
//send(pid.node, sw2.toString())
val bytes2 = serializer.serialize(sendMsg)
send(pid.node, bytes2)
}
private var idCnt = 0;
def makeUid = { idCnt = idCnt + 1; idCnt }
}

View File

@ -0,0 +1,126 @@
package scala.actors.distributed;
import java.io._;
import scala.collection.mutable._;
import scala.actors.distributed.picklers.BytePickle._;
import scala.actors.distributed.MessagesComb._;
import scala.actors.distributed.NodeComb._;
import scala.actors.multi._;
//import scala.actors.distributed.examples.CounterMessagesComb._;
//TODO: change Service to NetKernel in Serializer interface
class TcpSerializerComb(serv: Service) extends Serializer(serv) {
private def lookup(typename: String): PU[AnyRef] = {
val op = table.get(typename)
op match {
case None =>
error("No type representation found.")
null
case Some(rep) =>
val repr = rep.asInstanceOf[PU[AnyRef]]
repr
}
}
private def lookup(r: Reader): PU[AnyRef] = {
// read length of type name
val carr = new Array[char](8)
r.read(carr)
val len = Util.decode(new String(carr))
val content = new Array[char](len)
r.read(content)
lookup(new String(content))
}
def pid: SPU[Pid] = {
val nodeIntPU = wrap((p: Pair[TcpNode,int]) => TcpPid(p._1, p._2, serv.kernel,
if (p._1 == serv.node) serv.kernel.getLocalRef(p._2)
else null),
(t: TcpPid) => Pair(t.node, t.localId),
pair(tcpNodePU, nat));
wrap((p:Pid) => p, (pid:Pid) => pid match {
case tpid: TcpPid =>
tpid
case other =>
error("no instance of TcpPid!!")
}, nodeIntPU)
}
def anyRef: SPU[AnyRef] =
wrap((typename: String) => Class.forName(typename).newInstance().asInstanceOf[AnyRef],
(obj: AnyRef) => Util.baseName(obj),
string);
def actorPU: SPU[RA] =
wrap((typename: String) => RA(Class.forName(typename).newInstance().asInstanceOf[RemoteActor]),
(obj: RA) => Util.baseName(obj.a),
string);
val log = new Debug("TcpSerializerComb")
log.level = 3
val table = new HashMap[String, AnyRef]
initialize
def initialize = {
table += "int" -> nat
table += "Send" -> sendPU(this)
table += "Spawn" -> spawnPU(this)
table += "TcpNode" -> tcpNodePU
table += "TcpPid" -> pid
table += "Exit" -> exitPU(this)
table += "AnyRef" -> anyRef
table += "RA" -> actorPU
table += "SpawnObject" -> spawnObjectPU(this)
//table += "Incr" -> incrPU(this)
//table += "Value" -> valuePU(this)
//table += "Result" -> resultPU(this)
}
def addRep(name: String, repCons: Serializer => AnyRef) =
table.update(name, repCons(this));
def +=(name: String) =
new InternalMapTo(name);
class InternalMapTo(name: String) {
def ->(repCons: Serializer => AnyRef): unit =
table.update(name, repCons(TcpSerializerComb.this));
}
def serialize(o: AnyRef): Array[byte] = {
log.info("pickling value of type " + Util.baseName(o));
val op = table.get(Util.baseName(o));
op match {
case None => error("No type representation for " + Util.baseName(o) + " found. Cannot serialize.");
case Some(rep) =>
// first write type name
val bytes = pickle(string, Util.baseName(o))
val repr = rep.asInstanceOf[SPU[AnyRef]];
log.info("using type representation " + repr);
val res = repr.appP(o, new PicklerState(bytes, new PicklerEnv)).stream
res
}
}
def deserialize(bytes: Array[byte]): AnyRef = {
val ups = string.appU(new UnPicklerState(bytes, new UnPicklerEnv))
val typename = ups._1
table.get(typename) match {
case None => error("No type representation for " + typename + " found. Cannot deserialize.")
case Some(rep) =>
val repr = rep.asInstanceOf[SPU[AnyRef]];
val obj = repr.appU(ups._2)._1
log.info("unpickling successful")
obj
}
}
}

View File

@ -0,0 +1,215 @@
package scala.actors.distributed;
import java.net._;
import java.io._;
import java.util.logging._;
object TcpService {
val random = new java.util.Random(0)
def generatePort: int = {
var portnum = 0
try {
portnum = 8000 + random.nextInt(500)
val socket = new ServerSocket(portnum)
socket.close()
}
catch {
case ioe: IOException =>
// this happens when trying to open a socket twice at the same port
// try again
generatePort
case se: SecurityException =>
// do nothing
}
portnum
}
}
object TestPorts {
def main(args: Array[String]): unit = {
val random = new java.util.Random(0)
val socket = new ServerSocket(8000 + random.nextInt(500))
Console.println(TcpService.generatePort)
}
}
class TcpService(port: int) extends Thread with Service {
val serializer: JavaSerializer = new JavaSerializer(this);
private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port);
def node: TcpNode = internalNode;
def createPid(actor: RemoteActor): RemotePid =
new TcpPid(internalNode, makeUid, kernel, actor)
def send(node: Node, data: String): unit = synchronized {
// retrieve worker thread (if any) that already has connection
node match {
case tnode: TcpNode =>
getConnection(tnode) match {
case None =>
// we are not connected, yet
Console.println("We are not connected, yet.");
val newWorker = connect(tnode); //bad in a sync BLOCK!!!
newWorker transmit data
case Some(worker) => worker transmit data
}
case any => error("no TcpNode!");
}
}
def send(node: Node, data: Array[byte]): unit = synchronized {
// retrieve worker thread (if any) that already has connection
node match {
case tnode: TcpNode =>
getConnection(tnode) match {
case None =>
// we are not connected, yet
Console.println("We are not connected, yet.");
val newWorker = connect(tnode); //bad in a sync BLOCK!!!
newWorker transmit data
case Some(worker) => worker transmit data
}
case any => error("no TcpNode!");
}
}
override def run(): unit = {
try {
val socket = new ServerSocket(port);
Console.println("Tcp Service started: " + node);
while (true) {
val nextClient = socket.accept();
Console.println("Received request from " + nextClient.getInetAddress() + ":" + nextClient.getPort());
// this is bad because client will have other port than actual node
// solution: worker should read node from stream
// and call main thread to update connection table
// spawn new worker thread
val worker = new TcpServiceWorker(this, nextClient);
worker.readNode;
// start worker thread
worker.start()
}
}
catch {
case ioe:IOException => {
// do nothing
}
case sec:SecurityException => {
// do nothing
}
}
}
// connection management
private val connections = new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker];
def nodes:List[Node] = throw new Exception ("nodes need to be implemented in TcpService!")
def addConnection(n: TcpNode, w: TcpServiceWorker) = synchronized {
connections += n -> w
}
def getConnection(n: TcpNode) = synchronized {
connections.get(n)
}
def isConnected(n: Node): boolean = synchronized {
n match {
case tnode: TcpNode =>
connections.get(tnode) match {
case None => false
case Some(x) => true
}
case _ => false
}
}
def connect(n: Node): unit = synchronized {
n match {
case tnode: TcpNode =>
connect(tnode)
}
}
def connect(n: TcpNode): TcpServiceWorker = synchronized {
Console.println("" + node + ": Connecting to node " + n + " ...")
val sock = new Socket(n.address, n.port)
Console.println("Connected.")
// spawn new worker thread
val worker = new TcpServiceWorker(this, sock)
worker.sendNode;
// start worker thread
worker.start()
// register locally (we want to reuse connections which correspond to connected sockets)
// update connection table
addConnection(n, worker)
worker
}
def disconnectNode(n: Node) = synchronized {
n match {
case node: TcpNode =>
Console.println("Disconnecting from " + node + " ...")
connections.get(node) match {
case None => Console.println("Cannot disconnect from " + node + ". Not connected.")
case Some(worker) =>
//TODO: sending disconnect message
//worker.sendDisconnect;
// update table
connections -= node
Console.println("Halting worker...")
worker.halt
}
case any => error("No TcpNode!!");
}
}
def isReachable(node: Node): boolean =
if (isConnected(node)) true
else try {
connect(node)
return true
}
catch {
case uhe: UnknownHostException =>
false
case ioe: IOException =>
false
case se: SecurityException =>
false
}
def getRoundTripTimeMillis(node: Node): long = 0
def nodeDown(mnode: TcpNode): unit = synchronized {
kernel nodeDown mnode
connections -= mnode
}
/*def closeConnection(worker: TcpServiceWorker): unit = synchronized {
connections.get(worker) match {
case None =>
System.err.println("Worker " + worker + " not registered.");
case Some(socket) => {
try {
socket.close();
connections -= worker
}
catch {
case ioe:IOException =>
System.err.println("Couldn't close connection.");
connections -= worker
}
}
};
System.out.println("OK. Connection closed.")
}*/
}

View File

@ -0,0 +1,86 @@
package scala.actors.distributed;
import java.net._;
import java.io._;
class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
val in = so.getInputStream();
val out = so.getOutputStream();
val datain = new DataInputStream(in);
val dataout = new DataOutputStream(out);
val reader = new BufferedReader(new InputStreamReader(in));
val writer = new PrintWriter(new OutputStreamWriter(out));
val log = new Debug("TcpServiceWorker")
log.level = 2
def transmit(msg: Send): unit = synchronized {
val data = parent.serializer.serialize(msg);
transmit(data);
}
def transmit(data: String): unit = synchronized {
log.info("Transmitting " + data)
writer.write(data)
writer.flush()
}
def transmit(data: Array[byte]): unit = synchronized {
log.info("Transmitting " + data)
dataout.writeInt(data.length);
dataout.write(data)
dataout.flush()
}
def sendNode = {
Console.println("Sending our name " + parent.node);
parent.serializer.writeObject(dataout, parent.node);
}
var connectedNode: TcpNode = _;
def readNode = {
Console.println("" + parent.node + ": Reading node name...");
//val node = parent.serializer.deserialize(reader);
val node = parent.serializer.readObject(datain);
Console.println("Connection from " + node);
node match {
case n: TcpNode => {
connectedNode = n
Console.println("Adding connection to " + node + " to table.");
parent.addConnection(n, this)
}
}
}
var running = true;
def halt = synchronized {
so.close(); // close socket
running = false; // stop
}
override def run(): unit = {
try {
while (running) {
if (in.available() > 0) {
log.info("deserializing...");
//val msg = parent.serializer.deserialize(reader);
val msg = parent.serializer.readObject(datain);
log.info("Received object: " + msg);
parent.kernel.processMsg(msg)
}
}
}
catch {
case ioe:IOException =>
Console.println("" + ioe + " while reading from socket.");
parent nodeDown connectedNode
case e:Exception =>
// catch-all
Console.println("" + e + " while reading from socket.");
parent nodeDown connectedNode
}
}
}

View File

@ -0,0 +1,58 @@
package scala.actors.distributed;
import java.io._;
import scala.collection.mutable._;
object Util {
def pad(s: String, req: int): String = {
val buf = new StringBuffer;
val add: int = req - s.length();
for (val i <- List.range(1, add+1))
buf append "0";
buf append s;
buf.toString()
}
def encode(i: int) = pad(Integer.toHexString(i), 8);
def encode(l: long) = pad(java.lang.Long.toHexString(l), 16);
def decode(s: String): int = Integer.decode("0x" + s).intValue();
def decodeLong(s: String): long = java.lang.Long.decode("0x" + s).longValue();
def baseName(o: Any) = {
val s = o.toString();
def baseName(s: String): String = {
if (s.indexOf('$') != -1)
baseName(s.substring(0,s.indexOf('$')))
else if (s.indexOf('(') != -1)
baseName(s.substring(0,s.indexOf('(')))
else if (s.indexOf('@') != -1)
baseName(s.substring(0,s.indexOf('@')))
else s
}
baseName(s)
}
def extractArgs(s: String): Buffer[String] = {
// extract strings between first-level commas
var level: int = 0;
val carr: Array[char] = s.toCharArray();
var buf = new StringBuffer; // current string
val args = new ArrayBuffer[String];
for (val i <- List.range(0,carr.length)) {
if ((level == 0) && (carr(i) == ',')) {
// argument finished
args += buf.toString();
buf = new StringBuffer
} else {
if (carr(i) == '(') level = level + 1;
if (carr(i) == ')') level = level - 1;
buf append carr(i)
}
}
args += buf.toString();
args
}
}

View File

@ -0,0 +1,436 @@
package scala.actors.distributed.picklers
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer
/**
Pickler combinators.
Author: Philipp Haller <philipp.haller@epfl.ch>
*/
object BytePickle {
class PicklerState(val stream: Array[byte], val dict: PicklerEnv) {}
class UnPicklerState(val stream: Array[byte], val dict: UnPicklerEnv) {}
abstract class PU[t] {
def appP(a: t, state: Array[byte]): Array[byte];
def appU(state: Array[byte]): Pair[t, Array[byte]];
}
abstract class SPU[t] {
def appP(a: t, state: PicklerState): PicklerState;
def appU(state: UnPicklerState): Pair[t, UnPicklerState];
}
class PicklerEnv extends HashMap[Any, int] {
private var cnt: int = 64;
def nextLoc() = { cnt = cnt + 1; cnt };
}
class UnPicklerEnv extends HashMap[int, Any] {
private var cnt: int = 64;
def nextLoc() = { cnt = cnt + 1; cnt };
}
abstract class RefDef;
case class Ref() extends RefDef;
case class Def() extends RefDef;
def refDef: PU[RefDef] = new PU[RefDef] {
def appP(b: RefDef, s: Array[byte]): Array[byte] =
b match {
case Ref() => Array.concat(s, (List[byte](0)).toArray)
case Def() => Array.concat(s, (List[byte](1)).toArray)
};
def appU(s: Array[byte]): Pair[RefDef, Array[byte]] =
if (s(0) == 0) Pair(Ref(), s.subArray(1, s.length))
else Pair(Def(), s.subArray(1, s.length));
}
val REF = 0
val DEF = 1
def unat: PU[int] = new PU[int] {
def appP(n: int, s: Array[byte]): Array[byte] =
Array.concat(s, nat2Bytes(n));
def appU(s: Array[byte]): Pair[int, Array[byte]] = {
var num = 0
def readNat: int = {
var b = 0;
var x = 0;
do {
b = s(num)
num = num + 1
x = (x << 7) + (b & 0x7f);
} while ((b & 0x80) != 0);
x
}
Pair(readNat, s.subArray(num, s.length))
}
}
def share[a](pa: SPU[a]): SPU[a] = new SPU[a] {
def appP(v: a, state: PicklerState): PicklerState = {
/*
- is there some value equal to v associated with a location l in the pickle environment?
- yes: write REF-tag to outstream together with l
- no:
write DEF-tag to outstream
record current location l of outstream
--> serialize value
add entry to pickle environment, mapping v onto l
*/
val pe = state.dict
pe.get(v) match {
case None =>
//Console.println("" + v + " is new")
//Console.println("writing DEF...")
val sPrime = refDef.appP(Def(), state.stream)
val l = pe.nextLoc()
//Console.println("applying pickler to state " + sPrime)
val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe))
//Console.println("updating dict (" + l + ") for " + v)
pe.update(v, l)
return sPrimePrime
case Some(l) =>
//Console.println("writing REF...")
val sPrime = refDef.appP(Ref(), state.stream)
//Console.println("writing location to state " + sPrime)
return new PicklerState(unat.appP(l, sPrime), pe)
}
}
def appU(state: UnPicklerState): Pair[a, UnPicklerState] = {
/*
- first, read tag (i.e. DEF or REF)
- if REF:
read location l
look up resulting value in unpickler environment
- if DEF:
record location l of input stream
--> deserialize value v with argument deserializer
add entry to unpickler environment, mapping l onto v
*/
val upe = state.dict
val res = refDef.appU(state.stream)
res._1 match {
case Def() =>
val l = upe.nextLoc
val res2 = pa.appU(new UnPicklerState(res._2, upe))
upe.update(l, res2._1)
return res2
case Ref() =>
val res2 = unat.appU(res._2) // read location
upe.get(res2._1) match { // lookup value in unpickler env
case None => error("invalid unpickler environment"); return null
case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe))
}
}
}
}
def upickle[t](p: PU[t], a: t): Array[byte] =
p.appP(a, new Array[byte](0));
def uunpickle[t](p: PU[t], stream: Array[byte]): t =
p.appU(stream)._1;
def pickle[t](p: SPU[t], a: t): Array[byte] =
p.appP(a, new PicklerState(new Array[byte](0), new PicklerEnv)).stream;
def unpickle[t](p: SPU[t], stream: Array[byte]): t =
p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1;
def ulift[t](x: t): PU[t] = new PU[t] {
def appP(a: t, state: Array[byte]): Array[byte] =
if (x != a) { error("value to be pickled (" + a + ") != " + x); state }
else state;
def appU(state: Array[byte]) = Pair(x, state);
}
def lift[t](x: t): SPU[t] = new SPU[t] {
def appP(a: t, state: PicklerState): PicklerState =
if (x != a) { /*error("value to be pickled (" + a + ") != " + x);*/ state }
else state;
def appU(state: UnPicklerState) = Pair(x, state);
}
def usequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] {
def appP(b: u, s: Array[byte]): Array[byte] = {
val a = f(b)
val sPrime = pa.appP(a, s)
val pb = k(a)
val sPrimePrime = pb.appP(b, sPrime)
sPrimePrime
}
def appU(s: Array[byte]): Pair[u, Array[byte]] = {
val resPa = pa.appU(s)
val a = resPa._1
val sPrime = resPa._2
val pb = k(a)
pb.appU(sPrime)
}
}
def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] {
def appP(b: u, s: PicklerState): PicklerState = {
val a = f(b)
//Console.println("pickling " + a + ", s: " + s.stream)
val sPrime = pa.appP(a, s)
val pb = k(a)
//Console.println("pickling " + b + ", s: " + s.stream)
pb.appP(b, sPrime)
}
def appU(s: UnPicklerState): Pair[u, UnPicklerState] = {
val resPa = pa.appU(s)
val a = resPa._1
val sPrime = resPa._2
val pb = k(a)
pb.appU(sPrime)
}
}
def upair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = {
def fst(p: Pair[a,b]): a = p._1;
def snd(p: Pair[a,b]): b = p._2;
usequ(fst, pa, (x: a) => usequ(snd, pb, (y: b) => ulift(Pair(x, y))))
}
def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = {
def fst(p: Pair[a,b]): a = p._1;
def snd(p: Pair[a,b]): b = p._2;
sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
}
def triple[a,b,c](pa: SPU[a], pb: SPU[b], pc: SPU[c]): SPU[Triple[a,b,c]] = {
def fst(p: Triple[a,b,c]): a = p._1;
def snd(p: Triple[a,b,c]): b = p._2;
def trd(p: Triple[a,b,c]): c = p._3;
sequ(fst, pa,
(x: a) => sequ(snd, pb,
(y: b) => sequ(trd, pc,
(z: c) => lift(Triple(x, y, z)))))
}
def uwrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] =
usequ(j, pa, (x: a) => ulift(i(x)));
def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] =
sequ(j, pa, (x: a) => lift(i(x)));
def appendByte(a: Array[byte], b: int): Array[byte] = {
Array.concat(a, (List[byte](b.asInstanceOf[byte])).toArray)
}
def nat2Bytes(x: int): Array[byte] = {
val buf = new ArrayBuffer[byte]
def writeNatPrefix(x: int): unit = {
val y = x >>> 7;
if (y != 0) writeNatPrefix(y);
buf += ((x & 0x7f) | 0x80).asInstanceOf[byte];
}
val y = x >>> 7;
if (y != 0) writeNatPrefix(y);
buf += (x & 0x7f).asInstanceOf[byte];
buf.toArray
}
def nat: SPU[int] = new SPU[int] {
def appP(n: int, s: PicklerState): PicklerState = {
new PicklerState(Array.concat(s.stream, nat2Bytes(n)), s.dict);
}
def appU(s: UnPicklerState): Pair[int,UnPicklerState] = {
var num = 0
def readNat: int = {
var b = 0;
var x = 0;
do {
b = s.stream(num)
num = num + 1
x = (x << 7) + (b & 0x7f);
} while ((b & 0x80) != 0);
x
}
Pair(readNat, new UnPicklerState(s.stream.subArray(num, s.stream.length), s.dict))
}
}
def byte: SPU[byte] = new SPU[byte] {
def appP(b: byte, s: PicklerState): PicklerState =
new PicklerState(Array.concat(s.stream, (List[byte](b)).toArray), s.dict);
def appU(s: UnPicklerState): Pair[byte, UnPicklerState] =
Pair(s.stream(0), new UnPicklerState(s.stream.subArray(1, s.stream.length), s.dict));
}
def string: SPU[String] =
share(wrap((a:Array[byte]) => UTF8Codec.decode(a, 0, a.length), (s:String) => UTF8Codec.encode(s), bytearray));
def bytearray: SPU[Array[byte]] = {
wrap((l:List[byte]) => l.toArray, .toList, list(byte))
}
def bool: SPU[boolean] = {
def toEnum(b: boolean) = if (b) 1 else 0;
def fromEnum(n: int) = if (n == 0) false else true;
wrap(fromEnum, toEnum, nat)
}
def ufixedList[a](pa: PU[a])(n: int): PU[List[a]] = {
def pairToList(p: Pair[a,List[a]]): List[a] =
p._1 :: p._2;
def listToPair(l: List[a]): Pair[a,List[a]] =
l match { case x :: xs => Pair(x, xs) }
if (n == 0) ulift(Nil)
else
uwrap(pairToList, listToPair, upair(pa, ufixedList(pa)(n-1)))
}
def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = {
def pairToList(p: Pair[a,List[a]]): List[a] =
p._1 :: p._2;
def listToPair(l: List[a]): Pair[a,List[a]] =
l match { case x :: xs => Pair(x, xs) }
if (n == 0) lift(Nil)
else
wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
}
def list[a](pa: SPU[a]): SPU[List[a]] =
sequ((l: List[a])=>l.length, nat, fixedList(pa));
def ulist[a](pa: PU[a]): PU[List[a]] =
usequ((l:List[a]) => l.length, unat, ufixedList(pa));
def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] =
sequ(tag, nat, (x: int)=> ps.apply(x)());
def printByteArray(a: Array[byte]) = {
val iter = a.elements
while (iter.hasNext) {
val el = iter.next
Console.print("" + el + ", ")
}
}
def main(args: Array[String]) = {
// test nat2Bytes
Console.println(printByteArray(nat2Bytes(1)))
Console.println(printByteArray(nat2Bytes(10)))
Console.println(printByteArray(nat2Bytes(16)))
Console.println(printByteArray(nat2Bytes(256)))
Console.println(100000)
var res = pickle(nat, 100000)
Console.println(printByteArray(res))
var up = unpickle(nat, res)
Console.println(up)
// -- int list
val intList = List(1, 7, 13)
Console.println(intList)
val res9 = pickle(list(nat), intList)
Console.println(printByteArray(res9))
val up9 = unpickle(list(nat), res9)
Console.println(up9)
// ---------------
// -- boolean list
val bList = List(false, true, true)
Console.println(bList)
val res2 = pickle(list(bool), bList)
Console.println(printByteArray(res2))
val up2 = unpickle(list(bool), res2)
Console.println(up2)
// -- string
val s = "Hello"
Console.println(s)
val res3 = pickle(string, s)
Console.println(printByteArray(res3))
val up3 = unpickle(string, res3)
Console.println(up3)
val personPU = wrap((p:Pair[String,int]) => Person(p._1, p._2), (p:Person) => Pair(p.name, p.age), pair(string, nat));
val p = Person("Philipp", 25)
Console.println(p)
val res4 = pickle(personPU, p)
Console.println(printByteArray(res4))
val up4 = unpickle(personPU, res4)
Console.println(up4)
val x = Var("x");
val i = Lam("x", x);
val k = Lam("x", Lam("y", x));
val kki = App(k, App(k, i));
/*def varPU: PU[Term] = wrap(Var,
(t: Term)=> t match {case Var(x)=>x},
string);
def lamPU: PU[Term] = wrap(p: Pair[String,Term]=>Lam(p._1, p._2),
(t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
pair(string, termPU));
def appPU: PU[Term] = wrap(p: Pair[Term,Term]=>App(p._1, p._2),
(t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
pair(termPU, termPU));
def termPU: PU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
List(()=>varPU, ()=>lamPU, ()=>appPU));
Console.println("\n" + k);
val res5 = pickle(termPU, k);
Console.println(res5);
val up5 = unpickle(termPU, res5);
Console.println(up5);
Console.println("\n" + kki);
val res6 = pickle(termPU, kki);
Console.println(res6);
Console.println("len: " + res6.length)
val up6 = unpickle(termPU, res6);
Console.println(up6);*/
def varSPU: SPU[Term] = wrap(Var,
(t: Term)=> t match {case Var(x)=>x},
string);
def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
(t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
pair(string, termSPU));
def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2),
(t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
pair(termSPU, termSPU));
def termSPU: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
List(()=>varSPU, ()=>lamSPU, ()=>appSPU)));
Console.println("\n" + k);
val res8 = pickle(termSPU, k);
Console.println(printByteArray(res8));
Console.println("len: " + res8.length)
val up8 = unpickle(termSPU, res8);
Console.println(up8);
Console.println("\n" + kki);
val res7 = pickle(termSPU, kki);
Console.println(printByteArray(res7));
Console.println("len: " + res7.length)
val up7 = unpickle(termSPU, res7);
Console.println(up7);
}
case class Person(name: String, age: int);
abstract class Term;
case class Var(s: String) extends Term;
case class Lam(s: String, t: Term) extends Term;
case class App(t1: Term, t2: Term) extends Term;
}

View File

@ -0,0 +1,519 @@
package scala.actors.distributed.picklers;
import scala.collection.mutable.HashMap;
import java.io.StringReader;
import java.io.StringWriter;
/**
Pickler combinators.
Author: Philipp Haller <philipp.haller@epfl.ch>
*/
object SStreamPickle {
abstract class PU[t] {
def appP(a: t, state: OutStream): OutStream;
def appU(state: InStream): Pair[t,InStream];
}
//def pickle[t](p: PU[t], a: t): OutStream =
// p.appP(a, "");
def unpickle[t](p: PU[t], stream: InStream): t =
p.appU(stream)._1;
def lift[t](x: t): PU[t] = new PU[t] {
def appP(a: t, state: OutStream): OutStream = state;
def appU(state: InStream) = Pair(x, state);
}
def sequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] {
def appP(b: u, s: OutStream): OutStream = {
val a = f(b)
val sPrime = pa.appP(a, s)
val pb = k(a)
val sPrimePrime = pb.appP(b, sPrime)
sPrimePrime
}
def appU(s: InStream): Pair[u,InStream] = {
val resPa = pa.appU(s)
val a = resPa._1
val sPrime = resPa._2
val pb = k(a)
pb.appU(sPrime)
}
}
def pair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = {
def fst(p: Pair[a,b]): a = p._1;
def snd(p: Pair[a,b]): b = p._2;
sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
}
def triple[a,b,c](pa: PU[a], pb: PU[b], pc: PU[c]): PU[Triple[a,b,c]] = {
def fst(p: Triple[a,b,c]): a = p._1;
def snd(p: Triple[a,b,c]): b = p._2;
def trd(p: Triple[a,b,c]): c = p._3;
sequ(fst, pa,
(x: a) => sequ(snd, pb,
(y: b) => sequ(trd, pc,
(z: c) => lift(Triple(x, y, z)))))
}
def wrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] =
sequ(j, pa, (x: a) => lift(i(x)));
def unit: PU[unit] =
lift(unit);
def pad(s: String, req: int): String = {
val buf = new StringBuffer
for (val i <- List.range(1, req-s.length+1))
buf append "0"
(buf append s).toString
}
def encode(i: int): String = pad(Integer.toHexString(i), 8);
def decode(s: String): int = Integer.decode("0x" + s).intValue();
def int: PU[int] = new PU[int] {
def appP(n: int, s: OutStream): OutStream = {
s.write(encode(n))
s
}
def appU(s: InStream): Pair[int,InStream] = {
val substr = s.read(8)
//Console.println("unpickling " + substr)
Pair(decode(substr), s)
}
}
def char: PU[char] = new PU[char] {
def appP(b: char, s: OutStream): OutStream = {
s.write(b)
s
}
def appU(s: InStream): Pair[char,InStream] = {
val carr = new Array[char](1)
s.read(carr)
//Console.println("unpickling " + carr(0))
Pair(carr(0), s)
}
}
def bool: PU[boolean] = {
def toEnum(b: boolean) = if (b) 1 else 0;
def fromEnum(n: int) = if (n == 0) false else true;
wrap(fromEnum, toEnum, nat)
}
def fixedList[a](pa: PU[a])(n: int): PU[List[a]] = {
def pairToList(p: Pair[a,List[a]]): List[a] =
p._1 :: p._2;
def listToPair(l: List[a]): Pair[a,List[a]] =
l match { case x :: xs => Pair(x, xs) }
if (n == 0) lift(Nil)
else
wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
}
def list[a](pa: PU[a]): PU[List[a]] =
sequ((l: List[a])=>l.length, nat, fixedList(pa));
def string: PU[String] =
wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char));
def alt[a](tag: a => int, ps: List[PU[a]]): PU[a] =
sequ(tag, int, ps.apply);
def data[a](tag: a => int, ps: List[()=>PU[a]]): PU[a] =
sequ(tag, nat, (x: int)=> ps.apply(x)());
def option[a](pa: PU[a]): PU[Option[a]] = {
def tag(x: Option[a]) = x match {
case None => 0
case Some(y) => 1
}
def fromSome(x: Option[a]) = x match {
case Some(y) => y
case None => null
}
def toSome(x: a): Option[a] = Some(x);
val pnone: PU[Option[a]] = lift(None)
alt(tag, List(pnone, wrap(toSome, fromSome, pa)))
}
def byteString(b: int) =
pad(Integer.toHexString(b), 2);
def natString(x: int): String = {
val buf = new StringBuffer
def writeNatPrefix(x: int): unit = {
val y = x >>> 7;
if (y != 0) writeNatPrefix(y);
buf.append(byteString((x & 0x7f) | 0x80));
}
val y = x >>> 7;
if (y != 0) writeNatPrefix(y);
buf.append(byteString(x & 0x7f));
buf.toString()
}
def nat: PU[int] = new PU[int] {
def appP(n: int, s: OutStream): OutStream = {
s.write(natString(n))
s
}
def appU(s: InStream): Pair[int,InStream] = {
def readNat: int = {
var b = 0;
var x = 0;
do {
b = decode(s.read(2));
x = (x << 7) + (b & 0x7f);
} while ((b & 0x80) != 0);
x
}
Pair(readNat, s)
}
}
def main(args: Array[String]) = {
def testBase128(x: int) = {
Console.println(x)
val sw = new StringWriter
val os = new OutStream(sw)
val res = nat.appP(x, os)
os.flush()
Console.println(sw.toString())
val up = nat.appU(new InStream(new StringReader(sw.toString())))
Console.println(up._1)
}
testBase128(0)
testBase128(1)
testBase128(64)
testBase128(127)
testBase128(128)
testBase128(8192)
def pickleTest[a](x: a, pa: PU[a]) = {
Console.println(x)
val sw = new StringWriter
val os = new OutStream(sw)
val res = pa.appP(x, os)
os.flush()
Console.println(sw.toString())
val up = pa.appU(new InStream(new StringReader(sw.toString())))
Console.println(up._1)
}
pickleTest(List(1, 7, 13), list(nat))
pickleTest(List(false, true, true), list(bool))
pickleTest("Hello", string)
val personPU = wrap((p: Pair[String,int]) => Person(p._1, p._2), (p: Person) => Pair(p.name, p.age), pair(string, nat));
val p = Person("Philipp", 25)
pickleTest(p, personPU)
val x = Var("x");
val i = Lam("x", x);
val k = Lam("x", Lam("y", x));
val kki = App(k, App(k, i));
def varPU: PU[Term] =
wrap(Var,
(t: Term)=> t match {case Var(x)=>x},
string);
def lamPU: PU[Term] =
wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
(t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
pair(string, termPU));
def appPU: PU[Term] =
wrap((p: Pair[Term,Term])=>App(p._1, p._2),
(t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
pair(termPU, termPU));
def termPU: PU[Term] =
data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
List(()=>varPU, ()=>lamPU, ()=>appPU));
pickleTest(k, termPU)
pickleTest(kki, termPU)
}
case class Person(name: String, age: int);
}
abstract class Term;
case class Var(s: String) extends Term;
case class Lam(s: String, t: Term) extends Term;
case class App(t1: Term, t2: Term) extends Term;
object ShareStreamPickle {
abstract class SPU[t] {
def appP(a: t, state: PicklerState): PicklerState;
def appU(state: UnPicklerState): Pair[t, UnPicklerState];
}
//def pickle[t](p: SPU[t], a: t): String =
// p.appP(a, new PicklerState("", new PicklerEnv)).stream;
//def unpickle[t](p: SPU[t], stream: String): t =
// p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1;
class PicklerEnv extends HashMap[Any, int] {
private var cnt: int = 64;
def nextLoc() = { cnt = cnt + 1; cnt };
}
class UnPicklerEnv extends HashMap[int, Any] {
private var cnt: int = 64;
def nextLoc() = { cnt = cnt + 1; cnt };
}
class PicklerState(val stream: OutStream, val dict: PicklerEnv) {}
class UnPicklerState(val stream: InStream, val dict: UnPicklerEnv) {}
abstract class RefDef;
case class Ref() extends RefDef;
case class Def() extends RefDef;
def refDef: SStreamPickle.PU[RefDef] = new SStreamPickle.PU[RefDef] {
def appP(b: RefDef, s: OutStream): OutStream =
b match {
case Ref() => s.write("0"); s
case Def() => s.write("1"); s
};
def appU(s: InStream): Pair[RefDef, InStream] =
if (s.readChar == '0') Pair(Ref(), s)
else Pair(Def(), s);
}
def share[a](pa: SPU[a]): SPU[a] = new SPU[a] {
def appP(v: a, state: PicklerState): PicklerState = {
/*
- is there some value equal to v associated with a location l in the pickle environment?
- yes: write REF-tag to outstream together with l
- no:
write DEF-tag to outstream
record current location l of outstream
--> serialize value
add entry to pickle environment, mapping v onto l
*/
val pe = state.dict
pe.get(v) match {
case None =>
//Console.println("" + v + " is new")
//Console.println("writing DEF...")
val sPrime = refDef.appP(Def(), state.stream)
val l = pe.nextLoc()
//Console.println("applying pickler to state " + sPrime)
val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe))
//Console.println("updating dict (" + l + ") for " + v)
pe.update(v, l)
return sPrimePrime
case Some(l) =>
//Console.println("writing REF...")
val sPrime = refDef.appP(Ref(), state.stream)
//Console.println("writing location to state " + sPrime)
return new PicklerState(SStreamPickle.nat.appP(l, sPrime), pe)
}
}
def appU(state: UnPicklerState): Pair[a, UnPicklerState] = {
/*
- first, read tag (i.e. DEF or REF)
- if REF:
read location l
look up resulting value in unpickler environment
- if DEF:
record location l of input stream
--> deserialize value v with argument deserializer
add entry to unpickler environment, mapping l onto v
*/
val upe = state.dict
val res = refDef.appU(state.stream)
res._1 match {
case Def() =>
val l = upe.nextLoc
val res2 = pa.appU(new UnPicklerState(res._2, upe))
upe.update(l, res2._1)
return res2
case Ref() =>
val res2 = SStreamPickle.nat.appU(res._2) // read location
upe.get(res2._1) match { // lookup value in unpickler env
case None => error("invalid unpickler environment"); return null
case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe))
}
}
}
}
def lift[t](x: t): SPU[t] = new SPU[t] {
def appP(a: t, state: PicklerState): PicklerState = state;
def appU(state: UnPicklerState) = Pair(x, state);
}
def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] {
def appP(b: u, s: PicklerState): PicklerState = {
val a = f(b)
//Console.println("pickling " + a + ", s: " + s.stream)
val sPrime = pa.appP(a, s)
val pb = k(a)
//Console.println("pickling " + b + ", s: " + s.stream)
pb.appP(b, sPrime)
}
def appU(s: UnPicklerState): Pair[u, UnPicklerState] = {
val resPa = pa.appU(s)
val a = resPa._1
val sPrime = resPa._2
val pb = k(a)
pb.appU(sPrime)
}
}
def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = {
def fst(p: Pair[a,b]): a = p._1;
def snd(p: Pair[a,b]): b = p._2;
sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
}
def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] =
sequ(j, pa, (x: a) => lift(i(x)));
def char: SPU[char] = new SPU[char] {
def appP(b: char, s: PicklerState): PicklerState = {
s.stream.write(b)
new PicklerState(s.stream, s.dict)
}
def appU(s: UnPicklerState): Pair[char, UnPicklerState] =
Pair(s.stream.readChar, new UnPicklerState(s.stream, s.dict));
}
def pad(s: String, req: int): String = {
val buf = new StringBuffer
for (val i <- List.range(1, req-s.length+1))
buf append "0"
(buf append s).toString
}
def encode(i: int): String = pad(Integer.toHexString(i), 8);
def decode(s: String): int = Integer.decode("0x" + s).intValue();
def byteString(b: int) =
pad(Integer.toHexString(b), 2);
def natString(x: int): String = {
val buf = new StringBuffer
def writeNatPrefix(x: int): unit = {
val y = x >>> 7;
if (y != 0) writeNatPrefix(y);
buf.append(byteString((x & 0x7f) | 0x80));
}
val y = x >>> 7;
if (y != 0) writeNatPrefix(y);
buf.append(byteString(x & 0x7f));
buf.toString()
}
def nat: SPU[int] = new SPU[int] {
def appP(n: int, s: PicklerState): PicklerState = {
s.stream.write(natString(n))
new PicklerState(s.stream, s.dict)
}
def appU(s: UnPicklerState): Pair[int,UnPicklerState] = {
def readNat: int = {
var b = 0;
var x = 0;
do {
b = decode(s.stream.read(2));
x = (x << 7) + (b & 0x7f);
} while ((b & 0x80) != 0);
x
}
Pair(readNat, new UnPicklerState(s.stream, s.dict))
}
}
def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = {
def pairToList(p: Pair[a,List[a]]): List[a] =
p._1 :: p._2;
def listToPair(l: List[a]): Pair[a,List[a]] =
l match { case x :: xs => Pair(x, xs) }
if (n == 0) lift(Nil)
else
wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
}
def list[a](pa: SPU[a]): SPU[List[a]] =
sequ((l: List[a])=>l.length, nat, fixedList(pa));
def string: SPU[String] =
wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char));
def alt[a](tag: a => int, ps: List[SPU[a]]): SPU[a] =
sequ(tag, nat, ps.apply);
def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] =
sequ(tag, nat, (x: int)=> ps.apply(x)());
def main(args: Array[String]) = {
def pickleTest[a](x: a, pa: SPU[a]) = {
Console.println(x)
val sw = new StringWriter
val os = new OutStream(sw)
val res = pa.appP(x, new PicklerState(os, new PicklerEnv))
os.flush()
Console.println(sw.toString())
val up = pa.appU(new UnPicklerState(new InStream(new StringReader(sw.toString())), new UnPicklerEnv))
Console.println(up._1)
}
val x = Var("x");
val i = Lam("x", x);
val k = Lam("x", Lam("y", x));
val kki = App(k, App(k, i));
def varSPU: SPU[Term] = wrap(Var,
(t: Term)=> t match {case Var(x)=>x},
string);
def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
(t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
pair(string, termSPU));
def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2),
(t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
pair(termSPU, termSPU));
def termSPU: SPU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
List(()=>varSPU, ()=>lamSPU, ()=>appSPU));
def termSPUshared: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
List(()=>varSPU, ()=>lamSPU, ()=>appSPU)));
pickleTest(k, termSPU)
pickleTest(k, termSPUshared)
pickleTest(kki, termSPU)
pickleTest(kki, termSPUshared)
}
}

View File

@ -0,0 +1,87 @@
package scala.actors.distributed.picklers;
import java.io.Reader;
import java.io.Writer;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import scala.collection.mutable._;
class OutStream(writer: Writer) {
val picklerEnv = new PicklerEnv;
private var loc: int = 0;
def getLocation = loc;
def write(s: String): unit = {
loc = loc + s.length()
writer.write(s)
//Console.println("new loc: " + loc)
}
def write(c: char): unit = {
loc = loc + 1
writer.write(c)
//Console.println("new loc: " + loc)
}
def flush(): unit =
writer.flush();
}
class InStream(reader: Reader) {
val unpicklerEnv = new UnpicklerEnv;
private var loc: int = 0;
def getLocation = loc;
def read(num: int): String = {
val carr = new Array[char](num)
val cnt = reader.read(carr)
loc = loc + num
//Console.println("new loc: " + loc)
new String(carr)
/*val buf = new StringBuffer
var ch = r.read()
loc = loc + 1
if (num > 1) {
var cnt = 1
while (cnt < num && ch != -1) {
buf.append(ch)
ch = r.read()
loc = loc + 1
cnt = cnt + 1
}
if (cnt == num)
buf.toString()
else
buf.toString() // error
}
else
new String(ch.asInstanceOf[char])*/
}
def read(cbuf: Array[char]): int = {
loc = loc + cbuf.length
reader.read(cbuf)
}
def readChar: char = {
val carr = new Array[char](1)
read(carr)
carr(0)
}
}
class PicklerEnv[a] extends HashMap[a, int] {
private var cnt: int = 0;
def nextLoc() = { cnt = cnt + 1; cnt };
}
class UnpicklerEnv[a] extends HashMap[int, a] {
private var cnt: int = 0;
def nextLoc() = { cnt = cnt + 1; cnt };
}

View File

@ -0,0 +1,77 @@
/* ____ ____ ____ ____ ______ *\
** / __// __ \/ __// __ \/ ____/ SOcos COmpiles Scala **
** __\_ \/ /_/ / /__/ /_/ /\_ \ (c) 2002-2006, LAMP/EPFL **
** /_____/\____/\___/\____/____/ **
\* */
// $Id: UTF8Codec.scala 7116 2006-04-11 15:36:05Z mihaylov $
package scala.actors.distributed.picklers
object UTF8Codec {
def encode(src: Array[Char], from: Int, dst: Array[Byte], to: Int, len: Int): Int = {
var i = from;
var j = to;
val end = from + len;
while (i < end) {
val ch = src(i);
i = i + 1;
if (ch < 128) {
dst(j) = ch.toByte;
j = j + 1;
}
else if (ch <= 0x3FF) {
dst(j) = (0xC0 | (ch >> 6)).toByte;
dst(j+1) = (0x80 | (ch & 0x3F)).toByte;
j = j + 2;
} else {
dst(j) = (0xE0 | (ch >> 12)).toByte;
dst(j+1) = (0x80 | ((ch >> 6) & 0x3F)).toByte;
dst(j+2) = (0x80 | (ch & 0x3F)).toByte;
j = j + 3;
}
}
j
}
def encode(s: String, dst: Array[Byte], to: Int): Int =
encode(s.toCharArray(), 0, dst, to, s.length());
def encode(s: String): Array[Byte] = {
val dst = new Array[Byte](s.length() * 3);
val len = encode(s, dst, 0);
dst.subArray(0, len)
}
def decode(src: Array[Byte], from: Int,
dst: Array[Char], to: Int, len: Int): Int =
{
var i = from;
var j = to;
val end = from + len;
while (i < end) {
var b = src(i) & 0xFF;
i = i + 1;
if (b >= 0xE0) {
b = ((b & 0x0F) << 12) | (src(i) & 0x3F) << 6;
b = b | (src(i+1) & 0x3F);
i = i + 2;
} else if (b >= 0xC0) {
b = ((b & 0x1F) << 6) | (src(i) & 0x3F);
i = i + 1;
}
dst(j) = b.toChar;
j = j + 1;
}
j
}
def decode(src: Array[Byte], from: Int, len: Int): String = {
val cs = new Array[Char](len);
String.copyValueOf(cs, 0, decode(src, 0, cs, 0, len));
}
}

View File

@ -0,0 +1,20 @@
package scala.actors.gui
import javax.swing._
import event._
/** A class for buttons; standard constructor wraps around a swing button */
class Button(val jbutton: JButton) extends Container(jbutton) with SwingComponent with Publisher {
def this(txt: String) = this(new JButton(txt))
def this() = this(new JButton())
def text: String = jbutton.getText()
def text_=(s: String) = jbutton.setText(s)
def icon: Icon = jbutton.getIcon()
def icon_=(i: Icon) = jbutton.setIcon(i)
jbutton.addActionListener {
new java.awt.event.ActionListener {
def actionPerformed(e: java.awt.event.ActionEvent): unit =
publish(ButtonPressed(Button.this))
}
}
}

View File

@ -0,0 +1,8 @@
package scala.actors.gui;
import javax.swing
class Caret(val jcaret: swing.text.Caret) {
def dot: int = jcaret.getDot()
def mark: int = jcaret.getMark()
}

View File

@ -0,0 +1,9 @@
package scala.actors.gui;
import javax.swing._;
import java.awt._;
class Component(val acomponent: java.awt.Component) extends Subscriber {
def show: this.type = { acomponent.setVisible(true); this }
}

View File

@ -0,0 +1,18 @@
package scala.actors.gui;
import javax.swing._
import scala.collection.mutable.ListBuffer
class Container(val jcontainer: java.awt.Container) extends Component(jcontainer) {
def this() = this(new java.awt.Container())
val elems = new ListBuffer[Component]
def += (c: Component) = {
elems += c
jcontainer.add(c.acomponent)
}
def -= (c: Component) = {
elems -= c
jcontainer.remove(c.acomponent)
}
}

View File

@ -0,0 +1,8 @@
package scala.actors.gui;
import javax.swing._
class EmptyBorder(_top: int, _left: int, _bottom: int, _right: int)
extends border.EmptyBorder(_top, _left, _bottom, _right) {
def this() = this(0, 0, 0, 0)
}

View File

@ -0,0 +1,9 @@
package scala.actors.gui;
import javax.swing._
import java.awt.event._
import event._
class FormattedTextField(val jftextfield: JFormattedTextField) extends TextComponent(jftextfield) {
def this(format: java.text.Format) = this(new JFormattedTextField(format));
}

View File

@ -0,0 +1,33 @@
package scala.actors.gui;
import javax.swing._;
import event._;
class Frame(val jframe: JFrame) extends Container(jframe) with Publisher {
def this() = this(new JFrame("Untitled Frame"))
def title: String = jframe.getTitle()
def title_=(s: String) = jframe.setTitle(s)
val contents = new Container(jframe.getContentPane())
private var default_button: Button = null
def defaultButton = default_button
def defaultButton_=(b: Button) = { default_button = b; jframe.getRootPane().setDefaultButton(b.jbutton) }
def pack: this.type = { jframe.pack(); this }
jframe.addWindowListener {
new java.awt.event.WindowListener {
def windowActivated(e: java.awt.event.WindowEvent) = publish(WindowActivated(Frame.this))
def windowClosed(e: java.awt.event.WindowEvent) = publish(WindowClosed(Frame.this))
def windowClosing(e: java.awt.event.WindowEvent) = publish(WindowClosing(Frame.this))
def windowDeactivated(e: java.awt.event.WindowEvent) = publish(WindowDeactivated(Frame.this))
def windowDeiconified(e: java.awt.event.WindowEvent) = publish(WindowDeiconified(Frame.this))
def windowIconified(e: java.awt.event.WindowEvent) = publish(WindowIconified(Frame.this))
def windowOpened(e: java.awt.event.WindowEvent) = publish(WindowOpened(Frame.this))
}
}
/*jframe.addMouseMotionListener (
new java.awt.event.MouseMotionListener {
def mouseDragged(e: java.awt.event.MouseEvent) = publish(MouseDragged(e))
def mouseMoved(e: java.awt.event.MouseEvent) = publish(MouseMoved(e))
}
)*/
}

View File

@ -0,0 +1,20 @@
package scala.actors.gui
import javax.swing._
import event.Event
class GUIApplication {
def defaultLookAndFeelDecorated: boolean = true
def init() = {
UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName())
JFrame.setDefaultLookAndFeelDecorated(defaultLookAndFeelDecorated)
}
def run(prog: => unit): unit =
SwingUtilities.invokeLater {
new Runnable() {
def run() = { init(); prog }
}
}
}

View File

@ -0,0 +1,14 @@
package scala.actors.gui;
import javax.swing._;
class Label(val jlabel: JLabel) extends Container(jlabel) with SwingComponent {
def this(txt: String) = this(new JLabel(txt))
def this() = this("Untitled Label")
def text: String = jlabel.getText()
def text_=(s: String) = jlabel.setText(s)
def halign: Orientation.Value = Orientation(jlabel.getHorizontalAlignment())
def halign_=(x: Orientation.Value) = jlabel.setHorizontalAlignment(x.id)
def valign: Orientation.Value = Orientation(jlabel.getVerticalAlignment())
def valign_=(x: Orientation.Value) = jlabel.setVerticalAlignment(x.id)
}

View File

@ -0,0 +1,14 @@
package scala.actors.gui;
import javax.swing._;
import scala.actors.gui.event._;
class MainFrame(jframe: JFrame) extends Frame(jframe) {
def this() = this(new JFrame("Untitled Frame"))
addHandler {
case WindowClosing(_) => System.exit(1)
}
subscribe(this)
}

View File

@ -0,0 +1,11 @@
package scala.actors.gui
import javax.swing.SwingConstants._
object Orientation extends Enumeration {
val left = Value(LEFT, "left")
val right = Value(RIGHT, "right")
val bottom = Value(BOTTOM, "bottom")
val top = Value(TOP, "top")
val center = Value(CENTER, "center")
}

View File

@ -0,0 +1,15 @@
package scala.actors.gui;
import javax.swing._
import java.awt.event._
class Panel(val jpanel: JPanel) extends Container(jpanel) with SwingComponent {
def this(layout: java.awt.LayoutManager, elements: Component*) = {
this(new JPanel(layout));
for (val elem <- elements) this += elem
}
def this(elements: Component*) = this(new java.awt.FlowLayout, elements: _*)
def layout: java.awt.LayoutManager = jpanel.getLayout()
def layout_=(x: java.awt.LayoutManager) = jpanel.setLayout(x)
}

View File

@ -0,0 +1,126 @@
package scala.actors.gui
import scala.collection.mutable.ListBuffer
import scala.actors.single.Actor
import scala.actors.single.Pid
import scala.actors.gui.event.Event
class EventHandlers {
type Handler = PartialFunction[AnyRef,unit]
private val handlers = new ListBuffer[Handler]
def += (h: Handler) = { handlers += h }
def -= (h: Handler) = { handlers -= h }
def compoundHandler = new Handler {
def isDefinedAt(e: AnyRef): boolean = handlers.exists(.isDefinedAt(e))
def apply(e: AnyRef): unit =
handlers.find(.isDefinedAt(e)) match {
case Some(h) => h.apply(e)
case None => // do nothing
}
}
}
trait Responder extends Actor {
protected val handlers = new EventHandlers
final def eventloop(f: PartialFunction[Message,unit]): scala.All =
receive(new RecursiveProxyHandler(this, f))
def eventblock(f: PartialFunction[Message,unit]): unit = {
try {
receive(new RecursiveProxyHandler(this, f))
}
catch {
case d: Done =>
// do nothing
}
}
private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] {
def isDefinedAt(m: Message): boolean =
true // events should be removed from the mailbox immediately!
def apply(m: Message): unit = {
if (f.isDefinedAt(m)) f(m) // overrides any installed handler
else
if (handlers.compoundHandler.isDefinedAt(m))
handlers.compoundHandler(m)
else {
// do nothing
}
a receive this
}
}
}
case class Subscribe(s: Subscriber)
case class Publish(e: Event)
trait Subscriber extends Responder {
type Handler = PartialFunction[AnyRef,unit]
def subscribe(ps: Publisher*) = for (val p <- ps) p send Subscribe(this)
}
trait Publisher extends Responder {
case class HandlerAdded()
private val subscribers = new ListBuffer[Subscriber]
handlers += { // installs _permanent_ handler!
case Subscribe(s) =>
//Console.println("" + this + ": rec subscription from " + s)
subscribers += s
case Publish(e) => for (val s <- subscribers) s send e
}
//Console.println("" + this + ": exec toplevel eventloop (Publisher)")
eventblock {
case HandlerAdded() =>
//Console.println("" + this + " received HandlerAdded()")
}
def addHandler(h: EventHandlers#Handler) = {
//Console.println("" + this + ": installing new handler")
handlers += h
this send HandlerAdded() // causes currently active eventloop to recursively call itself
}
def publish(e: Event) = {
//Console.println("Publishing event: " + e)
for (val s <- subscribers) s send e
}
// TODO: super.receive might already be overridden!
//final override def receive(f: PartialFunction[Message,unit]): scala.All =
//super.receive(new ProxyPubSubHandler(f))
private class ProxyPubSubHandler(f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] {
def isDefinedAt(m: Message): boolean =
if (f.isDefinedAt(m)) true
else m match {
case Subscribe(s) => true
case Publish(e) => true
case other => false
}
def apply(m: Message): unit = {
m match {
case Subscribe(s) =>
//Console.println("Rec subscription: " + s)
subscribers += s
case Publish(e) =>
for (val s <- subscribers) s send e
case other =>
// do nothing
}
if (f.isDefinedAt(m)) f(m)
}
}
}

View File

@ -0,0 +1,14 @@
package scala.actors.gui;
import javax.swing._
abstract class SimpleGUIApplication extends GUIApplication {
def top: Frame;
def main(args: Array[String]) = {
run { top.pack.show }
}
implicit def string2label(s: String): Label = new Label(s)
}

View File

@ -0,0 +1,11 @@
package scala.actors.gui
import javax.swing._
import java.awt._
trait SwingComponent extends Component {
val jcomponent = acomponent.asInstanceOf[JComponent];
def border: javax.swing.border.Border = jcomponent.getBorder()
def border_=(x: javax.swing.border.Border): unit = jcomponent.setBorder(x)
}

View File

@ -0,0 +1,22 @@
package scala.actors.gui
import javax.swing._
import javax.swing.text.JTextComponent
import javax.swing.event.{CaretEvent,CaretListener}
import event.CaretUpdate
class TextComponent(val jtextcomponent: JTextComponent)
extends Container(jtextcomponent) with SwingComponent with Publisher {
def text: String = jtextcomponent.getText()
def text_=(x: String) = jtextcomponent.setText(x)
val caret = new Caret(jtextcomponent.getCaret())
jtextcomponent.addCaretListener {
new CaretListener {
def caretUpdate(e: CaretEvent) =
publish(CaretUpdate(TextComponent.this))
}
}
}

View File

@ -0,0 +1,22 @@
package scala.actors.gui;
import javax.swing._
import java.awt.event._
import event._
class TextField(val jtextfield: JTextField) extends TextComponent(jtextfield) {
def this(text: String, columns: int) = this(new JTextField(text, columns));
def this(text: String) = this(new JTextField(text));
def this(columns: int) = this(new JTextField(columns));
def this() = this(new JTextField());
def columns: int = jtextfield.getColumns()
def columns_=(x: int) = jtextfield.setColumns(x)
jtextfield.addActionListener {
new ActionListener {
def actionPerformed(e: ActionEvent) =
publish(TextModified(TextField.this))
}
}
}

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class ButtonPressed(b: Button) extends Event

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class CaretUpdate(text: TextComponent) extends Event

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
abstract class Event

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class MouseDragged(override val event: java.awt.event.MouseEvent) extends MouseEvent(event);

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
abstract class MouseEvent(val event: java.awt.event.MouseEvent) extends Event;

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class MouseMoved(override val event: java.awt.event.MouseEvent) extends MouseEvent(event);

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class TextModified(text: TextComponent) extends Event

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class WindowActivated(window: Frame) extends WindowEvent;

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class WindowClosed(window: Frame) extends WindowEvent;

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class WindowClosing(window: Frame) extends WindowEvent;

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class WindowDeactivated(window: Frame) extends WindowEvent;

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class WindowDeiconified(window: Frame) extends WindowEvent;

View File

@ -0,0 +1,5 @@
package scala.actors.gui.event
abstract class WindowEvent extends Event {
val window: Frame
}

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class WindowIconified(window: Frame) extends WindowEvent;

View File

@ -0,0 +1,3 @@
package scala.actors.gui.event
case class WindowOpened(window: Frame) extends WindowEvent;

View File

@ -0,0 +1,12 @@
package scala.actors.gui
import java.awt._
object layout {
val flex = 0
def grid(rows: int, columns: int) = new GridLayout(rows, columns)
def row = new FlowLayout()
def column = grid(flex, 1)
}

View File

@ -0,0 +1,10 @@
package scala.actors.multi;
/**
* @author Philipp Haller
*/
trait AbstractPid {
def !(msg: MailBox#Message): unit
def become(clos: Actor => Unit): unit
def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit
}

View File

@ -0,0 +1,300 @@
package scala.actors.multi
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.mutable.Stack
case class ExcHandlerDesc(pid: Pid, eid: int)
class ExcHandler(actions: PartialFunction[Throwable, unit],
actor: Actor,
parent: ExcHandlerDesc) {
def handle(e: Throwable): unit = {
if (!actions.isDefinedAt(e)) {
if (parent != null) actor.forwardExc(parent, e)
}
else
actions(e)
}
}
/**
* @author Philipp Haller
*/
abstract class Actor extends MailBox {
def run(): Unit = {}
def start(): Unit = {
var finished = true
try { run }
catch {
case d: Done =>
finished = false
case t: Throwable =>
if (!excHandlerDescs.isEmpty)
forwardExc(excHandlerDescs.top, t)
else
exit(new Symbol(t.toString()))
}
if (finished) die()
}
case class Exit(from: Pid, reason: Symbol) extends Message
private var pid: Pid = null
def self: Pid = {
if (pid == null) pid = new LocalPid(this)
pid
}
def self_= (p: Pid) = pid = p
private val links = new HashSet[Pid]
def link(to: Pid): unit = {
// TODO: check if exists (eff: dont need to.linkTo...)
links += to
to.linkTo(self)
}
def linkTo(to: Pid): unit = links += to
def unlink(from: Pid): unit = {
// TODO: check if exists (eff)
links -= from
from.unlinkFrom(self)
}
def unlinkFrom(from: Pid): unit = links -= from
private var trapExit = false
def processFlag(flag: Symbol, set: boolean) = {
if (flag.name.equals("trapExit")) trapExit = set
}
def exit(reason: Symbol): unit = {
exitLinked(reason, new HashSet[Pid])
if (isAlive) {
isAlive = false
//Debug.info("" + this + " died.")
}
}
def exit(from: Pid, reason: Symbol): unit = {
if (from == self) {
exit(reason)
}
else {
if (trapExit)
this send Exit(from, reason)
else if (!reason.name.equals("normal"))
exit(reason)
}
}
def exitLinked(reason: Symbol, exitMarks: HashSet[Pid]): unit = {
if (exitMarks contains self) {
// we are marked, do nothing
}
else {
exitMarks += self // mark self as exiting
//Console.println("" + self + " is exiting (" + reason + ").")
// exit linked scala.actors
val iter = links.elements
while (iter.hasNext) {
val linkedPid = iter.next
unlink(linkedPid)
linkedPid.exit(self, reason)
}
exitMarks -= self
}
}
def spawnLink(body: Actor => unit): Pid = {
val a = new Actor {
override def run = body(this)
}
if (!excHandlerDescs.isEmpty)
a.pushExcHandlerDesc(excHandlerDescs.top)
// link new process to self
link(a.self)
a.start
a.self
}
val excHandlerDescs = new Stack[ExcHandlerDesc]
// exception handler table
val excHandlers = new HashMap[int, ExcHandler]
def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
// locality check (handler local to this actor?)
if (destDesc.pid == self) {
handleExc(destDesc, e)
}
else {
// forward to pid of destination descriptor
destDesc.pid.handleExc(destDesc, e)
}
}
def pushExcHandlerDesc(desc: ExcHandlerDesc) = {
excHandlerDescs += desc
}
/** is only called for local handlers
(i.e. destDesc.pid == self) */
def handleExc(destDesc: ExcHandlerDesc, e: Throwable) =
(excHandlers get destDesc.eid) match {
case Some(handler) =>
handler.handle(e)
case None =>
error("exc desc refers to non-registered handler")
};
var excCnt = 0
def freshExcId = { excCnt = excCnt + 1; excCnt }
def tryAsync(block: => unit,
handlerFun: PartialFunction[Throwable, unit]) = {
val excHandler =
new ExcHandler(handlerFun,
this,
if (excHandlerDescs.isEmpty) null
else excHandlerDescs.top)
val desc = ExcHandlerDesc(self, freshExcId)
// associate desc with handler
excHandlers += desc.eid -> excHandler
// push desc onto stack
excHandlerDescs += desc
//Console.println("desc stack height: " + excHandlerDescs.length)
// execute code block
block
}
def die(reason: Symbol) = {
if (isAlive) {
isAlive = false
//Debug.info("" + this + " died.")
exit(reason)
}
}
override def die() = {
if (isAlive) {
isAlive = false
//Debug.info("" + this + " died.")
exit('normal)
}
}
def process(f: PartialFunction[Message,unit], msg: Message): unit = {
try {
f(msg)
}
catch {
case d: Done =>
throw new Done
case t: Throwable =>
throw t
if (!excHandlerDescs.isEmpty)
forwardExc(excHandlerDescs.top, t)
else
die(Symbol(t.toString()))
}
}
override def receive(f: PartialFunction[Message,unit]): scala.All = {
if (isAlive) {
Scheduler.tick(this)
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
case Some(msg) =>
process(f, msg)
die()
case None =>
continuation = f
//Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
}
}
throw new Done
}
override def receiveMsg(msg: MailBox#Message) = {
//Debug.info("" + Thread.currentThread() + ": Resuming " + this)
if (continuation != null) {
val f = continuation
continuation = null
scheduled = false
process(f, msg)
die()
}
else {
// use more complex receive-and-return continuation
val cases = contCases
val then = contThen
contCases = null
contThen = null
scheduled = false
val result = cases(msg)
then(result)
die()
}
}
def spawn(a: Actor): Pid = {
// let "a" inherit active exception handler
if (!excHandlerDescs.isEmpty)
a.pushExcHandlerDesc(excHandlerDescs.top)
a.start
a.self
}
def spawn(body: Actor => unit): Pid = {
val a = new Actor {
override def run = body(this)
}
if (!excHandlerDescs.isEmpty)
a.pushExcHandlerDesc(excHandlerDescs.top)
a.start
a.self
}
def spawnReceive(cases: PartialFunction[MailBox#Message,unit]) = {
val a = new Actor {
override def run = receive(cases)
}
if (!excHandlerDescs.isEmpty)
a.pushExcHandlerDesc(excHandlerDescs.top)
a.start
a.self
}
def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = {
receive {
case Pair(pid1, msg1) => receive {
case Pair(pid2, msg2) => cont match {
case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
}
}
case Pair(pid2, msg2) => receive {
case Pair(pid1, msg1) => cont match {
case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
}
}
}
}
def makeRef = Actor.makeRef
}
object Actor {
private var counter = 0
type Tag = int
def makeRef: Tag = {
counter = counter + 1
counter
}
}

View File

@ -0,0 +1,94 @@
package scala.actors.multi;
import scala.collection.mutable.Queue;
/**
* @author Philipp Haller
*/
class LocalPid(actor: Actor) extends Pid {
var target = actor
def !(msg: MailBox#Message): unit = target send msg
def link(other: Pid): unit = target link other
def linkTo(other: Pid): unit = target linkTo other
def unlink(other: Pid): unit = target unlink other
def unlinkFrom(other: Pid): unit = target unlinkFrom other
def exit(reason: Symbol): unit = target exit reason
def exit(from: Pid, reason: Symbol): unit = target.exit(from, reason)
def spawn(body: Actor => Unit): Pid = {
val a = new Actor {
override def run: Unit = body(this)
}
a.start
a.self
}
def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = {
val a = new Actor {
override def run: Unit = receive(cases)
}
a.start
a.self
}
override def toString() = "LocalPid(" + target + ")"
def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit =
actor.handleExc(destDesc, e);
def become(clos: Actor => Unit) = {
// old actor should become anonymous (cannot receive any messages any more)
// achieved by removing it from target of pid.
val oldActor = target
//Debug.info("old actor: " + oldActor);
// change our target to point to a newly created actor with the same mailbox.
val newActor = new Actor {
override def run: Unit = clos(this)
}
newActor.sent = oldActor.sent
target = newActor
newActor.self = this
//Debug.info("new actor: " + newActor);
// clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor)
oldActor.sent = new Queue[MailBox#Message]
//Debug.info("Starting new actor.");
newActor.start // important to start after changing pid because actor may send messages to itself.
}
private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] {
def isDefinedAt(m: MailBox#Message): boolean = f.isDefinedAt(m)
def apply(m: MailBox#Message): unit = {
f(m)
a receive this
}
}
def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]) = {
become(a => a receive new ProxyPartialFunction(a, f))
/*become(
a:Actor => {
def loop: unit = {
def proxyFun(m: MailBox#Message): unit = {
if (f.isDefinedAt(m)) {
f(m);
loop
}
};
//a receive proxyFun
}
loop
}
)*/
}
}

View File

@ -0,0 +1,175 @@
package scala.actors.multi;
import scala.collection.mutable.Queue;
/**
* @author Philipp Haller
*/
class MailBox {
type Message = AnyRef
case class TIMEOUT() extends Message
/** Unconsumed messages. */
var sent = new Queue[Message]
var continuation: PartialFunction[Message,Unit] = null
// more complex continuation
var contCases: PartialFunction[Message,Message] = null
var contThen: Message => unit = null
def hasCont =
if ((continuation == null) && (contCases == null)) false
else true
def contDefinedAt(msg: Message) =
if (((continuation != null) && continuation.isDefinedAt(msg)) ||
((contCases != null) && contCases.isDefinedAt(msg)))
true
else
false
var isAlive = true
var scheduled = false
private var pendingSignal = false
def send(msg: Message): unit = synchronized {
if (isAlive) {
if (!hasCont || scheduled) {
//Debug.info("no cont avail/task already scheduled. appending msg to mailbox.")
msg match {
case Signal() =>
// do not add to mailbox
case _ =>
sent += msg
}
}
else
msg match {
case Signal() =>
if (!contDefinedAt(TIMEOUT())) die()
else {
val task = new ReceiverTask(this, TIMEOUT())
//Debug.info("ready to receive. dispatch new task " + task)
scheduled = true
Scheduler.execute(task)
}
case _ =>
if (!contDefinedAt(msg))
sent += msg
else {
if (pendingSignal) {
pendingSignal = false
TimerThread.trashRequest(this)
}
val task = new ReceiverTask(this, msg)
//Debug.info("ready to receive. dispatch new task " + task)
scheduled = true
Scheduler.execute(task)
}
}
}
}
def receiveMsg(msg: MailBox#Message) = {
//Debug.info("" + Thread.currentThread() + ": Resuming " + this)
if (continuation != null) {
val f = continuation
continuation = null
scheduled = false
f(msg)
die()
}
else {
// use more complex receive-and-return continuation
val cases = contCases
val then = contThen
contCases = null
contThen = null
scheduled = false
val result = cases(msg)
then(result)
die()
}
}
def receive(f: PartialFunction[Message,unit]): scala.All = {
if (isAlive) {
Scheduler.tick(this)
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
case Some(msg) =>
f(msg)
die()
case None =>
continuation = f
//Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
}
}
throw new Done
}
def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = {
Scheduler.tick(this)
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
case Some(msg) =>
f(msg)
die()
case None =>
// if timeout == 0 then execute timeout action if specified (see Erlang book)
if (msec == 0) {
if (f.isDefinedAt(TIMEOUT()))
f(TIMEOUT())
die()
}
else {
if (msec > 0) {
TimerThread.requestTimeout(this, msec)
pendingSignal = true
}
continuation = f
//Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
}
}
throw new Done
}
// original wish:
// receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B
// receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit
def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): unit = {
contCases = null
contThen = null
sent.dequeueFirst(cases.isDefinedAt) match {
case Some(msg) => {
val result = cases(msg)
then(result)
die()
}
case None => {
contCases = cases
contThen = then
//Debug.info("No msg found. Saved complex continuation.")
}
}
throw new Done
}
// receiv {...} then (msg => {...msg...})
class ReceiveAndReturn(cases: PartialFunction[Message,Message]) {
def then(body: Message => unit): unit = receiveAndReturn(cases, body)
}
def receiv(cases: PartialFunction[Message,Message]): ReceiveAndReturn =
new ReceiveAndReturn(cases)
def die() = {
if (isAlive) {
isAlive = false
//Debug.info("" + this + " died.")
}
}
}

View File

@ -0,0 +1,20 @@
package scala.actors.multi
/**
* @author Philipp Haller
*/
[serializable]abstract class Pid {
def !(msg: MailBox#Message): unit;
def link(other: Pid): unit;
def linkTo(other: Pid): unit; // uni-directional
def unlink(other: Pid): unit;
def unlinkFrom(other: Pid): unit; // uni-directional
def exit(reason: Symbol): unit;
def exit(from: Pid, reason: Symbol): unit;
def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit;
//def become(clos: Actor => Unit): unit
//def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit
}

View File

@ -0,0 +1,16 @@
package scala.actors.multi;
/**
* @author Philipp Haller
*/
class ReceiverTask(val actor: MailBox, msg: MailBox#Message) extends Runnable {
def run(): unit = {
try {
actor receiveMsg msg
}
catch {
case d: Done =>
// do nothing (continuation is already saved)
}
}
}

View File

@ -0,0 +1,10 @@
package scala.actors.single;
/**
* @author Philipp Haller
*/
trait AbstractPid {
def !(msg: MailBox#Message): unit
def become(clos: Actor => Unit): unit
def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit
}

View File

@ -0,0 +1,72 @@
package scala.actors.single
/**
* @author Philipp Haller
*/
abstract class Actor extends MailBox {
def run: Unit = {}
def start: Unit = {
try { run }
catch {
case d:Done =>
// do nothing
}
}
private var pid: Pid = null
def self: Pid = {
if (pid == null) pid = new LocalPid(this)
pid
}
def self_= (p: Pid) = pid = p
def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = {
receive {
case Pair(pid1, msg1) => receive {
case Pair(pid2, msg2) => cont match {
case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
}
}
case Pair(pid2, msg2) => receive {
case Pair(pid1, msg1) => cont match {
case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
}
}
}
}
def spawn(body: Actor => unit): Pid = {
val a = new Actor {
override def run = body(this)
}
a.start
a.self
}
def spawn(a: Actor): Pid = {
a.start
a.self
}
def spawnReceive(cases: PartialFunction[MailBox#Message,unit]) = {
val a = new Actor {
override def run = receive(cases)
}
a.start
a.self
}
def makeRef = Actor.makeRef
}
object Actor {
private var counter = 0
type Tag = int
def makeRef: Tag = {
counter = counter + 1
counter
}
}

View File

@ -0,0 +1,83 @@
package scala.actors.single;
import scala.collection.mutable.Queue;
/**
* @author Philipp Haller
*/
class LocalPid(actor: Actor) extends Pid {
var target = actor
def !(msg: MailBox#Message): unit = target send msg
def become(clos: Actor => Unit) = {
// old actor should become anonymous (cannot receive any messages any more)
// achieved by removing it from target of pid.
val oldActor = target
//Debug.info("old actor: " + oldActor);
// change our target to point to a newly created actor with the same mailbox.
val newActor = new Actor {
override def run: Unit = clos(this)
}
newActor.sent = oldActor.sent
target = newActor
newActor.self = this
//Debug.info("new actor: " + newActor);
// clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor)
oldActor.sent = new Queue[MailBox#Message]
//Debug.info("Starting new actor.");
newActor.start // important to start after changing pid because actor may send messages to itself.
}
private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] {
def isDefinedAt(m: MailBox#Message): boolean = f.isDefinedAt(m)
def apply(m: MailBox#Message): unit = {
f(m)
a receive this
}
}
def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]) = {
become(a => a receive new ProxyPartialFunction(a, f))
/*become(
a:Actor => {
def loop: unit = {
def proxyFun(m: MailBox#Message): unit = {
if (f.isDefinedAt(m)) {
f(m);
loop
}
};
//a receive proxyFun
}
loop
}
)*/
}
def spawn(body: Actor => Unit): Pid = {
val a = new Actor {
override def run: Unit = body(this)
}
a.start
a.self
}
def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = {
val a = new Actor {
override def run: Unit = receive(cases)
}
a.start
a.self
}
override def toString() = "LocalPid(" + target + ")"
}

View File

@ -0,0 +1,155 @@
package scala.actors.single;
import scala.collection.mutable.Queue;
/**
* @author Philipp Haller
*/
class MailBox {
type Message = AnyRef
case class TIMEOUT() extends Message
/** Unconsumed messages. */
var sent = new Queue[Message]
var continuation: PartialFunction[Message,Unit] = null
// more complex continuation
var contCases: PartialFunction[Message,Message] = null
var contThen: Message => unit = null
def hasCont =
if ((continuation == null) && (contCases == null)) false
else true
def contDefinedAt(msg: Message) =
if (((continuation != null) && continuation.isDefinedAt(msg)) ||
((contCases != null) && contCases.isDefinedAt(msg)))
true
else
false
var isAlive = true
private var duration: long = 0
private var timeInitial: long = 0
private var timeoutEnabled: boolean = false
def send(msg: Message): unit = synchronized {
if (isAlive)
if (!hasCont) {
Debug.info("no cont avail/task already scheduled. appending msg to mailbox.")
sent += msg
}
else {
var message = msg
var timeoutOccurred = false
if (timeoutEnabled && (System.currentTimeMillis() - timeInitial > duration))
timeoutOccurred = true
if (timeoutOccurred && !contDefinedAt(TIMEOUT()))
die()
else {
if (timeoutOccurred) message = TIMEOUT()
if (contDefinedAt(message)) {
// we exit receive, so reset timeoutEnabled
timeoutEnabled = false
try {
if (continuation != null) {
val f = continuation
continuation = null
f(msg)
die()
}
else {
// use more complex receive-and-return continuation
val cases = contCases
val then = contThen
contCases = null
contThen = null
val result = cases(msg)
then(result)
die()
}
}
catch {
case d: Done =>
// do nothing (continuation is already saved)
}
}
else {
Debug.info("cont not defined at msg. appending to mailbox.")
if (!timeoutOccurred) sent += message
}
}
}
}
def receive(f: PartialFunction[Message,unit]): scala.All = {
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
case Some(msg) =>
f(msg)
die()
case None =>
continuation = f
Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
}
throw new Done
}
def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = {
timeInitial = System.currentTimeMillis()
duration = msec
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
case Some(msg) =>
f(msg)
die()
case None =>
// if timeout == 0 then execute timeout action if specified (see Erlang book)
if (duration == 0) {
if (f.isDefinedAt(TIMEOUT()))
f(TIMEOUT())
die()
}
else {
timeoutEnabled = true
continuation = f
Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
}
}
throw new Done
}
// original wish:
// receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B
// receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit
def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): scala.All = {
contCases = null
contThen = null
sent.dequeueFirst(cases.isDefinedAt) match {
case Some(msg) => {
val result = cases(msg)
then(result)
die()
}
case None => {
contCases = cases
contThen = then
Debug.info("No msg found. Saved complex continuation.")
}
}
throw new Done
}
def die() = {
isAlive = false
Debug.info("" + this + " died.")
}
}

View File

@ -0,0 +1,10 @@
package scala.actors.single;
/**
* @author Philipp Haller
*/
abstract class Pid {
def !(msg: MailBox#Message): unit;
//def become(clos: Actor => Unit): unit
//def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit
}