From 7cd12014a7cb9db76e30c53f612730f6eb80585a Mon Sep 17 00:00:00 2001 From: prokopec Date: Tue, 5 Oct 2010 16:22:21 +0000 Subject: [PATCH] Adding immutable parallel hashsets. Fixing an issue with hashset splitters where the splitting does not work if some elements have already been iterated. Added parallel collections exception handling. Added parallel collections break control. Renaming ParHashTrie -> ParHashMap. The part with immutable.{HashSet, HashMap} - review by rompf git-svn-id: http://lampsvn.epfl.ch/svn-repos/scala/scala/trunk@23200 5e8d7ff9-d8ef-0310-90f0-a4852d11357a --- build.xml | 6 +- .../collection/generic/ParSetFactory.scala | 33 +++ .../scala/collection/generic/Signalling.scala | 9 +- .../scala/collection/immutable/HashMap.scala | 198 ++++++++----- .../scala/collection/immutable/HashSet.scala | 204 ++++++++----- .../collection/mutable/ConcurrentMap.scala | 6 +- .../collection/parallel/ParIterable.scala | 3 +- .../collection/parallel/ParIterableLike.scala | 14 +- .../scala/collection/parallel/ParMap.scala | 6 +- .../scala/collection/parallel/ParSet.scala | 83 ++++++ .../collection/parallel/ParSetLike.scala | 81 +++++ .../collection/parallel/RemainsIterator.scala | 7 +- .../scala/collection/parallel/Splitter.scala | 9 + .../scala/collection/parallel/Tasks.scala | 50 +++- .../{ParHashTrie.scala => ParHashMap.scala} | 117 ++++---- .../parallel/immutable/ParHashSet.scala | 279 ++++++++++++++++++ .../parallel/immutable/package.scala | 21 ++ src/library/scala/util/control/Breaks.scala | 10 + test/files/scalacheck/HashTrieSplit.scala | 47 +++ 19 files changed, 949 insertions(+), 234 deletions(-) create mode 100644 src/library/scala/collection/generic/ParSetFactory.scala create mode 100644 src/library/scala/collection/parallel/ParSet.scala create mode 100644 src/library/scala/collection/parallel/ParSetLike.scala rename src/library/scala/collection/parallel/immutable/{ParHashTrie.scala => ParHashMap.scala} (52%) create mode 100644 src/library/scala/collection/parallel/immutable/ParHashSet.scala create mode 100644 test/files/scalacheck/HashTrieSplit.scala diff --git a/build.xml b/build.xml index acfd406d0..e347f59ce 100644 --- a/build.xml +++ b/build.xml @@ -1563,13 +1563,13 @@ BOOTRAPING TEST AND TEST SUITE - - - + + + diff --git a/src/library/scala/collection/generic/ParSetFactory.scala b/src/library/scala/collection/generic/ParSetFactory.scala new file mode 100644 index 000000000..ac6fe79ba --- /dev/null +++ b/src/library/scala/collection/generic/ParSetFactory.scala @@ -0,0 +1,33 @@ +package scala.collection.generic + + + + + +import collection.mutable.Builder +import collection.parallel.Combiner +import collection.parallel.ParSet +import collection.parallel.ParSetLike + + + + + + +abstract class ParSetFactory[CC[X] <: ParSet[X] with ParSetLike[X, CC[X], _] with GenericParTemplate[X, CC]] + extends SetFactory[CC] + with GenericParCompanion[CC] +{ + def newBuilder[A]: Combiner[A, CC[A]] = newCombiner[A] + + def newCombiner[A]: Combiner[A, CC[A]] + + class GenericCanCombineFrom[A] extends CanCombineFrom[CC[_], A, CC[A]] { + override def apply(from: Coll) = from.genericCombiner[A] + override def apply() = newCombiner[A] + } +} + + + + diff --git a/src/library/scala/collection/generic/Signalling.scala b/src/library/scala/collection/generic/Signalling.scala index 88ac0547c..8e6a279ba 100644 --- a/src/library/scala/collection/generic/Signalling.scala +++ b/src/library/scala/collection/generic/Signalling.scala @@ -91,10 +91,7 @@ trait Signalling { /** * This signalling implementation returns default values and ignores received signals. */ -class DefaultSignalling extends Signalling { - def isAborted = false - def abort {} - +class DefaultSignalling extends Signalling with VolatileAbort { def indexFlag = -1 def setIndexFlag(f: Int) {} def setIndexFlagIfGreater(f: Int) {} @@ -115,8 +112,8 @@ object IdleSignalling extends DefaultSignalling */ trait VolatileAbort extends Signalling { @volatile private var abortflag = false - abstract override def isAborted = abortflag - abstract override def abort = abortflag = true + override def isAborted = abortflag + override def abort = abortflag = true } diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala index fba076cba..1ed876f54 100644 --- a/src/library/scala/collection/immutable/HashMap.scala +++ b/src/library/scala/collection/immutable/HashMap.scala @@ -15,7 +15,7 @@ import generic._ import annotation.unchecked.uncheckedVariance -import parallel.immutable.ParHashTrie +import parallel.immutable.ParHashMap /** This class implements immutable maps using a hash trie. @@ -36,7 +36,7 @@ import parallel.immutable.ParHashTrie * @define willNotTerminateInf */ @serializable @SerialVersionUID(2L) -class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashTrie[A, B]] { +class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashMap[A, B]] { override def size: Int = 0 @@ -90,7 +90,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par protected def merge0[B1 >: B](that: HashMap[A, B1], level: Int, merger: Merger[B1]): HashMap[A, B1] = that - def par = ParHashTrie.fromTrie(this) + def par = ParHashMap.fromTrie(this) } @@ -307,78 +307,7 @@ object HashMap extends ImmutableMapFactory[HashMap] { } } -/* - override def iterator = { // TODO: optimize (use a stack to keep track of pos) - - def iter(m: HashTrieMap[A,B], k: => Stream[(A,B)]): Stream[(A,B)] = { - def horiz(elems: Array[HashMap[A,B]], i: Int, k: => Stream[(A,B)]): Stream[(A,B)] = { - if (i < elems.length) { - elems(i) match { - case m: HashTrieMap[A,B] => iter(m, horiz(elems, i+1, k)) - case m: HashMap1[A,B] => new Stream.Cons(m.ensurePair, horiz(elems, i+1, k)) - } - } else k - } - horiz(m.elems, 0, k) - } - iter(this, Stream.empty).iterator - } -*/ - - - override def iterator = new Iterator[(A,B)] { - private[this] var depth = 0 - private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6) - private[this] var posStack = new Array[Int](6) - - private[this] var arrayD = elems - private[this] var posD = 0 - - private[this] var subIter: Iterator[(A,B)] = null // to traverse collision nodes - - def hasNext = (subIter ne null) || depth >= 0 - - def next: (A,B) = { - if (subIter ne null) { - val el = subIter.next - if (!subIter.hasNext) - subIter = null - el - } else - next0(arrayD, posD) - } - - @scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = { - if (i == elems.length-1) { // reached end of level, pop stack - depth -= 1 - if (depth >= 0) { - arrayD = arrayStack(depth) - posD = posStack(depth) - arrayStack(depth) = null - } else { - arrayD = null - posD = 0 - } - } else - posD += 1 - - elems(i) match { - case m: HashTrieMap[A,B] => // push current pos onto stack and descend - if (depth >= 0) { - arrayStack(depth) = arrayD - posStack(depth) = posD - } - depth += 1 - arrayD = m.elems - posD = 0 - next0(m.elems, 0) - case m: HashMap1[A,B] => m.ensurePair - case m => - subIter = m.iterator - subIter.next - } - } - } + override def iterator = new TrieIterator[A, B](elems) /* @@ -534,6 +463,125 @@ time { mNew.iterator.foreach( p => ()) } } + class TrieIterator[A, +B](elems: Array[HashMap[A, B]]) extends Iterator[(A, B)] { + private[this] var depth = 0 + private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6) + private[this] var posStack = new Array[Int](6) + + private[this] var arrayD = elems + private[this] var posD = 0 + + private[this] var subIter: Iterator[(A, B)] = null // to traverse collision nodes + + def hasNext = (subIter ne null) || depth >= 0 + + def next: (A,B) = { + if (subIter ne null) { + val el = subIter.next + if (!subIter.hasNext) + subIter = null + el + } else + next0(arrayD, posD) + } + + @scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = { + if (i == elems.length-1) { // reached end of level, pop stack + depth -= 1 + if (depth >= 0) { + arrayD = arrayStack(depth) + posD = posStack(depth) + arrayStack(depth) = null + } else { + arrayD = null + posD = 0 + } + } else + posD += 1 + + elems(i) match { + case m: HashTrieMap[A,B] => // push current pos onto stack and descend + if (depth >= 0) { + arrayStack(depth) = arrayD + posStack(depth) = posD + } + depth += 1 + arrayD = m.elems + posD = 0 + next0(m.elems, 0) + case m: HashMap1[A,B] => m.ensurePair + case m => + subIter = m.iterator + subIter.next + } + } + + // assumption: contains 2 or more elements + // splits this iterator into 2 iterators + // returns the 1st iterator, its number of elements, and the second iterator + def split: ((Iterator[(A, B)], Int), Iterator[(A, B)]) = { + // 0) simple case: no elements have been iterated - simply divide arrayD + if (arrayD != null && depth == 0 && posD == 0) { + val (fst, snd) = arrayD.splitAt(arrayD.length / 2) + val szfst = fst.foldLeft(0)(_ + _.size) + return ((new TrieIterator(fst), szfst), new TrieIterator(snd)) + } + + // otherwise, some elements have been iterated over + // 1) collision case: if we have a subIter, we return subIter and elements after it + if (subIter ne null) { + val buff = subIter.toBuffer + subIter = null + ((buff.iterator, buff.length), this) + } else { + // otherwise find the topmost array stack element + if (depth > 0) { + // 2) topmost comes before (is not) arrayD + // steal a portion of top to create a new iterator + val topmost = arrayStack(0) + if (posStack(0) == arrayStack(0).length - 1) { + // 2a) only a single entry left on top + // this means we have to modify this iterator - pop topmost + val snd = Array(arrayStack(0).last) + val szsnd = snd(0).size + // modify this - pop + depth -= 1 + arrayStack = arrayStack.tail ++ Array[Array[HashMap[A, B]]](null) + posStack = posStack.tail ++ Array[Int](0) + // we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty + ((new TrieIterator[A, B](snd), szsnd), this) + } else { + // 2b) more than a single entry left on top + val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2) + arrayStack(0) = fst + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator[A, B](snd), szsnd), this) + } + } else { + // 3) no topmost element (arrayD is at the top) + // steal a portion of it and update this iterator + if (posD == arrayD.length - 1) { + // 3a) positioned at the last element of arrayD + val arr: Array[HashMap[A, B]] = arrayD(posD) match { + case c: HashMapCollision1[_, _] => c.asInstanceOf[HashMapCollision1[A, B]].kvs.toArray map { HashMap() + _ } + case ht: HashTrieMap[_, _] => ht.asInstanceOf[HashTrieMap[A, B]].elems + case _ => error("cannot divide single element") + } + val (fst, snd) = arr.splitAt(arr.length / 2) + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator(snd), szsnd), new TrieIterator(fst)) + } else { + // 3b) arrayD has more free elements + val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2) + arrayD = fst + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator[A, B](snd), szsnd), this) + } + } + } + } + } + private def check[K](x: HashMap[K, _], y: HashMap[K, _], xy: HashMap[K, _]) = { // TODO remove this debugging helper var xs = Set[K]() for (elem <- x) xs += elem._1 diff --git a/src/library/scala/collection/immutable/HashSet.scala b/src/library/scala/collection/immutable/HashSet.scala index 498c7bbf8..94e7b079b 100644 --- a/src/library/scala/collection/immutable/HashSet.scala +++ b/src/library/scala/collection/immutable/HashSet.scala @@ -14,6 +14,8 @@ package immutable import generic._ import annotation.unchecked.uncheckedVariance +import collection.parallel.immutable.ParHashSet + /** This class implements immutable sets using a hash trie. * * '''Note:''' the builder of a hash set returns specialized representations `EmptySet`,`Set1`,..., `Set4` @@ -31,7 +33,9 @@ import annotation.unchecked.uncheckedVariance @serializable @SerialVersionUID(2L) class HashSet[A] extends Set[A] with GenericSetTemplate[A, HashSet] - with SetLike[A, HashSet[A]] { + with SetLike[A, HashSet[A]] + with Parallelizable[ParHashSet[A]] +{ override def companion: GenericCompanion[HashSet] = HashSet //class HashSet[A] extends Set[A] with SetLike[A, HashSet[A]] { @@ -54,7 +58,9 @@ class HashSet[A] extends Set[A] def - (e: A): HashSet[A] = removed0(e, computeHash(e), 0) - + + def par = ParHashSet.fromTrie(this) + protected def elemHashCode(key: A) = if (key == null) 0 else key.## protected final def improve(hcode: Int) = { @@ -68,7 +74,7 @@ class HashSet[A] extends Set[A] protected def get0(key: A, hash: Int, level: Int): Boolean = false - protected def updated0(key: A, hash: Int, level: Int): HashSet[A] = + def updated0(key: A, hash: Int, level: Int): HashSet[A] = new HashSet.HashSet1(key, hash) protected def removed0(key: A, hash: Int, level: Int): HashSet[A] = this @@ -169,11 +175,11 @@ object HashSet extends ImmutableSetFactory[HashSet] { } - class HashTrieSet[A](private var bitmap: Int, private var elems: Array[HashSet[A]], + class HashTrieSet[A](private var bitmap: Int, private[HashSet] var elems: Array[HashSet[A]], private var size0: Int) extends HashSet[A] { - + override def size = size0 - + override def get0(key: A, hash: Int, level: Int): Boolean = { val index = (hash >>> level) & 0x1f val mask = (1 << index) @@ -186,7 +192,7 @@ object HashSet extends ImmutableSetFactory[HashSet] { } else false } - + override def updated0(key: A, hash: Int, level: Int): HashSet[A] = { val index = (hash >>> level) & 0x1f val mask = (1 << index) @@ -208,7 +214,7 @@ object HashSet extends ImmutableSetFactory[HashSet] { new HashTrieSet(bitmapNew, elemsNew, size + 1) } } - + override def removed0(key: A, hash: Int, level: Int): HashSet[A] = { val index = (hash >>> level) & 0x1f val mask = (1 << index) @@ -238,62 +244,9 @@ object HashSet extends ImmutableSetFactory[HashSet] { this } } - - - override def iterator = new Iterator[A] { - private[this] var depth = 0 - private[this] var arrayStack = new Array[Array[HashSet[A]]](6) - private[this] var posStack = new Array[Int](6) - - private[this] var arrayD = elems - private[this] var posD = 0 - - private[this] var subIter: Iterator[A] = null // to traverse collision nodes - - def hasNext = (subIter ne null) || depth >= 0 - - def next: A = { - if (subIter ne null) { - val el = subIter.next - if (!subIter.hasNext) - subIter = null - el - } else - next0(arrayD, posD) - } - - @scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = { - if (i == elems.length-1) { // reached end of level, pop stack - depth -= 1 - if (depth >= 0) { - arrayD = arrayStack(depth) - posD = posStack(depth) - arrayStack(depth) = null - } else { - arrayD = null - posD = 0 - } - } else - posD += 1 - - elems(i) match { - case m: HashTrieSet[A] => // push current pos onto stack and descend - if (depth >= 0) { - arrayStack(depth) = arrayD - posStack(depth) = posD - } - depth += 1 - arrayD = m.elems - posD = 0 - next0(m.elems, 0) - case m: HashSet1[A] => m.key - case m => - subIter = m.iterator - subIter.next - } - } - } - + + override def iterator = new TrieIterator[A](elems) + /* import collection.immutable._ @@ -314,8 +267,7 @@ time { mNew.iterator.foreach( p => ()) } time { mNew.iterator.foreach( p => ()) } */ - - + override def foreach[U](f: A => U): Unit = { var i = 0; while (i < elems.length) { @@ -325,6 +277,126 @@ time { mNew.iterator.foreach( p => ()) } } } + + class TrieIterator[A](elems: Array[HashSet[A]]) extends Iterator[A] { + private[this] var depth = 0 + private[this] var arrayStack = new Array[Array[HashSet[A]]](6) + private[this] var posStack = new Array[Int](6) + + private[this] var arrayD = elems + private[this] var posD = 0 + + private[this] var subIter: Iterator[A] = null // to traverse collision nodes + + def hasNext = (subIter ne null) || depth >= 0 + + def next: A = { + if (subIter ne null) { + val el = subIter.next + if (!subIter.hasNext) + subIter = null + el + } else + next0(arrayD, posD) + } + + @scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = { + if (i == elems.length-1) { // reached end of level, pop stack + depth -= 1 + if (depth >= 0) { + arrayD = arrayStack(depth) + posD = posStack(depth) + arrayStack(depth) = null + } else { + arrayD = null + posD = 0 + } + } else + posD += 1 + + elems(i) match { + case m: HashTrieSet[A] => // push current pos onto stack and descend + if (depth >= 0) { + arrayStack(depth) = arrayD + posStack(depth) = posD + } + depth += 1 + arrayD = m.elems + posD = 0 + next0(m.elems, 0) + case m: HashSet1[A] => m.key + case m => + subIter = m.iterator + subIter.next + } + } + + // assumption: contains 2 or more elements + // splits this iterator into 2 iterators + // returns the 1st iterator, its number of elements, and the second iterator + def split: ((Iterator[A], Int), Iterator[A]) = { + // 0) simple case: no elements have been iterated - simply divide arrayD + if (arrayD != null && depth == 0 && posD == 0) { + val (fst, snd) = arrayD.splitAt(arrayD.length / 2) + val szfst = fst.foldLeft(0)(_ + _.size) + return ((new TrieIterator(fst), szfst), new TrieIterator(snd)) + } + + // otherwise, some elements have been iterated over + // 1) collision case: if we have a subIter, we return subIter and elements after it + if (subIter ne null) { + val buff = subIter.toBuffer + subIter = null + ((buff.iterator, buff.length), this) + } else { + // otherwise find the topmost array stack element + if (depth > 0) { + // 2) topmost comes before (is not) arrayD + // steal a portion of top to create a new iterator + val topmost = arrayStack(0) + if (posStack(0) == arrayStack(0).length - 1) { + // 2a) only a single entry left on top + // this means we have to modify this iterator - pop topmost + val snd = Array(arrayStack(0).last) + val szsnd = snd(0).size + // modify this - pop + depth -= 1 + arrayStack = arrayStack.tail ++ Array[Array[HashSet[A]]](null) + posStack = posStack.tail ++ Array[Int](0) + // we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty + ((new TrieIterator[A](snd), szsnd), this) + } else { + // 2b) more than a single entry left on top + val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2) + arrayStack(0) = fst + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator[A](snd), szsnd), this) + } + } else { + // 3) no topmost element (arrayD is at the top) + // steal a portion of it and update this iterator + if (posD == arrayD.length - 1) { + // 3a) positioned at the last element of arrayD + val arr: Array[HashSet[A]] = arrayD(posD) match { + case c: HashSetCollision1[_] => c.asInstanceOf[HashSetCollision1[A]].ks.toList map { HashSet() + _ } toArray + case ht: HashTrieSet[_] => ht.asInstanceOf[HashTrieSet[A]].elems + case _ => error("cannot divide single element") + } + val (fst, snd) = arr.splitAt(arr.length / 2) + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator(snd), szsnd), new TrieIterator(fst)) + } else { + // 3b) arrayD has more free elements + val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2) + arrayD = fst + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator[A](snd), szsnd), this) + } + } + } + } + } + @serializable @SerialVersionUID(2L) private class SerializationProxy[A,B](@transient private var orig: HashSet[A]) { private def writeObject(out: java.io.ObjectOutputStream) { val s = orig.size diff --git a/src/library/scala/collection/mutable/ConcurrentMap.scala b/src/library/scala/collection/mutable/ConcurrentMap.scala index 5239b489b..18921d566 100644 --- a/src/library/scala/collection/mutable/ConcurrentMap.scala +++ b/src/library/scala/collection/mutable/ConcurrentMap.scala @@ -20,12 +20,13 @@ package mutable * Note: The concurrent maps do not accept `null` for keys or values. * * @define atomicop - * This is done atomically. + * This is an atomic operation. */ trait ConcurrentMap[A, B] extends Map[A, B] { /** * Associates the given key with a given value, unless the key was already associated with some other value. + * * $atomicop * * @param k key with which the specified value is to be associated with @@ -37,6 +38,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] { /** * Removes the entry for the specified key if its currently mapped to the specified value. + * * $atomicop * * @param k key for which the entry should be removed @@ -47,6 +49,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] { /** * Replaces the entry for the given key only if it was previously mapped to a given value. + * * $atomicop * * @param k key for which the entry should be replaced @@ -58,6 +61,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] { /** * Replaces the entry for the given key only if it was previously mapped to some value. + * * $atomicop * * @param k key for which the entry should be replaced diff --git a/src/library/scala/collection/parallel/ParIterable.scala b/src/library/scala/collection/parallel/ParIterable.scala index b44704436..7f1a8c7c3 100644 --- a/src/library/scala/collection/parallel/ParIterable.scala +++ b/src/library/scala/collection/parallel/ParIterable.scala @@ -26,8 +26,7 @@ trait ParIterable[+T] extends Iterable[T] /** $factoryinfo */ object ParIterable extends ParFactory[ParIterable] { - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = - new GenericCanCombineFrom[T] + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = new GenericCanCombineFrom[T] def newBuilder[T]: Combiner[T, ParIterable[T]] = ParArrayCombiner[T] diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index af26463c3..b1b135ab5 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -427,7 +427,10 @@ self => executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort)) } - protected[this] def cbfactory = () => newCombiner + protected[this] def cbfactory ={ + println(newCombiner + ", " + newCombiner.getClass) + () => newCombiner + } override def filter(pred: T => Boolean): Repr = { executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result }) @@ -655,6 +658,7 @@ self => protected[this] def newSubtask(p: ParIterableIterator[T]): Accessor[R, Tp] def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) def split = pit.split.map(newSubtask(_)) // default split procedure + private[parallel] override def signalAbort = pit.abort override def toString = "Accessor(" + pit.toString + ")" } @@ -672,6 +676,10 @@ self => val st: Second def combineResults(fr: FR, sr: SR): R var result: R = null.asInstanceOf[R] + private[parallel] override def signalAbort { + ft.signalAbort + st.signalAbort + } } /** Sequentially performs one task after another. */ @@ -703,6 +711,9 @@ self => inner.compute result = map(inner.result) } + private[parallel] override def signalAbort { + inner.signalAbort + } } protected trait Transformer[R, Tp] extends Accessor[R, Tp] @@ -1187,6 +1198,7 @@ self => new ScanWithScanTree(Some(st.left.value), op, st.right, src, dest) ) def shouldSplitFurther = (st.left ne null) && (st.right ne null) + } protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That]) diff --git a/src/library/scala/collection/parallel/ParMap.scala b/src/library/scala/collection/parallel/ParMap.scala index b95a81ef6..9f21f52e9 100644 --- a/src/library/scala/collection/parallel/ParMap.scala +++ b/src/library/scala/collection/parallel/ParMap.scala @@ -26,7 +26,7 @@ self => def mapCompanion: GenericParMapCompanion[ParMap] = ParMap - override def empty: ParMap[K, V] = new immutable.ParHashTrie[K, V] + override def empty: ParMap[K, V] = new immutable.ParHashMap[K, V] override def stringPrefix = "ParMap" } @@ -34,9 +34,9 @@ self => object ParMap extends ParMapFactory[ParMap] { - def empty[K, V]: ParMap[K, V] = new immutable.ParHashTrie[K, V] + def empty[K, V]: ParMap[K, V] = new immutable.ParHashMap[K, V] - def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashTrieCombiner[K, V] + def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashMapCombiner[K, V] implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V] diff --git a/src/library/scala/collection/parallel/ParSet.scala b/src/library/scala/collection/parallel/ParSet.scala new file mode 100644 index 000000000..f6508e9f9 --- /dev/null +++ b/src/library/scala/collection/parallel/ParSet.scala @@ -0,0 +1,83 @@ +package scala.collection.parallel + + + + + + + +import scala.collection.Map +import scala.collection.mutable.Builder +import scala.collection.generic._ + + + + + + +trait ParSet[T] +extends Set[T] + with GenericParTemplate[T, ParSet] + with ParIterable[T] + with ParSetLike[T, ParSet[T], Set[T]] +{ +self => + override def empty: ParSet[T] = immutable.ParHashSet[T]() + + override def companion: GenericCompanion[ParSet] with GenericParCompanion[ParSet] = ParSet + + override def stringPrefix = "ParSet" +} + + + +object ParSet extends ParSetFactory[ParSet] { + def newCombiner[T]: Combiner[T, ParSet[T]] = immutable.HashSetCombiner[T] + + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSet[T]] = new GenericCanCombineFrom[T] +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParSetLike.scala b/src/library/scala/collection/parallel/ParSetLike.scala new file mode 100644 index 000000000..b60728345 --- /dev/null +++ b/src/library/scala/collection/parallel/ParSetLike.scala @@ -0,0 +1,81 @@ +package scala.collection.parallel + + + +import scala.collection.SetLike +import scala.collection.Set +import scala.collection.mutable.Builder + + + + + + + + +trait ParSetLike[T, + +Repr <: ParSetLike[T, Repr, Sequential] with ParSet[T], + +Sequential <: Set[T] with SetLike[T, Sequential]] +extends SetLike[T, Repr] + with ParIterableLike[T, Repr, Sequential] +{ self => + + protected[this] override def newBuilder: Builder[T, Repr] = newCombiner + + protected[this] override def newCombiner: Combiner[T, Repr] + + override def empty: Repr + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 94cbbb7de..36f897cb3 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -366,12 +366,13 @@ self => def next = { remaining -= 1; self.next } def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) } protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = { - val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield - if (total < remaining) it else taker(it, total - remaining) + val sizes = sq.scanLeft(0)(_ + _.remaining) + val shortened = for ((it, (from, until)) <- sq zip (sizes.init zip sizes.tail)) yield + if (until < remaining) it else taker(it, remaining - from) shortened filter { _.remaining > 0 } } } - + override def take(n: Int) = new Taken(n) override def slice(from1: Int, until1: Int) = { diff --git a/src/library/scala/collection/parallel/Splitter.scala b/src/library/scala/collection/parallel/Splitter.scala index 5c8b24e92..cf81a6ac4 100644 --- a/src/library/scala/collection/parallel/Splitter.scala +++ b/src/library/scala/collection/parallel/Splitter.scala @@ -34,6 +34,15 @@ trait Splitter[+T] extends Iterator[T] { } +object Splitter { + def empty[T]: Splitter[T] = new Splitter[T] { + def hasNext = false + def next = Iterator.empty.next + def split = Seq(this) + } +} + + /** A precise splitter (or a precise split iterator) can be split into arbitrary number of splitters * that traverse disjoint subsets of arbitrary sizes. * diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index b6265c665..1e03ac7e0 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -4,9 +4,7 @@ package scala.collection.parallel import scala.concurrent.forkjoin._ - - - +import scala.util.control.Breaks._ @@ -59,6 +57,31 @@ trait Tasks { protected[this] def split: Seq[Task[R, Tp]] /** Read of results of `that` task and merge them into results of this one. */ protected[this] def merge(that: Tp) {} + + // exception handling mechanism + var exception: Exception = null + def forwardException = if (exception != null) throw exception + // tries to do the leaf computation, storing the possible exception + protected def tryLeaf(result: Option[R]) { + try { + tryBreakable { + leaf(result) + } catchBreak { + signalAbort + } + } catch { + case e: Exception => + exception = e + signalAbort + } + } + protected[this] def tryMerge(t: Tp) { + val that = t.asInstanceOf[Task[R, Tp]] + if (this.exception == null && that.exception == null) merge(that.repr) + else if (that.exception != null) this.exception = that.exception + } + // override in concrete task implementations to signal abort to other tasks + private[parallel] def signalAbort {} } type TaskType[R, +Tp] <: Task[R, Tp] @@ -66,13 +89,13 @@ trait Tasks { var environment: ExecutionEnvironment - /** Executes a task and returns a future. */ + /** Executes a task and returns a future. Forwards an exception if some task threw it. */ def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R - /** Executes a task and waits for it to finish. */ + /** Executes a task and waits for it to finish. Forwards an exception if some task threw it. */ def executeAndWait[R, Tp](task: TaskType[R, Tp]) - /** Executes a result task, waits for it to finish, then returns its result. */ + /** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */ def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R /** Retrieves the parallelism level of the task execution environment. */ @@ -96,19 +119,19 @@ trait AdaptiveWorkStealingTasks extends Tasks { /** The actual leaf computation. */ def leaf(result: Option[R]): Unit - def compute = if (shouldSplitFurther) internal else leaf(None) + def compute = if (shouldSplitFurther) internal else tryLeaf(None) def internal = { var last = spawnSubtasks - last.leaf(None) + last.tryLeaf(None) result = last.result while (last.next != null) { val lastresult = Option(last.result) last = last.next - if (last.tryCancel) last.leaf(lastresult) else last.sync - merge(last.repr) + if (last.tryCancel) last.tryLeaf(lastresult) else last.sync + tryMerge(last.repr) } } @@ -150,7 +173,6 @@ trait HavingForkJoinPool { } - /** An implementation trait for parallel tasks based on the fork/join framework. * * @define fjdispatch @@ -187,6 +209,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { () => { fjtask.join + fjtask.forwardException fjtask.result } } @@ -202,6 +225,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { forkJoinPool.execute(fjtask) } fjtask.join + fjtask.forwardException } /** Executes a task on a fork/join pool and waits for it to finish. @@ -218,6 +242,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { forkJoinPool.execute(fjtask) } fjtask.join + fjtask.forwardException fjtask.result } @@ -225,8 +250,9 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { } + object ForkJoinTasks { - val defaultForkJoinPool = new ForkJoinPool + val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) } diff --git a/src/library/scala/collection/parallel/immutable/ParHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala similarity index 52% rename from src/library/scala/collection/parallel/immutable/ParHashTrie.scala rename to src/library/scala/collection/parallel/immutable/ParHashMap.scala index 0d51628b2..b2ebfb3f1 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashTrie.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -9,6 +9,7 @@ package scala.collection.parallel.immutable import scala.collection.parallel.ParMap import scala.collection.parallel.ParMapLike import scala.collection.parallel.Combiner +import scala.collection.parallel.ParIterableIterator import scala.collection.parallel.EnvironmentPassingCombiner import scala.collection.generic.ParMapFactory import scala.collection.generic.CanCombineFrom @@ -25,26 +26,26 @@ import scala.collection.immutable.HashMap * * @author prokopec */ -class ParHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) +class ParHashMap[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) extends ParMap[K, V] - with GenericParMapTemplate[K, V, ParHashTrie] - with ParMapLike[K, V, ParHashTrie[K, V], HashMap[K, V]] + with GenericParMapTemplate[K, V, ParHashMap] + with ParMapLike[K, V, ParHashMap[K, V], HashMap[K, V]] { self => def this() = this(HashMap.empty[K, V]) - override def mapCompanion: GenericParMapCompanion[ParHashTrie] = ParHashTrie + override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap - override def empty: ParHashTrie[K, V] = new ParHashTrie[K, V] + override def empty: ParHashMap[K, V] = new ParHashMap[K, V] - def parallelIterator = new ParHashTrieIterator(trie) with SCPI + def parallelIterator: ParIterableIterator[(K, V)] = new ParHashMapIterator(trie.iterator, trie.size) with SCPI def seq = trie - def -(k: K) = new ParHashTrie(trie - k) + def -(k: K) = new ParHashMap(trie - k) - def +[U >: V](kv: (K, U)) = new ParHashTrie(trie + kv) + def +[U >: V](kv: (K, U)) = new ParHashMap(trie + kv) def get(k: K) = trie.get(k) @@ -55,59 +56,66 @@ self => case None => newc } - type SCPI = SignalContextPassingIterator[ParHashTrieIterator] + type SCPI = SignalContextPassingIterator[ParHashMapIterator] - class ParHashTrieIterator(val ht: HashMap[K, V]) + class ParHashMapIterator(val triter: Iterator[(K, V)], val sz: Int) extends super.ParIterator { - self: SignalContextPassingIterator[ParHashTrieIterator] => - // println("created iterator " + ht) + self: SignalContextPassingIterator[ParHashMapIterator] => var i = 0 - lazy val triter = ht.iterator - def split: Seq[ParIterator] = { - // println("splitting " + ht + " into " + ht.split.map(new ParHashTrieIterator(_) with SCPI).map(_.toList)) - ht.split.map(new ParHashTrieIterator(_) with SCPI) + def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match { + case t: HashMap.TrieIterator[_, _] => + val previousRemaining = remaining + val ((fst, fstlength), snd) = t.asInstanceOf[HashMap.TrieIterator[K, V]].split + val sndlength = previousRemaining - fstlength + Seq( + new ParHashMapIterator(fst, fstlength) with SCPI, + new ParHashMapIterator(snd, sndlength) with SCPI + ) + case _ => + // iterator of the collision map case + val buff = triter.toBuffer + val (fp, sp) = buff.splitAt(buff.length / 2) + Seq(fp, sp) map { b => new ParHashMapIterator(b.iterator, b.length) with SCPI } } def next: (K, V) = { - // println("taking next after " + i + ", in " + ht) i += 1 triter.next } def hasNext: Boolean = { - // println("hasNext: " + i + ", " + ht.size + ", " + ht) - i < ht.size + i < sz } - def remaining = ht.size - i + def remaining = sz - i } } -object ParHashTrie extends ParMapFactory[ParHashTrie] { - def empty[K, V]: ParHashTrie[K, V] = new ParHashTrie[K, V] +object ParHashMap extends ParMapFactory[ParHashMap] { + def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V] - def newCombiner[K, V]: Combiner[(K, V), ParHashTrie[K, V]] = HashTrieCombiner[K, V] + def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = HashMapCombiner[K, V] - implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashTrie[K, V]] = { + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = { new CanCombineFromMap[K, V] } - def fromTrie[K, V](t: HashMap[K, V]) = new ParHashTrie(t) + def fromTrie[K, V](t: HashMap[K, V]) = new ParHashMap(t) var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0) } -trait HashTrieCombiner[K, V] -extends Combiner[(K, V), ParHashTrie[K, V]] { -self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => - import HashTrieCombiner._ - var heads = new Array[Unrolled[K, V]](rootsize) - var lasts = new Array[Unrolled[K, V]](rootsize) +private[immutable] trait HashMapCombiner[K, V] +extends Combiner[(K, V), ParHashMap[K, V]] { +self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => + import HashMapCombiner._ + var heads = new Array[Unrolled[(K, V)]](rootsize) + var lasts = new Array[Unrolled[(K, V)]](rootsize) var size: Int = 0 def clear = { - heads = new Array[Unrolled[K, V]](rootsize) - lasts = new Array[Unrolled[K, V]](rootsize) + heads = new Array[Unrolled[(K, V)]](rootsize) + lasts = new Array[Unrolled[(K, V)]](rootsize) } def +=(elem: (K, V)) = { @@ -116,7 +124,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => val pos = hc & 0x1f if (lasts(pos) eq null) { // initialize bucket - heads(pos) = new Unrolled[K, V] + heads(pos) = new Unrolled[(K, V)] lasts(pos) = heads(pos) } // add to bucket @@ -124,10 +132,10 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => this } - def combine[N <: (K, V), NewTo >: ParHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { - // ParHashTrie.totalcombines.incrementAndGet - if (other.isInstanceOf[HashTrieCombiner[_, _]]) { - val that = other.asInstanceOf[HashTrieCombiner[K, V]] + def combine[N <: (K, V), NewTo >: ParHashMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { + // ParHashMap.totalcombines.incrementAndGet + if (other.isInstanceOf[HashMapCombiner[_, _]]) { + val that = other.asInstanceOf[HashMapCombiner[K, V]] var i = 0 while (i < rootsize) { if (lasts(i) eq null) { @@ -158,17 +166,17 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => } val sz = root.foldLeft(0)(_ + _.size) - if (sz == 0) new ParHashTrie[K, V] - else if (sz == 1) new ParHashTrie[K, V](root(0)) + if (sz == 0) new ParHashMap[K, V] + else if (sz == 1) new ParHashMap[K, V](root(0)) else { val trie = new HashMap.HashTrieMap(bitmap, root, sz) - new ParHashTrie[K, V](trie) + new ParHashMap[K, V](trie) } } /* tasks */ - class CreateTrie(buckets: Array[Unrolled[K, V]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { + class CreateTrie(buckets: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { var result = () def leaf(prev: Option[Unit]) = { var i = offset @@ -178,7 +186,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => i += 1 } } - private def createTrie(elems: Unrolled[K, V]): HashMap[K, V] = { + private def createTrie(elems: Unrolled[(K, V)]): HashMap[K, V] = { var trie = new HashMap[K, V] var unrolled = elems @@ -208,28 +216,11 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => } -object HashTrieCombiner { - def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] {} +object HashMapCombiner { + def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] {} private[immutable] val rootbits = 5 private[immutable] val rootsize = 1 << 5 - private[immutable] val unrolledsize = 16 - - private[immutable] class Unrolled[K, V] { - var size = 0 - var array = new Array[(K, V)](unrolledsize) - var next: Unrolled[K, V] = null - // adds and returns itself or the new unrolled if full - def add(elem: (K, V)): Unrolled[K, V] = if (size < unrolledsize) { - array(size) = elem - size += 1 - this - } else { - next = new Unrolled[K, V] - next.add(elem) - } - override def toString = "Unrolled(" + array.mkString(", ") + ")" - } } @@ -246,3 +237,5 @@ object HashTrieCombiner { + + diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala new file mode 100644 index 000000000..4ffd04454 --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -0,0 +1,279 @@ +package scala.collection.parallel.immutable + + + + + + + +import scala.collection.parallel.ParSet +import scala.collection.parallel.ParSetLike +import scala.collection.parallel.Combiner +import scala.collection.parallel.ParIterableIterator +import scala.collection.parallel.EnvironmentPassingCombiner +import scala.collection.generic.ParSetFactory +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.GenericParTemplate +import scala.collection.generic.GenericParCompanion +import scala.collection.generic.GenericCompanion +import scala.collection.immutable.HashSet + + + + + + +/** Parallel hash trie set. + * + * @author prokopec + */ +class ParHashSet[T] private[immutable] (private[this] val trie: HashSet[T]) +extends ParSet[T] + with GenericParTemplate[T, ParHashSet] + with ParSetLike[T, ParHashSet[T], HashSet[T]] +{ +self => + + def this() = this(HashSet.empty[T]) + + override def companion: GenericCompanion[ParHashSet] with GenericParCompanion[ParHashSet] = ParHashSet + + override def empty: ParHashSet[T] = new ParHashSet[T] + + def parallelIterator: ParIterableIterator[T] = new ParHashSetIterator(trie.iterator, trie.size) with SCPI + + def seq = trie + + def -(e: T) = new ParHashSet(trie - e) + + def +(e: T) = new ParHashSet(trie + e) + + def contains(e: T): Boolean = trie.contains(e) + + override def size = trie.size + + protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match { + case Some(old) => old + case None => newc + } + + type SCPI = SignalContextPassingIterator[ParHashSetIterator] + + class ParHashSetIterator(val triter: Iterator[T], val sz: Int) + extends super.ParIterator { + self: SignalContextPassingIterator[ParHashSetIterator] => + var i = 0 + def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match { + case t: HashSet.TrieIterator[_] => + val previousRemaining = remaining + val ((fst, fstlength), snd) = t.asInstanceOf[HashSet.TrieIterator[T]].split + val sndlength = previousRemaining - fstlength + Seq( + new ParHashSetIterator(fst, fstlength) with SCPI, + new ParHashSetIterator(snd, sndlength) with SCPI + ) + case _ => + // iterator of the collision map case + val buff = triter.toBuffer + val (fp, sp) = buff.splitAt(buff.length / 2) + Seq(fp, sp) map { b => new ParHashSetIterator(b.iterator, b.length) with SCPI } + } + def next: T = { + i += 1 + triter.next + } + def hasNext: Boolean = { + i < sz + } + def remaining = sz - i + } + +} + + +object ParHashSet extends ParSetFactory[ParHashSet] { + def newCombiner[T]: Combiner[T, ParHashSet[T]] = HashSetCombiner[T] + + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParHashSet[T]] = + new GenericCanCombineFrom[T] + + def fromTrie[T](t: HashSet[T]) = new ParHashSet(t) +} + + +private[immutable] trait HashSetCombiner[T] +extends Combiner[T, ParHashSet[T]] { +self: EnvironmentPassingCombiner[T, ParHashSet[T]] => + import HashSetCombiner._ + var heads = new Array[Unrolled[Any]](rootsize) + var lasts = new Array[Unrolled[Any]](rootsize) + var size: Int = 0 + + def clear = { + heads = new Array[Unrolled[Any]](rootsize) + lasts = new Array[Unrolled[Any]](rootsize) + } + + def +=(elem: T) = { + size += 1 + val hc = elem.## + val pos = hc & 0x1f + if (lasts(pos) eq null) { + // initialize bucket + heads(pos) = new Unrolled[Any] + lasts(pos) = heads(pos) + } + // add to bucket + lasts(pos) = lasts(pos).add(elem) + this + } + + def combine[N <: T, NewTo >: ParHashSet[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { + if (other.isInstanceOf[HashSetCombiner[_]]) { + val that = other.asInstanceOf[HashSetCombiner[T]] + var i = 0 + while (i < rootsize) { + if (lasts(i) eq null) { + heads(i) = that.heads(i) + lasts(i) = that.lasts(i) + } else { + lasts(i).next = that.heads(i) + if (that.lasts(i) ne null) lasts(i) = that.lasts(i) + } + i += 1 + } + size = size + that.size + this + } else error("Unexpected combiner type.") + } else this + + def result = { + val buckets = heads.filter(_ != null) + val root = new Array[HashSet[T]](buckets.length) + + executeAndWait(new CreateTrie(buckets, root, 0, buckets.length)) + + var bitmap = 0 + var i = 0 + while (i < rootsize) { + if (heads(i) ne null) bitmap |= 1 << i + i += 1 + } + val sz = root.foldLeft(0)(_ + _.size) + + if (sz == 0) new ParHashSet[T] + else if (sz == 1) new ParHashSet[T](root(0)) + else { + val trie = new HashSet.HashTrieSet(bitmap, root, sz) + new ParHashSet[T](trie) + } + } + + /* tasks */ + + class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { + var result = () + def leaf(prev: Option[Unit]) = { + var i = offset + val until = offset + howmany + while (i < until) { + root(i) = createTrie(buckets(i)) + i += 1 + } + } + private def createTrie(elems: Unrolled[Any]): HashSet[T] = { + var trie = new HashSet[T] + + var unrolled = elems + var i = 0 + while (unrolled ne null) { + val chunkarr = unrolled.array + val chunksz = unrolled.size + while (i < chunksz) { + val v = chunkarr(i).asInstanceOf[T] + val hc = v.## + trie = trie.updated0(v, hc, rootbits) + i += 1 + } + i = 0 + unrolled = unrolled.next + } + + trie + } + def split = { + val fp = howmany / 2 + List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp)) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + } + +} + + +object HashSetCombiner { + def apply[T] = new HashSetCombiner[T] with EnvironmentPassingCombiner[T, ParHashSet[T]] {} + + private[immutable] val rootbits = 5 + private[immutable] val rootsize = 1 << 5 +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala index 8a968b68e..b18d78885 100644 --- a/src/library/scala/collection/parallel/immutable/package.scala +++ b/src/library/scala/collection/parallel/immutable/package.scala @@ -12,8 +12,29 @@ package scala.collection.parallel package object immutable { + /* package level methods */ def repetition[T](elem: T, len: Int) = new Repetition(elem, len) + /* properties */ + private[immutable] val unrolledsize = 16 + + /* classes */ + private[immutable] class Unrolled[T: ClassManifest] { + var size = 0 + var array = new Array[T](unrolledsize) + var next: Unrolled[T] = null + // adds and returns itself or the new unrolled if full + def add(elem: T): Unrolled[T] = if (size < unrolledsize) { + array(size) = elem + size += 1 + this + } else { + next = new Unrolled[T] + next.add(elem) + } + override def toString = "Unrolled(" + array.mkString(", ") + ")" + } + /** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method. * * @tparam T type of the elements diff --git a/src/library/scala/util/control/Breaks.scala b/src/library/scala/util/control/Breaks.scala index 5ca1aa973..664b01631 100644 --- a/src/library/scala/util/control/Breaks.scala +++ b/src/library/scala/util/control/Breaks.scala @@ -38,6 +38,16 @@ class Breaks { if (ex ne breakException) throw ex } } + + def tryBreakable(op: => Unit) = new { + def catchBreak(onBreak: => Unit) = try { + op + } catch { + case ex: BreakControl => + if (ex ne breakException) throw ex + onBreak + } + } /* Break from dynamically closest enclosing breakable block * @note this might be different than the statically closest enclosing diff --git a/test/files/scalacheck/HashTrieSplit.scala b/test/files/scalacheck/HashTrieSplit.scala new file mode 100644 index 000000000..cbf565095 --- /dev/null +++ b/test/files/scalacheck/HashTrieSplit.scala @@ -0,0 +1,47 @@ + + + + + +import collection._ + + + + +// checks whether hash tries split their iterators correctly +// even after some elements have been traversed +object Test { + def main(args: Array[String]) { + doesSplitOk + } + + def doesSplitOk = { + val sz = 2000 + var ht = new parallel.immutable.ParHashMap[Int, Int] + // println("creating trie") + for (i <- 0 until sz) ht += ((i + sz, i)) + // println("created trie") + for (n <- 0 until (sz - 1)) { + // println("---------> n = " + n) + val pit = ht.parallelIterator + val pit2 = ht.parallelIterator + var i = 0 + while (i < n) { + pit.next + pit2.next + i += 1 + } + // println("splitting") + val pits = pit.split + val fst = pits(0).toSet + val snd = pits(1).toSet + val orig = pit2.toSet + if (orig.size != (fst.size + snd.size) || orig != (fst ++ snd)) { + println("Original: " + orig) + println("First: " + fst) + println("Second: " + snd) + assert(false) + } + } + } +}