Added immutable parallel hash sets.

Fixed an issue with hash set splitters where splitting did not work if some elements had already been iterated.
Added exception handling to parallel collections.
Added break control to parallel collections.
Renamed ParHashTrie -> ParHashMap.
The immutable.{HashSet, HashMap} part: review by rompf

git-svn-id: http://lampsvn.epfl.ch/svn-repos/scala/scala/trunk@23200 5e8d7ff9-d8ef-0310-90f0-a4852d11357a
prokopec 2010-10-05 16:22:21 +00:00
parent 4a5e28e666
commit 7cd12014a7
19 changed files with 949 additions and 234 deletions
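The key invariant this commit establishes is that a splitter must remain splittable after partial traversal: the iterators produced by split have to cover exactly the elements the original splitter still had remaining. A minimal sketch of that invariant as a check, with a hypothetical checkSplit helper (the real regression test appears at the end of this commit):

import scala.collection.parallel.ParIterableIterator

// Hypothetical invariant check: advance a splitter by n elements, split it,
// and verify the pieces together yield exactly the remaining elements.
def checkSplit[T](fresh: () => ParIterableIterator[T], n: Int): Boolean = {
  val pit = fresh()                  // splitter we advance and then split
  val ref = fresh()                  // reference splitter over the same data
  for (_ <- 0 until n) { pit.next; ref.next }
  val remaining = ref.toSet          // what must still be covered
  val pieces = pit.split
  pieces.map(_.toSet).foldLeft(Set.empty[T])(_ ++ _) == remaining
}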

View File

@ -1563,13 +1563,13 @@ BOOTRAPING TEST AND TEST SUITE
<include name="run/**/*.scala"/>
</runtests>
<jvmtests dir="${partest.dir}/${partest.srcdir}/jvm" includes="*.scala"/>
<scalachecktests dir="${partest.dir}/${partest.srcdir}/scalacheck">
<include name="*.scala"/>
</scalachecktests>
<residenttests dir="${partest.dir}/${partest.srcdir}/res" includes="*.res"/>
<buildmanagertests dir="${partest.dir}/${partest.srcdir}/buildmanager" includes="*"/>
<shootouttests dir="${partest.dir}/${partest.srcdir}/shootout" includes="*.scala"/>
<scalaptests dir="${partest.dir}/${partest.srcdir}/scalap" includes="**/*.scala"/>
<scalachecktests dir="${partest.dir}/${partest.srcdir}/scalacheck">
<include name="*.scala"/>
</scalachecktests>
<!-- <scripttests dir="${partest.dir}/${partest.srcdir}/script" includes="*.scala"/> -->
</partest>
</target>

View File

@ -0,0 +1,33 @@
package scala.collection.generic
import collection.mutable.Builder
import collection.parallel.Combiner
import collection.parallel.ParSet
import collection.parallel.ParSetLike
abstract class ParSetFactory[CC[X] <: ParSet[X] with ParSetLike[X, CC[X], _] with GenericParTemplate[X, CC]]
extends SetFactory[CC]
with GenericParCompanion[CC]
{
def newBuilder[A]: Combiner[A, CC[A]] = newCombiner[A]
def newCombiner[A]: Combiner[A, CC[A]]
class GenericCanCombineFrom[A] extends CanCombineFrom[CC[_], A, CC[A]] {
override def apply(from: Coll) = from.genericCombiner[A]
override def apply() = newCombiner[A]
}
}
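Note that for parallel sets the sequential builder and the parallel combiner coincide: newBuilder simply delegates to newCombiner, so both build paths share one factory. A brief usage sketch (element values are arbitrary):

import scala.collection.parallel.immutable.ParHashSet

// The GenericCanCombineFrom above is what transformer methods resolve:
// apply(from) asks an existing collection for a combiner, apply() makes
// a fresh one.
val s = ParHashSet(1, 2, 3)
val doubled = s map { _ * 2 }   // picks a Combiner via CanCombineFrom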

View File

@ -91,10 +91,7 @@ trait Signalling {
/**
* This signalling implementation returns default values and ignores received signals.
*/
class DefaultSignalling extends Signalling {
def isAborted = false
def abort {}
class DefaultSignalling extends Signalling with VolatileAbort {
def indexFlag = -1
def setIndexFlag(f: Int) {}
def setIndexFlagIfGreater(f: Int) {}
@ -115,8 +112,8 @@ object IdleSignalling extends DefaultSignalling
*/
trait VolatileAbort extends Signalling {
@volatile private var abortflag = false
abstract override def isAborted = abortflag
abstract override def abort = abortflag = true
override def isAborted = abortflag
override def abort = abortflag = true
}
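The change turns VolatileAbort's members from abstract overrides into plain overrides, so it now supplies the default abort behaviour that DefaultSignalling mixes in. A self-contained sketch of the idea, assuming workers poll the flag between elements (names here are hypothetical):

// A @volatile flag one worker sets and all workers poll; the volatile
// write guarantees the abort becomes visible across threads.
trait Abortable {
  @volatile private var abortflag = false
  def isAborted = abortflag
  def abort { abortflag = true }
}

object FindFirst extends Abortable {
  // Each worker scans its own chunk but bails out as soon as any worker
  // has signalled abort.
  def scan(chunk: Iterator[Int])(p: Int => Boolean): Option[Int] = {
    while (chunk.hasNext && !isAborted) {
      val x = chunk.next
      if (p(x)) { abort; return Some(x) }
    }
    None
  }
}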

View File

@ -15,7 +15,7 @@ import generic._
import annotation.unchecked.uncheckedVariance
import parallel.immutable.ParHashTrie
import parallel.immutable.ParHashMap
/** This class implements immutable maps using a hash trie.
@ -36,7 +36,7 @@ import parallel.immutable.ParHashTrie
* @define willNotTerminateInf
*/
@serializable @SerialVersionUID(2L)
class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashTrie[A, B]] {
class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashMap[A, B]] {
override def size: Int = 0
@ -90,7 +90,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par
protected def merge0[B1 >: B](that: HashMap[A, B1], level: Int, merger: Merger[B1]): HashMap[A, B1] = that
def par = ParHashTrie.fromTrie(this)
def par = ParHashMap.fromTrie(this)
}
@ -307,78 +307,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
}
}
/*
override def iterator = { // TODO: optimize (use a stack to keep track of pos)
def iter(m: HashTrieMap[A,B], k: => Stream[(A,B)]): Stream[(A,B)] = {
def horiz(elems: Array[HashMap[A,B]], i: Int, k: => Stream[(A,B)]): Stream[(A,B)] = {
if (i < elems.length) {
elems(i) match {
case m: HashTrieMap[A,B] => iter(m, horiz(elems, i+1, k))
case m: HashMap1[A,B] => new Stream.Cons(m.ensurePair, horiz(elems, i+1, k))
}
} else k
}
horiz(m.elems, 0, k)
}
iter(this, Stream.empty).iterator
}
*/
override def iterator = new Iterator[(A,B)] {
private[this] var depth = 0
private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6)
private[this] var posStack = new Array[Int](6)
private[this] var arrayD = elems
private[this] var posD = 0
private[this] var subIter: Iterator[(A,B)] = null // to traverse collision nodes
def hasNext = (subIter ne null) || depth >= 0
def next: (A,B) = {
if (subIter ne null) {
val el = subIter.next
if (!subIter.hasNext)
subIter = null
el
} else
next0(arrayD, posD)
}
@scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = {
if (i == elems.length-1) { // reached end of level, pop stack
depth -= 1
if (depth >= 0) {
arrayD = arrayStack(depth)
posD = posStack(depth)
arrayStack(depth) = null
} else {
arrayD = null
posD = 0
}
} else
posD += 1
elems(i) match {
case m: HashTrieMap[A,B] => // push current pos onto stack and descend
if (depth >= 0) {
arrayStack(depth) = arrayD
posStack(depth) = posD
}
depth += 1
arrayD = m.elems
posD = 0
next0(m.elems, 0)
case m: HashMap1[A,B] => m.ensurePair
case m =>
subIter = m.iterator
subIter.next
}
}
}
override def iterator = new TrieIterator[A, B](elems)
/*
@ -534,6 +463,125 @@ time { mNew.iterator.foreach( p => ()) }
}
class TrieIterator[A, +B](elems: Array[HashMap[A, B]]) extends Iterator[(A, B)] {
private[this] var depth = 0
private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6)
private[this] var posStack = new Array[Int](6)
private[this] var arrayD = elems
private[this] var posD = 0
private[this] var subIter: Iterator[(A, B)] = null // to traverse collision nodes
def hasNext = (subIter ne null) || depth >= 0
def next: (A,B) = {
if (subIter ne null) {
val el = subIter.next
if (!subIter.hasNext)
subIter = null
el
} else
next0(arrayD, posD)
}
@scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = {
if (i == elems.length-1) { // reached end of level, pop stack
depth -= 1
if (depth >= 0) {
arrayD = arrayStack(depth)
posD = posStack(depth)
arrayStack(depth) = null
} else {
arrayD = null
posD = 0
}
} else
posD += 1
elems(i) match {
case m: HashTrieMap[A,B] => // push current pos onto stack and descend
if (depth >= 0) {
arrayStack(depth) = arrayD
posStack(depth) = posD
}
depth += 1
arrayD = m.elems
posD = 0
next0(m.elems, 0)
case m: HashMap1[A,B] => m.ensurePair
case m =>
subIter = m.iterator
subIter.next
}
}
// assumption: contains 2 or more elements
// splits this iterator into 2 iterators
// returns the first iterator, its number of elements, and the second iterator
def split: ((Iterator[(A, B)], Int), Iterator[(A, B)]) = {
// 0) simple case: no elements have been iterated - simply divide arrayD
if (arrayD != null && depth == 0 && posD == 0) {
val (fst, snd) = arrayD.splitAt(arrayD.length / 2)
val szfst = fst.foldLeft(0)(_ + _.size)
return ((new TrieIterator(fst), szfst), new TrieIterator(snd))
}
// otherwise, some elements have been iterated over
// 1) collision case: if we have a subIter, we return subIter and elements after it
if (subIter ne null) {
val buff = subIter.toBuffer
subIter = null
((buff.iterator, buff.length), this)
} else {
// otherwise find the topmost array stack element
if (depth > 0) {
// 2) topmost comes before (is not) arrayD
// steal a portion of top to create a new iterator
val topmost = arrayStack(0)
if (posStack(0) == arrayStack(0).length - 1) {
// 2a) only a single entry left on top
// this means we have to modify this iterator - pop topmost
val snd = Array(arrayStack(0).last)
val szsnd = snd(0).size
// modify this - pop
depth -= 1
arrayStack = arrayStack.tail ++ Array[Array[HashMap[A, B]]](null)
posStack = posStack.tail ++ Array[Int](0)
// we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty
((new TrieIterator[A, B](snd), szsnd), this)
} else {
// 2b) more than a single entry left on top
val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2)
arrayStack(0) = fst
val szsnd = snd.foldLeft(0)(_ + _.size)
((new TrieIterator[A, B](snd), szsnd), this)
}
} else {
// 3) no topmost element (arrayD is at the top)
// steal a portion of it and update this iterator
if (posD == arrayD.length - 1) {
// 3a) positioned at the last element of arrayD
val arr: Array[HashMap[A, B]] = arrayD(posD) match {
case c: HashMapCollision1[_, _] => c.asInstanceOf[HashMapCollision1[A, B]].kvs.toArray map { HashMap() + _ }
case ht: HashTrieMap[_, _] => ht.asInstanceOf[HashTrieMap[A, B]].elems
case _ => error("cannot divide single element")
}
val (fst, snd) = arr.splitAt(arr.length / 2)
val szsnd = snd.foldLeft(0)(_ + _.size)
((new TrieIterator(snd), szsnd), new TrieIterator(fst))
} else {
// 3b) arrayD has more free elements
val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2)
arrayD = fst
val szsnd = snd.foldLeft(0)(_ + _.size)
((new TrieIterator[A, B](snd), szsnd), this)
}
}
}
}
}
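Throughout the split cases above, only the first iterator's size is computed exactly; callers derive the second from the count they already track. A sketch of that bookkeeping, mirroring the pattern the parallel wrappers later in this commit use (names are hypothetical; only the ((iterator, size), iterator) shape comes from split above):

// Wrap a split result into two (iterator, size) work items.
def splitWithSizes[A](remaining: Int)
                     (doSplit: => ((Iterator[A], Int), Iterator[A])) = {
  val ((fst, fstLen), snd) = doSplit
  val sndLen = remaining - fstLen  // split reports only the first size;
                                   // the second follows from the total
  Seq((fst, fstLen), (snd, sndLen))
}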
private def check[K](x: HashMap[K, _], y: HashMap[K, _], xy: HashMap[K, _]) = { // TODO remove this debugging helper
var xs = Set[K]()
for (elem <- x) xs += elem._1

View File

@ -14,6 +14,8 @@ package immutable
import generic._
import annotation.unchecked.uncheckedVariance
import collection.parallel.immutable.ParHashSet
/** This class implements immutable sets using a hash trie.
*
* '''Note:''' the builder of a hash set returns specialized representations `EmptySet`, `Set1`, ..., `Set4`
@ -31,7 +33,9 @@ import annotation.unchecked.uncheckedVariance
@serializable @SerialVersionUID(2L)
class HashSet[A] extends Set[A]
with GenericSetTemplate[A, HashSet]
with SetLike[A, HashSet[A]] {
with SetLike[A, HashSet[A]]
with Parallelizable[ParHashSet[A]]
{
override def companion: GenericCompanion[HashSet] = HashSet
//class HashSet[A] extends Set[A] with SetLike[A, HashSet[A]] {
@ -54,7 +58,9 @@ class HashSet[A] extends Set[A]
def - (e: A): HashSet[A] =
removed0(e, computeHash(e), 0)
def par = ParHashSet.fromTrie(this)
protected def elemHashCode(key: A) = if (key == null) 0 else key.##
protected final def improve(hcode: Int) = {
@ -68,7 +74,7 @@ class HashSet[A] extends Set[A]
protected def get0(key: A, hash: Int, level: Int): Boolean = false
protected def updated0(key: A, hash: Int, level: Int): HashSet[A] =
def updated0(key: A, hash: Int, level: Int): HashSet[A] =
new HashSet.HashSet1(key, hash)
protected def removed0(key: A, hash: Int, level: Int): HashSet[A] = this
@ -169,11 +175,11 @@ object HashSet extends ImmutableSetFactory[HashSet] {
}
class HashTrieSet[A](private var bitmap: Int, private var elems: Array[HashSet[A]],
class HashTrieSet[A](private var bitmap: Int, private[HashSet] var elems: Array[HashSet[A]],
private var size0: Int) extends HashSet[A] {
override def size = size0
override def get0(key: A, hash: Int, level: Int): Boolean = {
val index = (hash >>> level) & 0x1f
val mask = (1 << index)
@ -186,7 +192,7 @@ object HashSet extends ImmutableSetFactory[HashSet] {
} else
false
}
override def updated0(key: A, hash: Int, level: Int): HashSet[A] = {
val index = (hash >>> level) & 0x1f
val mask = (1 << index)
@ -208,7 +214,7 @@ object HashSet extends ImmutableSetFactory[HashSet] {
new HashTrieSet(bitmapNew, elemsNew, size + 1)
}
}
override def removed0(key: A, hash: Int, level: Int): HashSet[A] = {
val index = (hash >>> level) & 0x1f
val mask = (1 << index)
@ -238,62 +244,9 @@ object HashSet extends ImmutableSetFactory[HashSet] {
this
}
}
override def iterator = new Iterator[A] {
private[this] var depth = 0
private[this] var arrayStack = new Array[Array[HashSet[A]]](6)
private[this] var posStack = new Array[Int](6)
private[this] var arrayD = elems
private[this] var posD = 0
private[this] var subIter: Iterator[A] = null // to traverse collision nodes
def hasNext = (subIter ne null) || depth >= 0
def next: A = {
if (subIter ne null) {
val el = subIter.next
if (!subIter.hasNext)
subIter = null
el
} else
next0(arrayD, posD)
}
@scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = {
if (i == elems.length-1) { // reached end of level, pop stack
depth -= 1
if (depth >= 0) {
arrayD = arrayStack(depth)
posD = posStack(depth)
arrayStack(depth) = null
} else {
arrayD = null
posD = 0
}
} else
posD += 1
elems(i) match {
case m: HashTrieSet[A] => // push current pos onto stack and descend
if (depth >= 0) {
arrayStack(depth) = arrayD
posStack(depth) = posD
}
depth += 1
arrayD = m.elems
posD = 0
next0(m.elems, 0)
case m: HashSet1[A] => m.key
case m =>
subIter = m.iterator
subIter.next
}
}
}
override def iterator = new TrieIterator[A](elems)
/*
import collection.immutable._
@ -314,8 +267,7 @@ time { mNew.iterator.foreach( p => ()) }
time { mNew.iterator.foreach( p => ()) }
*/
override def foreach[U](f: A => U): Unit = {
var i = 0;
while (i < elems.length) {
@ -325,6 +277,126 @@ time { mNew.iterator.foreach( p => ()) }
}
}
class TrieIterator[A](elems: Array[HashSet[A]]) extends Iterator[A] {
private[this] var depth = 0
private[this] var arrayStack = new Array[Array[HashSet[A]]](6)
private[this] var posStack = new Array[Int](6)
private[this] var arrayD = elems
private[this] var posD = 0
private[this] var subIter: Iterator[A] = null // to traverse collision nodes
def hasNext = (subIter ne null) || depth >= 0
def next: A = {
if (subIter ne null) {
val el = subIter.next
if (!subIter.hasNext)
subIter = null
el
} else
next0(arrayD, posD)
}
@scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = {
if (i == elems.length-1) { // reached end of level, pop stack
depth -= 1
if (depth >= 0) {
arrayD = arrayStack(depth)
posD = posStack(depth)
arrayStack(depth) = null
} else {
arrayD = null
posD = 0
}
} else
posD += 1
elems(i) match {
case m: HashTrieSet[A] => // push current pos onto stack and descend
if (depth >= 0) {
arrayStack(depth) = arrayD
posStack(depth) = posD
}
depth += 1
arrayD = m.elems
posD = 0
next0(m.elems, 0)
case m: HashSet1[A] => m.key
case m =>
subIter = m.iterator
subIter.next
}
}
// assumption: contains 2 or more elements
// splits this iterator into 2 iterators
// returns the first iterator, its number of elements, and the second iterator
def split: ((Iterator[A], Int), Iterator[A]) = {
// 0) simple case: no elements have been iterated - simply divide arrayD
if (arrayD != null && depth == 0 && posD == 0) {
val (fst, snd) = arrayD.splitAt(arrayD.length / 2)
val szfst = fst.foldLeft(0)(_ + _.size)
return ((new TrieIterator(fst), szfst), new TrieIterator(snd))
}
// otherwise, some elements have been iterated over
// 1) collision case: if we have a subIter, we return subIter and elements after it
if (subIter ne null) {
val buff = subIter.toBuffer
subIter = null
((buff.iterator, buff.length), this)
} else {
// otherwise find the topmost array stack element
if (depth > 0) {
// 2) topmost comes before (is not) arrayD
// steal a portion of top to create a new iterator
val topmost = arrayStack(0)
if (posStack(0) == arrayStack(0).length - 1) {
// 2a) only a single entry left on top
// this means we have to modify this iterator - pop topmost
val snd = Array(arrayStack(0).last)
val szsnd = snd(0).size
// modify this - pop
depth -= 1
arrayStack = arrayStack.tail ++ Array[Array[HashSet[A]]](null)
posStack = posStack.tail ++ Array[Int](0)
// we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty
((new TrieIterator[A](snd), szsnd), this)
} else {
// 2b) more than a single entry left on top
val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2)
arrayStack(0) = fst
val szsnd = snd.foldLeft(0)(_ + _.size)
((new TrieIterator[A](snd), szsnd), this)
}
} else {
// 3) no topmost element (arrayD is at the top)
// steal a portion of it and update this iterator
if (posD == arrayD.length - 1) {
// 3a) positioned at the last element of arrayD
val arr: Array[HashSet[A]] = arrayD(posD) match {
case c: HashSetCollision1[_] => c.asInstanceOf[HashSetCollision1[A]].ks.toList map { HashSet() + _ } toArray
case ht: HashTrieSet[_] => ht.asInstanceOf[HashTrieSet[A]].elems
case _ => error("cannot divide single element")
}
val (fst, snd) = arr.splitAt(arr.length / 2)
val szsnd = snd.foldLeft(0)(_ + _.size)
((new TrieIterator(snd), szsnd), new TrieIterator(fst))
} else {
// 3b) arrayD has more free elements
val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2)
arrayD = fst
val szsnd = snd.foldLeft(0)(_ + _.size)
((new TrieIterator[A](snd), szsnd), this)
}
}
}
}
}
@serializable @SerialVersionUID(2L) private class SerializationProxy[A,B](@transient private var orig: HashSet[A]) {
private def writeObject(out: java.io.ObjectOutputStream) {
val s = orig.size

View File

@ -20,12 +20,13 @@ package mutable
* Note: The concurrent maps do not accept `null` for keys or values.
*
* @define atomicop
* This is done atomically.
* This is an atomic operation.
*/
trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Associates the given key with a given value, unless the key was already associated with some other value.
*
* $atomicop
*
* @param k key with which the specified value is to be associated
@ -37,6 +38,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Removes the entry for the specified key if it is currently mapped to the specified value.
*
* $atomicop
*
* @param k key for which the entry should be removed
@ -47,6 +49,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Replaces the entry for the given key only if it was previously mapped to a given value.
*
* $atomicop
*
* @param k key for which the entry should be replaced
@ -58,6 +61,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Replaces the entry for the given key only if it was previously mapped to some value.
*
* $atomicop
*
* @param k key for which the entry should be replaced
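These primitives compose into lock-free update loops. A usage sketch, assuming any ConcurrentMap implementation and that no thread concurrently removes the key:

import scala.collection.mutable.ConcurrentMap

// Atomic counter increment built from putIfAbsent and the CAS-style
// replace: retry until our replace wins against concurrent updaters.
def increment(m: ConcurrentMap[String, Int], k: String) {
  if (m.putIfAbsent(k, 1).isDefined) {      // somebody else initialized it
    var done = false
    while (!done) {
      val old = m(k)                        // read current value
      done = m.replace(k, old, old + 1)     // succeeds only if still `old`
    }
  }
}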

View File

@ -26,8 +26,7 @@ trait ParIterable[+T] extends Iterable[T]
/** $factoryinfo
*/
object ParIterable extends ParFactory[ParIterable] {
implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] =
new GenericCanCombineFrom[T]
implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = new GenericCanCombineFrom[T]
def newBuilder[T]: Combiner[T, ParIterable[T]] = ParArrayCombiner[T]

View File

@ -427,7 +427,10 @@ self =>
executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
}
protected[this] def cbfactory = () => newCombiner
protected[this] def cbfactory = {
println(newCombiner + ", " + newCombiner.getClass)
() => newCombiner
}
override def filter(pred: T => Boolean): Repr = {
executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result })
@ -655,6 +658,7 @@ self =>
protected[this] def newSubtask(p: ParIterableIterator[T]): Accessor[R, Tp]
def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
def split = pit.split.map(newSubtask(_)) // default split procedure
private[parallel] override def signalAbort = pit.abort
override def toString = "Accessor(" + pit.toString + ")"
}
@ -672,6 +676,10 @@ self =>
val st: Second
def combineResults(fr: FR, sr: SR): R
var result: R = null.asInstanceOf[R]
private[parallel] override def signalAbort {
ft.signalAbort
st.signalAbort
}
}
/** Sequentially performs one task after another. */
@ -703,6 +711,9 @@ self =>
inner.compute
result = map(inner.result)
}
private[parallel] override def signalAbort {
inner.signalAbort
}
}
protected trait Transformer[R, Tp] extends Accessor[R, Tp]
@ -1187,6 +1198,7 @@ self =>
new ScanWithScanTree(Some(st.left.value), op, st.right, src, dest)
)
def shouldSplitFurther = (st.left ne null) && (st.right ne null)
}
protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That])

View File

@ -26,7 +26,7 @@ self =>
def mapCompanion: GenericParMapCompanion[ParMap] = ParMap
override def empty: ParMap[K, V] = new immutable.ParHashTrie[K, V]
override def empty: ParMap[K, V] = new immutable.ParHashMap[K, V]
override def stringPrefix = "ParMap"
}
@ -34,9 +34,9 @@ self =>
object ParMap extends ParMapFactory[ParMap] {
def empty[K, V]: ParMap[K, V] = new immutable.ParHashTrie[K, V]
def empty[K, V]: ParMap[K, V] = new immutable.ParHashMap[K, V]
def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashTrieCombiner[K, V]
def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashMapCombiner[K, V]
implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V]

View File

@ -0,0 +1,83 @@
package scala.collection.parallel
import scala.collection.Map
import scala.collection.mutable.Builder
import scala.collection.generic._
trait ParSet[T]
extends Set[T]
with GenericParTemplate[T, ParSet]
with ParIterable[T]
with ParSetLike[T, ParSet[T], Set[T]]
{
self =>
override def empty: ParSet[T] = immutable.ParHashSet[T]()
override def companion: GenericCompanion[ParSet] with GenericParCompanion[ParSet] = ParSet
override def stringPrefix = "ParSet"
}
object ParSet extends ParSetFactory[ParSet] {
def newCombiner[T]: Combiner[T, ParSet[T]] = immutable.HashSetCombiner[T]
implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSet[T]] = new GenericCanCombineFrom[T]
}

View File

@ -0,0 +1,81 @@
package scala.collection.parallel
import scala.collection.SetLike
import scala.collection.Set
import scala.collection.mutable.Builder
trait ParSetLike[T,
+Repr <: ParSetLike[T, Repr, Sequential] with ParSet[T],
+Sequential <: Set[T] with SetLike[T, Sequential]]
extends SetLike[T, Repr]
with ParIterableLike[T, Repr, Sequential]
{ self =>
protected[this] override def newBuilder: Builder[T, Repr] = newCombiner
protected[this] override def newCombiner: Combiner[T, Repr]
override def empty: Repr
}

View File

@ -366,12 +366,13 @@ self =>
def next = { remaining -= 1; self.next }
def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) }
protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = {
val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield
if (total < remaining) it else taker(it, total - remaining)
val sizes = sq.scanLeft(0)(_ + _.remaining)
val shortened = for ((it, (from, until)) <- sq zip (sizes.init zip sizes.tail)) yield
if (until < remaining) it else taker(it, remaining - from)
shortened filter { _.remaining > 0 }
}
}
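The fix computes each inner splitter's (from, until) window within the concatenation and clips by `remaining - from`, where the old code truncated by `total - remaining`, the amount past the boundary. A worked check with hypothetical sizes:

// Suppose the inner splitters hold 3, 4 and 5 elements and we take 6.
val remainings = Seq(3, 4, 5)
val remaining = 6
val sizes = remainings.scanLeft(0)(_ + _)       // Seq(0, 3, 7, 12)
val kept = for ((sz, (from, until)) <- remainings zip (sizes.init zip sizes.tail))
  yield if (until < remaining) sz               // splitter wholly inside the take
        else (remaining - from) max 0           // clipped (or emptied) splitter
// kept == Seq(3, 3, 0): all of the first, 3 of the second, none of the third
// (empty splitters are then dropped by the `filter` above)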
override def take(n: Int) = new Taken(n)
override def slice(from1: Int, until1: Int) = {

View File

@ -34,6 +34,15 @@ trait Splitter[+T] extends Iterator[T] {
}
object Splitter {
def empty[T]: Splitter[T] = new Splitter[T] {
def hasNext = false
def next = Iterator.empty.next
def split = Seq(this)
}
}
/** A precise splitter (or a precise split iterator) can be split into an arbitrary number of splitters
* that traverse disjoint subsets of arbitrary sizes.
*

View File

@ -4,9 +4,7 @@ package scala.collection.parallel
import scala.concurrent.forkjoin._
import scala.util.control.Breaks._
@ -59,6 +57,31 @@ trait Tasks {
protected[this] def split: Seq[Task[R, Tp]]
/** Reads the results of `that` task and merges them into the results of this one. */
protected[this] def merge(that: Tp) {}
// exception handling mechanism
var exception: Exception = null
def forwardException = if (exception != null) throw exception
// tries to do the leaf computation, storing the possible exception
protected def tryLeaf(result: Option[R]) {
try {
tryBreakable {
leaf(result)
} catchBreak {
signalAbort
}
} catch {
case e: Exception =>
exception = e
signalAbort
}
}
protected[this] def tryMerge(t: Tp) {
val that = t.asInstanceOf[Task[R, Tp]]
if (this.exception == null && that.exception == null) merge(that.repr)
else if (that.exception != null) this.exception = that.exception
}
// override in concrete task implementations to signal abort to other tasks
private[parallel] def signalAbort {}
}
type TaskType[R, +Tp] <: Task[R, Tp]
@ -66,13 +89,13 @@ trait Tasks {
var environment: ExecutionEnvironment
/** Executes a task and returns a future. */
/** Executes a task and returns a future. Forwards an exception if some task threw it. */
def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R
/** Executes a task and waits for it to finish. */
/** Executes a task and waits for it to finish. Forwards an exception if some task threw it. */
def executeAndWait[R, Tp](task: TaskType[R, Tp])
/** Executes a result task, waits for it to finish, then returns its result. */
/** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */
def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
/** Retrieves the parallelism level of the task execution environment. */
@ -96,19 +119,19 @@ trait AdaptiveWorkStealingTasks extends Tasks {
/** The actual leaf computation. */
def leaf(result: Option[R]): Unit
def compute = if (shouldSplitFurther) internal else leaf(None)
def compute = if (shouldSplitFurther) internal else tryLeaf(None)
def internal = {
var last = spawnSubtasks
last.leaf(None)
last.tryLeaf(None)
result = last.result
while (last.next != null) {
val lastresult = Option(last.result)
last = last.next
if (last.tryCancel) last.leaf(lastresult) else last.sync
merge(last.repr)
if (last.tryCancel) last.tryLeaf(lastresult) else last.sync
tryMerge(last.repr)
}
}
@ -150,7 +173,6 @@ trait HavingForkJoinPool {
}
/** An implementation trait for parallel tasks based on the fork/join framework.
*
* @define fjdispatch
@ -187,6 +209,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
() => {
fjtask.join
fjtask.forwardException
fjtask.result
}
}
@ -202,6 +225,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
forkJoinPool.execute(fjtask)
}
fjtask.join
fjtask.forwardException
}
/** Executes a task on a fork/join pool and waits for it to finish.
@ -218,6 +242,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
forkJoinPool.execute(fjtask)
}
fjtask.join
fjtask.forwardException
fjtask.result
}
@ -225,8 +250,9 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
}
object ForkJoinTasks {
val defaultForkJoinPool = new ForkJoinPool
val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool
defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}
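The pattern above (catch in the leaf, record the exception, signal abort to sibling tasks, rethrow at the join point) can be sketched in isolation; this is a hypothetical reduction to a single task:

// Each task stores the first exception it sees; whoever joins the
// computation rethrows it on the calling thread.
class SketchTask(body: () => Unit) {
  @volatile var exception: Exception = null
  def tryLeaf { try body() catch { case e: Exception => exception = e } }
  def forwardException { if (exception != null) throw exception }
}

val t = new SketchTask(() => throw new RuntimeException("boom"))
t.tryLeaf          // conceptually runs on a worker thread; records failure
t.forwardException // rethrows "boom" on the joining thread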

View File

@ -9,6 +9,7 @@ package scala.collection.parallel.immutable
import scala.collection.parallel.ParMap
import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
import scala.collection.generic.ParMapFactory
import scala.collection.generic.CanCombineFrom
@ -25,26 +26,26 @@ import scala.collection.immutable.HashMap
*
* @author prokopec
*/
class ParHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
class ParHashMap[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
extends ParMap[K, V]
with GenericParMapTemplate[K, V, ParHashTrie]
with ParMapLike[K, V, ParHashTrie[K, V], HashMap[K, V]]
with GenericParMapTemplate[K, V, ParHashMap]
with ParMapLike[K, V, ParHashMap[K, V], HashMap[K, V]]
{
self =>
def this() = this(HashMap.empty[K, V])
override def mapCompanion: GenericParMapCompanion[ParHashTrie] = ParHashTrie
override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap
override def empty: ParHashTrie[K, V] = new ParHashTrie[K, V]
override def empty: ParHashMap[K, V] = new ParHashMap[K, V]
def parallelIterator = new ParHashTrieIterator(trie) with SCPI
def parallelIterator: ParIterableIterator[(K, V)] = new ParHashMapIterator(trie.iterator, trie.size) with SCPI
def seq = trie
def -(k: K) = new ParHashTrie(trie - k)
def -(k: K) = new ParHashMap(trie - k)
def +[U >: V](kv: (K, U)) = new ParHashTrie(trie + kv)
def +[U >: V](kv: (K, U)) = new ParHashMap(trie + kv)
def get(k: K) = trie.get(k)
@ -55,59 +56,66 @@ self =>
case None => newc
}
type SCPI = SignalContextPassingIterator[ParHashTrieIterator]
type SCPI = SignalContextPassingIterator[ParHashMapIterator]
class ParHashTrieIterator(val ht: HashMap[K, V])
class ParHashMapIterator(val triter: Iterator[(K, V)], val sz: Int)
extends super.ParIterator {
self: SignalContextPassingIterator[ParHashTrieIterator] =>
// println("created iterator " + ht)
self: SignalContextPassingIterator[ParHashMapIterator] =>
var i = 0
lazy val triter = ht.iterator
def split: Seq[ParIterator] = {
// println("splitting " + ht + " into " + ht.split.map(new ParHashTrieIterator(_) with SCPI).map(_.toList))
ht.split.map(new ParHashTrieIterator(_) with SCPI)
def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
case t: HashMap.TrieIterator[_, _] =>
val previousRemaining = remaining
val ((fst, fstlength), snd) = t.asInstanceOf[HashMap.TrieIterator[K, V]].split
val sndlength = previousRemaining - fstlength
Seq(
new ParHashMapIterator(fst, fstlength) with SCPI,
new ParHashMapIterator(snd, sndlength) with SCPI
)
case _ =>
// iterator of the collision map case
val buff = triter.toBuffer
val (fp, sp) = buff.splitAt(buff.length / 2)
Seq(fp, sp) map { b => new ParHashMapIterator(b.iterator, b.length) with SCPI }
}
def next: (K, V) = {
// println("taking next after " + i + ", in " + ht)
i += 1
triter.next
}
def hasNext: Boolean = {
// println("hasNext: " + i + ", " + ht.size + ", " + ht)
i < ht.size
i < sz
}
def remaining = ht.size - i
def remaining = sz - i
}
}
object ParHashTrie extends ParMapFactory[ParHashTrie] {
def empty[K, V]: ParHashTrie[K, V] = new ParHashTrie[K, V]
object ParHashMap extends ParMapFactory[ParHashMap] {
def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V]
def newCombiner[K, V]: Combiner[(K, V), ParHashTrie[K, V]] = HashTrieCombiner[K, V]
def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = HashMapCombiner[K, V]
implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashTrie[K, V]] = {
implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = {
new CanCombineFromMap[K, V]
}
def fromTrie[K, V](t: HashMap[K, V]) = new ParHashTrie(t)
def fromTrie[K, V](t: HashMap[K, V]) = new ParHashMap(t)
var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0)
}
trait HashTrieCombiner[K, V]
extends Combiner[(K, V), ParHashTrie[K, V]] {
self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
import HashTrieCombiner._
var heads = new Array[Unrolled[K, V]](rootsize)
var lasts = new Array[Unrolled[K, V]](rootsize)
private[immutable] trait HashMapCombiner[K, V]
extends Combiner[(K, V), ParHashMap[K, V]] {
self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
import HashMapCombiner._
var heads = new Array[Unrolled[(K, V)]](rootsize)
var lasts = new Array[Unrolled[(K, V)]](rootsize)
var size: Int = 0
def clear = {
heads = new Array[Unrolled[K, V]](rootsize)
lasts = new Array[Unrolled[K, V]](rootsize)
heads = new Array[Unrolled[(K, V)]](rootsize)
lasts = new Array[Unrolled[(K, V)]](rootsize)
}
def +=(elem: (K, V)) = {
@ -116,7 +124,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
val pos = hc & 0x1f
if (lasts(pos) eq null) {
// initialize bucket
heads(pos) = new Unrolled[K, V]
heads(pos) = new Unrolled[(K, V)]
lasts(pos) = heads(pos)
}
// add to bucket
@ -124,10 +132,10 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
this
}
def combine[N <: (K, V), NewTo >: ParHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
// ParHashTrie.totalcombines.incrementAndGet
if (other.isInstanceOf[HashTrieCombiner[_, _]]) {
val that = other.asInstanceOf[HashTrieCombiner[K, V]]
def combine[N <: (K, V), NewTo >: ParHashMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
// ParHashMap.totalcombines.incrementAndGet
if (other.isInstanceOf[HashMapCombiner[_, _]]) {
val that = other.asInstanceOf[HashMapCombiner[K, V]]
var i = 0
while (i < rootsize) {
if (lasts(i) eq null) {
@ -158,17 +166,17 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
}
val sz = root.foldLeft(0)(_ + _.size)
if (sz == 0) new ParHashTrie[K, V]
else if (sz == 1) new ParHashTrie[K, V](root(0))
if (sz == 0) new ParHashMap[K, V]
else if (sz == 1) new ParHashMap[K, V](root(0))
else {
val trie = new HashMap.HashTrieMap(bitmap, root, sz)
new ParHashTrie[K, V](trie)
new ParHashMap[K, V](trie)
}
}
/* tasks */
class CreateTrie(buckets: Array[Unrolled[K, V]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
class CreateTrie(buckets: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
@ -178,7 +186,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
i += 1
}
}
private def createTrie(elems: Unrolled[K, V]): HashMap[K, V] = {
private def createTrie(elems: Unrolled[(K, V)]): HashMap[K, V] = {
var trie = new HashMap[K, V]
var unrolled = elems
@ -208,28 +216,11 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
}
object HashTrieCombiner {
def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] {}
object HashMapCombiner {
def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] {}
private[immutable] val rootbits = 5
private[immutable] val rootsize = 1 << 5
private[immutable] val unrolledsize = 16
private[immutable] class Unrolled[K, V] {
var size = 0
var array = new Array[(K, V)](unrolledsize)
var next: Unrolled[K, V] = null
// adds and returns itself or the new unrolled if full
def add(elem: (K, V)): Unrolled[K, V] = if (size < unrolledsize) {
array(size) = elem
size += 1
this
} else {
next = new Unrolled[K, V]
next.add(elem)
}
override def toString = "Unrolled(" + array.mkString(", ") + ")"
}
}
@ -246,3 +237,5 @@ object HashTrieCombiner {

View File

@ -0,0 +1,279 @@
package scala.collection.parallel.immutable
import scala.collection.parallel.ParSet
import scala.collection.parallel.ParSetLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
import scala.collection.generic.ParSetFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParTemplate
import scala.collection.generic.GenericParCompanion
import scala.collection.generic.GenericCompanion
import scala.collection.immutable.HashSet
/** Parallel hash trie set.
*
* @author prokopec
*/
class ParHashSet[T] private[immutable] (private[this] val trie: HashSet[T])
extends ParSet[T]
with GenericParTemplate[T, ParHashSet]
with ParSetLike[T, ParHashSet[T], HashSet[T]]
{
self =>
def this() = this(HashSet.empty[T])
override def companion: GenericCompanion[ParHashSet] with GenericParCompanion[ParHashSet] = ParHashSet
override def empty: ParHashSet[T] = new ParHashSet[T]
def parallelIterator: ParIterableIterator[T] = new ParHashSetIterator(trie.iterator, trie.size) with SCPI
def seq = trie
def -(e: T) = new ParHashSet(trie - e)
def +(e: T) = new ParHashSet(trie + e)
def contains(e: T): Boolean = trie.contains(e)
override def size = trie.size
protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match {
case Some(old) => old
case None => newc
}
type SCPI = SignalContextPassingIterator[ParHashSetIterator]
class ParHashSetIterator(val triter: Iterator[T], val sz: Int)
extends super.ParIterator {
self: SignalContextPassingIterator[ParHashSetIterator] =>
var i = 0
def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
case t: HashSet.TrieIterator[_] =>
val previousRemaining = remaining
val ((fst, fstlength), snd) = t.asInstanceOf[HashSet.TrieIterator[T]].split
val sndlength = previousRemaining - fstlength
Seq(
new ParHashSetIterator(fst, fstlength) with SCPI,
new ParHashSetIterator(snd, sndlength) with SCPI
)
case _ =>
// iterator of the collision set case
val buff = triter.toBuffer
val (fp, sp) = buff.splitAt(buff.length / 2)
Seq(fp, sp) map { b => new ParHashSetIterator(b.iterator, b.length) with SCPI }
}
def next: T = {
i += 1
triter.next
}
def hasNext: Boolean = {
i < sz
}
def remaining = sz - i
}
}
object ParHashSet extends ParSetFactory[ParHashSet] {
def newCombiner[T]: Combiner[T, ParHashSet[T]] = HashSetCombiner[T]
implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParHashSet[T]] =
new GenericCanCombineFrom[T]
def fromTrie[T](t: HashSet[T]) = new ParHashSet(t)
}
private[immutable] trait HashSetCombiner[T]
extends Combiner[T, ParHashSet[T]] {
self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
import HashSetCombiner._
var heads = new Array[Unrolled[Any]](rootsize)
var lasts = new Array[Unrolled[Any]](rootsize)
var size: Int = 0
def clear = {
heads = new Array[Unrolled[Any]](rootsize)
lasts = new Array[Unrolled[Any]](rootsize)
}
def +=(elem: T) = {
size += 1
val hc = elem.##
val pos = hc & 0x1f
if (lasts(pos) eq null) {
// initialize bucket
heads(pos) = new Unrolled[Any]
lasts(pos) = heads(pos)
}
// add to bucket
lasts(pos) = lasts(pos).add(elem)
this
}
def combine[N <: T, NewTo >: ParHashSet[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
if (other.isInstanceOf[HashSetCombiner[_]]) {
val that = other.asInstanceOf[HashSetCombiner[T]]
var i = 0
while (i < rootsize) {
if (lasts(i) eq null) {
heads(i) = that.heads(i)
lasts(i) = that.lasts(i)
} else {
lasts(i).next = that.heads(i)
if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
}
i += 1
}
size = size + that.size
this
} else error("Unexpected combiner type.")
} else this
def result = {
val buckets = heads.filter(_ != null)
val root = new Array[HashSet[T]](buckets.length)
executeAndWait(new CreateTrie(buckets, root, 0, buckets.length))
var bitmap = 0
var i = 0
while (i < rootsize) {
if (heads(i) ne null) bitmap |= 1 << i
i += 1
}
val sz = root.foldLeft(0)(_ + _.size)
if (sz == 0) new ParHashSet[T]
else if (sz == 1) new ParHashSet[T](root(0))
else {
val trie = new HashSet.HashTrieSet(bitmap, root, sz)
new ParHashSet[T](trie)
}
}
/* tasks */
class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
val until = offset + howmany
while (i < until) {
root(i) = createTrie(buckets(i))
i += 1
}
}
private def createTrie(elems: Unrolled[Any]): HashSet[T] = {
var trie = new HashSet[T]
var unrolled = elems
var i = 0
while (unrolled ne null) {
val chunkarr = unrolled.array
val chunksz = unrolled.size
while (i < chunksz) {
val v = chunkarr(i).asInstanceOf[T]
val hc = v.##
trie = trie.updated0(v, hc, rootbits)
i += 1
}
i = 0
unrolled = unrolled.next
}
trie
}
def split = {
val fp = howmany / 2
List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp))
}
def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
}
}
object HashSetCombiner {
def apply[T] = new HashSetCombiner[T] with EnvironmentPassingCombiner[T, ParHashSet[T]] {}
private[immutable] val rootbits = 5
private[immutable] val rootsize = 1 << 5
}
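The combiner buckets elements by the low five bits of the hash (`hc & 0x1f`), which is exactly the root-level index in the resulting trie, so each of the up-to-32 CreateTrie subtasks builds one root subtree independently. A small illustration of the bucketing, with arbitrary sample elements:

// Elements whose hash codes share the low 5 bits land in the same root
// bucket; each bucket becomes one independently built subtree, stitched
// under a root HashTrieSet whose bitmap has bit i set for bucket i.
val elems = Seq("a", "b", "c", "d", "e")
val buckets = elems.groupBy(_.## & 0x1f)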

View File

@ -12,8 +12,29 @@ package scala.collection.parallel
package object immutable {
/* package level methods */
def repetition[T](elem: T, len: Int) = new Repetition(elem, len)
/* properties */
private[immutable] val unrolledsize = 16
/* classes */
private[immutable] class Unrolled[T: ClassManifest] {
var size = 0
var array = new Array[T](unrolledsize)
var next: Unrolled[T] = null
// adds and returns itself or the new unrolled if full
def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
array(size) = elem
size += 1
this
} else {
next = new Unrolled[T]
next.add(elem)
}
override def toString = "Unrolled(" + array.mkString(", ") + ")"
}
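A short driver for the chunked list above (illustrative only, since the class is private to the package): append through `add`, which returns the chunk to append to next, then walk the chain of fixed-size arrays.

val head = new Unrolled[Int]
var last = head
for (i <- 1 to 40) last = last.add(i)  // 40 > 16, so this spans three chunks
var cur = head
while (cur ne null) {
  var i = 0
  while (i < cur.size) { println(cur.array(i)); i += 1 }
  cur = cur.next
}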
/** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method.
*
* @tparam T type of the elements

View File

@ -38,6 +38,16 @@ class Breaks {
if (ex ne breakException) throw ex
}
}
def tryBreakable(op: => Unit) = new {
def catchBreak(onBreak: => Unit) = try {
op
} catch {
case ex: BreakControl =>
if (ex ne breakException) throw ex
onBreak
}
}
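A usage sketch of the new combinator, mirroring how tryLeaf uses it earlier in this commit; only a BreakControl triggers the handler, any other exception propagates unchanged:

import scala.util.control.Breaks._

tryBreakable {
  for (x <- 1 to 10) {
    if (x == 5) break        // aborts the loop body
    println(x)
  }
} catchBreak {
  println("broke out early") // runs only when break fired
}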
/* Break from dynamically closest enclosing breakable block
* @note this might be different than the statically closest enclosing

View File

@ -0,0 +1,47 @@
import collection._
// checks whether hash tries split their iterators correctly
// even after some elements have been traversed
object Test {
def main(args: Array[String]) {
doesSplitOk
}
def doesSplitOk = {
val sz = 2000
var ht = new parallel.immutable.ParHashMap[Int, Int]
// println("creating trie")
for (i <- 0 until sz) ht += ((i + sz, i))
// println("created trie")
for (n <- 0 until (sz - 1)) {
// println("---------> n = " + n)
val pit = ht.parallelIterator
val pit2 = ht.parallelIterator
var i = 0
while (i < n) {
pit.next
pit2.next
i += 1
}
// println("splitting")
val pits = pit.split
val fst = pits(0).toSet
val snd = pits(1).toSet
val orig = pit2.toSet
if (orig.size != (fst.size + snd.size) || orig != (fst ++ snd)) {
println("Original: " + orig)
println("First: " + fst)
println("Second: " + snd)
assert(false)
}
}
}
}