diff --git a/build.xml b/build.xml index acfd406d0..e347f59ce 100644 --- a/build.xml +++ b/build.xml @@ -1563,13 +1563,13 @@ BOOTRAPING TEST AND TEST SUITE - - - + + + diff --git a/src/library/scala/collection/generic/ParSetFactory.scala b/src/library/scala/collection/generic/ParSetFactory.scala new file mode 100644 index 000000000..ac6fe79ba --- /dev/null +++ b/src/library/scala/collection/generic/ParSetFactory.scala @@ -0,0 +1,33 @@ +package scala.collection.generic + + + + + +import collection.mutable.Builder +import collection.parallel.Combiner +import collection.parallel.ParSet +import collection.parallel.ParSetLike + + + + + + +abstract class ParSetFactory[CC[X] <: ParSet[X] with ParSetLike[X, CC[X], _] with GenericParTemplate[X, CC]] + extends SetFactory[CC] + with GenericParCompanion[CC] +{ + def newBuilder[A]: Combiner[A, CC[A]] = newCombiner[A] + + def newCombiner[A]: Combiner[A, CC[A]] + + class GenericCanCombineFrom[A] extends CanCombineFrom[CC[_], A, CC[A]] { + override def apply(from: Coll) = from.genericCombiner[A] + override def apply() = newCombiner[A] + } +} + + + + diff --git a/src/library/scala/collection/generic/Signalling.scala b/src/library/scala/collection/generic/Signalling.scala index 88ac0547c..8e6a279ba 100644 --- a/src/library/scala/collection/generic/Signalling.scala +++ b/src/library/scala/collection/generic/Signalling.scala @@ -91,10 +91,7 @@ trait Signalling { /** * This signalling implementation returns default values and ignores received signals. */ -class DefaultSignalling extends Signalling { - def isAborted = false - def abort {} - +class DefaultSignalling extends Signalling with VolatileAbort { def indexFlag = -1 def setIndexFlag(f: Int) {} def setIndexFlagIfGreater(f: Int) {} @@ -115,8 +112,8 @@ object IdleSignalling extends DefaultSignalling */ trait VolatileAbort extends Signalling { @volatile private var abortflag = false - abstract override def isAborted = abortflag - abstract override def abort = abortflag = true + override def isAborted = abortflag + override def abort = abortflag = true } diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala index fba076cba..1ed876f54 100644 --- a/src/library/scala/collection/immutable/HashMap.scala +++ b/src/library/scala/collection/immutable/HashMap.scala @@ -15,7 +15,7 @@ import generic._ import annotation.unchecked.uncheckedVariance -import parallel.immutable.ParHashTrie +import parallel.immutable.ParHashMap /** This class implements immutable maps using a hash trie. @@ -36,7 +36,7 @@ import parallel.immutable.ParHashTrie * @define willNotTerminateInf */ @serializable @SerialVersionUID(2L) -class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashTrie[A, B]] { +class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashMap[A, B]] { override def size: Int = 0 @@ -90,7 +90,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par protected def merge0[B1 >: B](that: HashMap[A, B1], level: Int, merger: Merger[B1]): HashMap[A, B1] = that - def par = ParHashTrie.fromTrie(this) + def par = ParHashMap.fromTrie(this) } @@ -307,78 +307,7 @@ object HashMap extends ImmutableMapFactory[HashMap] { } } -/* - override def iterator = { // TODO: optimize (use a stack to keep track of pos) - - def iter(m: HashTrieMap[A,B], k: => Stream[(A,B)]): Stream[(A,B)] = { - def horiz(elems: Array[HashMap[A,B]], i: Int, k: => Stream[(A,B)]): Stream[(A,B)] = { - if (i < elems.length) { - elems(i) match { - case m: HashTrieMap[A,B] => iter(m, horiz(elems, i+1, k)) - case m: HashMap1[A,B] => new Stream.Cons(m.ensurePair, horiz(elems, i+1, k)) - } - } else k - } - horiz(m.elems, 0, k) - } - iter(this, Stream.empty).iterator - } -*/ - - - override def iterator = new Iterator[(A,B)] { - private[this] var depth = 0 - private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6) - private[this] var posStack = new Array[Int](6) - - private[this] var arrayD = elems - private[this] var posD = 0 - - private[this] var subIter: Iterator[(A,B)] = null // to traverse collision nodes - - def hasNext = (subIter ne null) || depth >= 0 - - def next: (A,B) = { - if (subIter ne null) { - val el = subIter.next - if (!subIter.hasNext) - subIter = null - el - } else - next0(arrayD, posD) - } - - @scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = { - if (i == elems.length-1) { // reached end of level, pop stack - depth -= 1 - if (depth >= 0) { - arrayD = arrayStack(depth) - posD = posStack(depth) - arrayStack(depth) = null - } else { - arrayD = null - posD = 0 - } - } else - posD += 1 - - elems(i) match { - case m: HashTrieMap[A,B] => // push current pos onto stack and descend - if (depth >= 0) { - arrayStack(depth) = arrayD - posStack(depth) = posD - } - depth += 1 - arrayD = m.elems - posD = 0 - next0(m.elems, 0) - case m: HashMap1[A,B] => m.ensurePair - case m => - subIter = m.iterator - subIter.next - } - } - } + override def iterator = new TrieIterator[A, B](elems) /* @@ -534,6 +463,125 @@ time { mNew.iterator.foreach( p => ()) } } + class TrieIterator[A, +B](elems: Array[HashMap[A, B]]) extends Iterator[(A, B)] { + private[this] var depth = 0 + private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6) + private[this] var posStack = new Array[Int](6) + + private[this] var arrayD = elems + private[this] var posD = 0 + + private[this] var subIter: Iterator[(A, B)] = null // to traverse collision nodes + + def hasNext = (subIter ne null) || depth >= 0 + + def next: (A,B) = { + if (subIter ne null) { + val el = subIter.next + if (!subIter.hasNext) + subIter = null + el + } else + next0(arrayD, posD) + } + + @scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = { + if (i == elems.length-1) { // reached end of level, pop stack + depth -= 1 + if (depth >= 0) { + arrayD = arrayStack(depth) + posD = posStack(depth) + arrayStack(depth) = null + } else { + arrayD = null + posD = 0 + } + } else + posD += 1 + + elems(i) match { + case m: HashTrieMap[A,B] => // push current pos onto stack and descend + if (depth >= 0) { + arrayStack(depth) = arrayD + posStack(depth) = posD + } + depth += 1 + arrayD = m.elems + posD = 0 + next0(m.elems, 0) + case m: HashMap1[A,B] => m.ensurePair + case m => + subIter = m.iterator + subIter.next + } + } + + // assumption: contains 2 or more elements + // splits this iterator into 2 iterators + // returns the 1st iterator, its number of elements, and the second iterator + def split: ((Iterator[(A, B)], Int), Iterator[(A, B)]) = { + // 0) simple case: no elements have been iterated - simply divide arrayD + if (arrayD != null && depth == 0 && posD == 0) { + val (fst, snd) = arrayD.splitAt(arrayD.length / 2) + val szfst = fst.foldLeft(0)(_ + _.size) + return ((new TrieIterator(fst), szfst), new TrieIterator(snd)) + } + + // otherwise, some elements have been iterated over + // 1) collision case: if we have a subIter, we return subIter and elements after it + if (subIter ne null) { + val buff = subIter.toBuffer + subIter = null + ((buff.iterator, buff.length), this) + } else { + // otherwise find the topmost array stack element + if (depth > 0) { + // 2) topmost comes before (is not) arrayD + // steal a portion of top to create a new iterator + val topmost = arrayStack(0) + if (posStack(0) == arrayStack(0).length - 1) { + // 2a) only a single entry left on top + // this means we have to modify this iterator - pop topmost + val snd = Array(arrayStack(0).last) + val szsnd = snd(0).size + // modify this - pop + depth -= 1 + arrayStack = arrayStack.tail ++ Array[Array[HashMap[A, B]]](null) + posStack = posStack.tail ++ Array[Int](0) + // we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty + ((new TrieIterator[A, B](snd), szsnd), this) + } else { + // 2b) more than a single entry left on top + val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2) + arrayStack(0) = fst + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator[A, B](snd), szsnd), this) + } + } else { + // 3) no topmost element (arrayD is at the top) + // steal a portion of it and update this iterator + if (posD == arrayD.length - 1) { + // 3a) positioned at the last element of arrayD + val arr: Array[HashMap[A, B]] = arrayD(posD) match { + case c: HashMapCollision1[_, _] => c.asInstanceOf[HashMapCollision1[A, B]].kvs.toArray map { HashMap() + _ } + case ht: HashTrieMap[_, _] => ht.asInstanceOf[HashTrieMap[A, B]].elems + case _ => error("cannot divide single element") + } + val (fst, snd) = arr.splitAt(arr.length / 2) + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator(snd), szsnd), new TrieIterator(fst)) + } else { + // 3b) arrayD has more free elements + val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2) + arrayD = fst + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator[A, B](snd), szsnd), this) + } + } + } + } + } + private def check[K](x: HashMap[K, _], y: HashMap[K, _], xy: HashMap[K, _]) = { // TODO remove this debugging helper var xs = Set[K]() for (elem <- x) xs += elem._1 diff --git a/src/library/scala/collection/immutable/HashSet.scala b/src/library/scala/collection/immutable/HashSet.scala index 498c7bbf8..94e7b079b 100644 --- a/src/library/scala/collection/immutable/HashSet.scala +++ b/src/library/scala/collection/immutable/HashSet.scala @@ -14,6 +14,8 @@ package immutable import generic._ import annotation.unchecked.uncheckedVariance +import collection.parallel.immutable.ParHashSet + /** This class implements immutable sets using a hash trie. * * '''Note:''' the builder of a hash set returns specialized representations `EmptySet`,`Set1`,..., `Set4` @@ -31,7 +33,9 @@ import annotation.unchecked.uncheckedVariance @serializable @SerialVersionUID(2L) class HashSet[A] extends Set[A] with GenericSetTemplate[A, HashSet] - with SetLike[A, HashSet[A]] { + with SetLike[A, HashSet[A]] + with Parallelizable[ParHashSet[A]] +{ override def companion: GenericCompanion[HashSet] = HashSet //class HashSet[A] extends Set[A] with SetLike[A, HashSet[A]] { @@ -54,7 +58,9 @@ class HashSet[A] extends Set[A] def - (e: A): HashSet[A] = removed0(e, computeHash(e), 0) - + + def par = ParHashSet.fromTrie(this) + protected def elemHashCode(key: A) = if (key == null) 0 else key.## protected final def improve(hcode: Int) = { @@ -68,7 +74,7 @@ class HashSet[A] extends Set[A] protected def get0(key: A, hash: Int, level: Int): Boolean = false - protected def updated0(key: A, hash: Int, level: Int): HashSet[A] = + def updated0(key: A, hash: Int, level: Int): HashSet[A] = new HashSet.HashSet1(key, hash) protected def removed0(key: A, hash: Int, level: Int): HashSet[A] = this @@ -169,11 +175,11 @@ object HashSet extends ImmutableSetFactory[HashSet] { } - class HashTrieSet[A](private var bitmap: Int, private var elems: Array[HashSet[A]], + class HashTrieSet[A](private var bitmap: Int, private[HashSet] var elems: Array[HashSet[A]], private var size0: Int) extends HashSet[A] { - + override def size = size0 - + override def get0(key: A, hash: Int, level: Int): Boolean = { val index = (hash >>> level) & 0x1f val mask = (1 << index) @@ -186,7 +192,7 @@ object HashSet extends ImmutableSetFactory[HashSet] { } else false } - + override def updated0(key: A, hash: Int, level: Int): HashSet[A] = { val index = (hash >>> level) & 0x1f val mask = (1 << index) @@ -208,7 +214,7 @@ object HashSet extends ImmutableSetFactory[HashSet] { new HashTrieSet(bitmapNew, elemsNew, size + 1) } } - + override def removed0(key: A, hash: Int, level: Int): HashSet[A] = { val index = (hash >>> level) & 0x1f val mask = (1 << index) @@ -238,62 +244,9 @@ object HashSet extends ImmutableSetFactory[HashSet] { this } } - - - override def iterator = new Iterator[A] { - private[this] var depth = 0 - private[this] var arrayStack = new Array[Array[HashSet[A]]](6) - private[this] var posStack = new Array[Int](6) - - private[this] var arrayD = elems - private[this] var posD = 0 - - private[this] var subIter: Iterator[A] = null // to traverse collision nodes - - def hasNext = (subIter ne null) || depth >= 0 - - def next: A = { - if (subIter ne null) { - val el = subIter.next - if (!subIter.hasNext) - subIter = null - el - } else - next0(arrayD, posD) - } - - @scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = { - if (i == elems.length-1) { // reached end of level, pop stack - depth -= 1 - if (depth >= 0) { - arrayD = arrayStack(depth) - posD = posStack(depth) - arrayStack(depth) = null - } else { - arrayD = null - posD = 0 - } - } else - posD += 1 - - elems(i) match { - case m: HashTrieSet[A] => // push current pos onto stack and descend - if (depth >= 0) { - arrayStack(depth) = arrayD - posStack(depth) = posD - } - depth += 1 - arrayD = m.elems - posD = 0 - next0(m.elems, 0) - case m: HashSet1[A] => m.key - case m => - subIter = m.iterator - subIter.next - } - } - } - + + override def iterator = new TrieIterator[A](elems) + /* import collection.immutable._ @@ -314,8 +267,7 @@ time { mNew.iterator.foreach( p => ()) } time { mNew.iterator.foreach( p => ()) } */ - - + override def foreach[U](f: A => U): Unit = { var i = 0; while (i < elems.length) { @@ -325,6 +277,126 @@ time { mNew.iterator.foreach( p => ()) } } } + + class TrieIterator[A](elems: Array[HashSet[A]]) extends Iterator[A] { + private[this] var depth = 0 + private[this] var arrayStack = new Array[Array[HashSet[A]]](6) + private[this] var posStack = new Array[Int](6) + + private[this] var arrayD = elems + private[this] var posD = 0 + + private[this] var subIter: Iterator[A] = null // to traverse collision nodes + + def hasNext = (subIter ne null) || depth >= 0 + + def next: A = { + if (subIter ne null) { + val el = subIter.next + if (!subIter.hasNext) + subIter = null + el + } else + next0(arrayD, posD) + } + + @scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = { + if (i == elems.length-1) { // reached end of level, pop stack + depth -= 1 + if (depth >= 0) { + arrayD = arrayStack(depth) + posD = posStack(depth) + arrayStack(depth) = null + } else { + arrayD = null + posD = 0 + } + } else + posD += 1 + + elems(i) match { + case m: HashTrieSet[A] => // push current pos onto stack and descend + if (depth >= 0) { + arrayStack(depth) = arrayD + posStack(depth) = posD + } + depth += 1 + arrayD = m.elems + posD = 0 + next0(m.elems, 0) + case m: HashSet1[A] => m.key + case m => + subIter = m.iterator + subIter.next + } + } + + // assumption: contains 2 or more elements + // splits this iterator into 2 iterators + // returns the 1st iterator, its number of elements, and the second iterator + def split: ((Iterator[A], Int), Iterator[A]) = { + // 0) simple case: no elements have been iterated - simply divide arrayD + if (arrayD != null && depth == 0 && posD == 0) { + val (fst, snd) = arrayD.splitAt(arrayD.length / 2) + val szfst = fst.foldLeft(0)(_ + _.size) + return ((new TrieIterator(fst), szfst), new TrieIterator(snd)) + } + + // otherwise, some elements have been iterated over + // 1) collision case: if we have a subIter, we return subIter and elements after it + if (subIter ne null) { + val buff = subIter.toBuffer + subIter = null + ((buff.iterator, buff.length), this) + } else { + // otherwise find the topmost array stack element + if (depth > 0) { + // 2) topmost comes before (is not) arrayD + // steal a portion of top to create a new iterator + val topmost = arrayStack(0) + if (posStack(0) == arrayStack(0).length - 1) { + // 2a) only a single entry left on top + // this means we have to modify this iterator - pop topmost + val snd = Array(arrayStack(0).last) + val szsnd = snd(0).size + // modify this - pop + depth -= 1 + arrayStack = arrayStack.tail ++ Array[Array[HashSet[A]]](null) + posStack = posStack.tail ++ Array[Int](0) + // we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty + ((new TrieIterator[A](snd), szsnd), this) + } else { + // 2b) more than a single entry left on top + val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2) + arrayStack(0) = fst + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator[A](snd), szsnd), this) + } + } else { + // 3) no topmost element (arrayD is at the top) + // steal a portion of it and update this iterator + if (posD == arrayD.length - 1) { + // 3a) positioned at the last element of arrayD + val arr: Array[HashSet[A]] = arrayD(posD) match { + case c: HashSetCollision1[_] => c.asInstanceOf[HashSetCollision1[A]].ks.toList map { HashSet() + _ } toArray + case ht: HashTrieSet[_] => ht.asInstanceOf[HashTrieSet[A]].elems + case _ => error("cannot divide single element") + } + val (fst, snd) = arr.splitAt(arr.length / 2) + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator(snd), szsnd), new TrieIterator(fst)) + } else { + // 3b) arrayD has more free elements + val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2) + arrayD = fst + val szsnd = snd.foldLeft(0)(_ + _.size) + ((new TrieIterator[A](snd), szsnd), this) + } + } + } + } + } + @serializable @SerialVersionUID(2L) private class SerializationProxy[A,B](@transient private var orig: HashSet[A]) { private def writeObject(out: java.io.ObjectOutputStream) { val s = orig.size diff --git a/src/library/scala/collection/mutable/ConcurrentMap.scala b/src/library/scala/collection/mutable/ConcurrentMap.scala index 5239b489b..18921d566 100644 --- a/src/library/scala/collection/mutable/ConcurrentMap.scala +++ b/src/library/scala/collection/mutable/ConcurrentMap.scala @@ -20,12 +20,13 @@ package mutable * Note: The concurrent maps do not accept `null` for keys or values. * * @define atomicop - * This is done atomically. + * This is an atomic operation. */ trait ConcurrentMap[A, B] extends Map[A, B] { /** * Associates the given key with a given value, unless the key was already associated with some other value. + * * $atomicop * * @param k key with which the specified value is to be associated with @@ -37,6 +38,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] { /** * Removes the entry for the specified key if its currently mapped to the specified value. + * * $atomicop * * @param k key for which the entry should be removed @@ -47,6 +49,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] { /** * Replaces the entry for the given key only if it was previously mapped to a given value. + * * $atomicop * * @param k key for which the entry should be replaced @@ -58,6 +61,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] { /** * Replaces the entry for the given key only if it was previously mapped to some value. + * * $atomicop * * @param k key for which the entry should be replaced diff --git a/src/library/scala/collection/parallel/ParIterable.scala b/src/library/scala/collection/parallel/ParIterable.scala index b44704436..7f1a8c7c3 100644 --- a/src/library/scala/collection/parallel/ParIterable.scala +++ b/src/library/scala/collection/parallel/ParIterable.scala @@ -26,8 +26,7 @@ trait ParIterable[+T] extends Iterable[T] /** $factoryinfo */ object ParIterable extends ParFactory[ParIterable] { - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = - new GenericCanCombineFrom[T] + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = new GenericCanCombineFrom[T] def newBuilder[T]: Combiner[T, ParIterable[T]] = ParArrayCombiner[T] diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index af26463c3..b1b135ab5 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -427,7 +427,10 @@ self => executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort)) } - protected[this] def cbfactory = () => newCombiner + protected[this] def cbfactory ={ + println(newCombiner + ", " + newCombiner.getClass) + () => newCombiner + } override def filter(pred: T => Boolean): Repr = { executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result }) @@ -655,6 +658,7 @@ self => protected[this] def newSubtask(p: ParIterableIterator[T]): Accessor[R, Tp] def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) def split = pit.split.map(newSubtask(_)) // default split procedure + private[parallel] override def signalAbort = pit.abort override def toString = "Accessor(" + pit.toString + ")" } @@ -672,6 +676,10 @@ self => val st: Second def combineResults(fr: FR, sr: SR): R var result: R = null.asInstanceOf[R] + private[parallel] override def signalAbort { + ft.signalAbort + st.signalAbort + } } /** Sequentially performs one task after another. */ @@ -703,6 +711,9 @@ self => inner.compute result = map(inner.result) } + private[parallel] override def signalAbort { + inner.signalAbort + } } protected trait Transformer[R, Tp] extends Accessor[R, Tp] @@ -1187,6 +1198,7 @@ self => new ScanWithScanTree(Some(st.left.value), op, st.right, src, dest) ) def shouldSplitFurther = (st.left ne null) && (st.right ne null) + } protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That]) diff --git a/src/library/scala/collection/parallel/ParMap.scala b/src/library/scala/collection/parallel/ParMap.scala index b95a81ef6..9f21f52e9 100644 --- a/src/library/scala/collection/parallel/ParMap.scala +++ b/src/library/scala/collection/parallel/ParMap.scala @@ -26,7 +26,7 @@ self => def mapCompanion: GenericParMapCompanion[ParMap] = ParMap - override def empty: ParMap[K, V] = new immutable.ParHashTrie[K, V] + override def empty: ParMap[K, V] = new immutable.ParHashMap[K, V] override def stringPrefix = "ParMap" } @@ -34,9 +34,9 @@ self => object ParMap extends ParMapFactory[ParMap] { - def empty[K, V]: ParMap[K, V] = new immutable.ParHashTrie[K, V] + def empty[K, V]: ParMap[K, V] = new immutable.ParHashMap[K, V] - def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashTrieCombiner[K, V] + def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashMapCombiner[K, V] implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V] diff --git a/src/library/scala/collection/parallel/ParSet.scala b/src/library/scala/collection/parallel/ParSet.scala new file mode 100644 index 000000000..f6508e9f9 --- /dev/null +++ b/src/library/scala/collection/parallel/ParSet.scala @@ -0,0 +1,83 @@ +package scala.collection.parallel + + + + + + + +import scala.collection.Map +import scala.collection.mutable.Builder +import scala.collection.generic._ + + + + + + +trait ParSet[T] +extends Set[T] + with GenericParTemplate[T, ParSet] + with ParIterable[T] + with ParSetLike[T, ParSet[T], Set[T]] +{ +self => + override def empty: ParSet[T] = immutable.ParHashSet[T]() + + override def companion: GenericCompanion[ParSet] with GenericParCompanion[ParSet] = ParSet + + override def stringPrefix = "ParSet" +} + + + +object ParSet extends ParSetFactory[ParSet] { + def newCombiner[T]: Combiner[T, ParSet[T]] = immutable.HashSetCombiner[T] + + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSet[T]] = new GenericCanCombineFrom[T] +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParSetLike.scala b/src/library/scala/collection/parallel/ParSetLike.scala new file mode 100644 index 000000000..b60728345 --- /dev/null +++ b/src/library/scala/collection/parallel/ParSetLike.scala @@ -0,0 +1,81 @@ +package scala.collection.parallel + + + +import scala.collection.SetLike +import scala.collection.Set +import scala.collection.mutable.Builder + + + + + + + + +trait ParSetLike[T, + +Repr <: ParSetLike[T, Repr, Sequential] with ParSet[T], + +Sequential <: Set[T] with SetLike[T, Sequential]] +extends SetLike[T, Repr] + with ParIterableLike[T, Repr, Sequential] +{ self => + + protected[this] override def newBuilder: Builder[T, Repr] = newCombiner + + protected[this] override def newCombiner: Combiner[T, Repr] + + override def empty: Repr + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 94cbbb7de..36f897cb3 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -366,12 +366,13 @@ self => def next = { remaining -= 1; self.next } def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) } protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = { - val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield - if (total < remaining) it else taker(it, total - remaining) + val sizes = sq.scanLeft(0)(_ + _.remaining) + val shortened = for ((it, (from, until)) <- sq zip (sizes.init zip sizes.tail)) yield + if (until < remaining) it else taker(it, remaining - from) shortened filter { _.remaining > 0 } } } - + override def take(n: Int) = new Taken(n) override def slice(from1: Int, until1: Int) = { diff --git a/src/library/scala/collection/parallel/Splitter.scala b/src/library/scala/collection/parallel/Splitter.scala index 5c8b24e92..cf81a6ac4 100644 --- a/src/library/scala/collection/parallel/Splitter.scala +++ b/src/library/scala/collection/parallel/Splitter.scala @@ -34,6 +34,15 @@ trait Splitter[+T] extends Iterator[T] { } +object Splitter { + def empty[T]: Splitter[T] = new Splitter[T] { + def hasNext = false + def next = Iterator.empty.next + def split = Seq(this) + } +} + + /** A precise splitter (or a precise split iterator) can be split into arbitrary number of splitters * that traverse disjoint subsets of arbitrary sizes. * diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index b6265c665..1e03ac7e0 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -4,9 +4,7 @@ package scala.collection.parallel import scala.concurrent.forkjoin._ - - - +import scala.util.control.Breaks._ @@ -59,6 +57,31 @@ trait Tasks { protected[this] def split: Seq[Task[R, Tp]] /** Read of results of `that` task and merge them into results of this one. */ protected[this] def merge(that: Tp) {} + + // exception handling mechanism + var exception: Exception = null + def forwardException = if (exception != null) throw exception + // tries to do the leaf computation, storing the possible exception + protected def tryLeaf(result: Option[R]) { + try { + tryBreakable { + leaf(result) + } catchBreak { + signalAbort + } + } catch { + case e: Exception => + exception = e + signalAbort + } + } + protected[this] def tryMerge(t: Tp) { + val that = t.asInstanceOf[Task[R, Tp]] + if (this.exception == null && that.exception == null) merge(that.repr) + else if (that.exception != null) this.exception = that.exception + } + // override in concrete task implementations to signal abort to other tasks + private[parallel] def signalAbort {} } type TaskType[R, +Tp] <: Task[R, Tp] @@ -66,13 +89,13 @@ trait Tasks { var environment: ExecutionEnvironment - /** Executes a task and returns a future. */ + /** Executes a task and returns a future. Forwards an exception if some task threw it. */ def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R - /** Executes a task and waits for it to finish. */ + /** Executes a task and waits for it to finish. Forwards an exception if some task threw it. */ def executeAndWait[R, Tp](task: TaskType[R, Tp]) - /** Executes a result task, waits for it to finish, then returns its result. */ + /** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */ def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R /** Retrieves the parallelism level of the task execution environment. */ @@ -96,19 +119,19 @@ trait AdaptiveWorkStealingTasks extends Tasks { /** The actual leaf computation. */ def leaf(result: Option[R]): Unit - def compute = if (shouldSplitFurther) internal else leaf(None) + def compute = if (shouldSplitFurther) internal else tryLeaf(None) def internal = { var last = spawnSubtasks - last.leaf(None) + last.tryLeaf(None) result = last.result while (last.next != null) { val lastresult = Option(last.result) last = last.next - if (last.tryCancel) last.leaf(lastresult) else last.sync - merge(last.repr) + if (last.tryCancel) last.tryLeaf(lastresult) else last.sync + tryMerge(last.repr) } } @@ -150,7 +173,6 @@ trait HavingForkJoinPool { } - /** An implementation trait for parallel tasks based on the fork/join framework. * * @define fjdispatch @@ -187,6 +209,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { () => { fjtask.join + fjtask.forwardException fjtask.result } } @@ -202,6 +225,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { forkJoinPool.execute(fjtask) } fjtask.join + fjtask.forwardException } /** Executes a task on a fork/join pool and waits for it to finish. @@ -218,6 +242,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { forkJoinPool.execute(fjtask) } fjtask.join + fjtask.forwardException fjtask.result } @@ -225,8 +250,9 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { } + object ForkJoinTasks { - val defaultForkJoinPool = new ForkJoinPool + val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) } diff --git a/src/library/scala/collection/parallel/immutable/ParHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala similarity index 52% rename from src/library/scala/collection/parallel/immutable/ParHashTrie.scala rename to src/library/scala/collection/parallel/immutable/ParHashMap.scala index 0d51628b2..b2ebfb3f1 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashTrie.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -9,6 +9,7 @@ package scala.collection.parallel.immutable import scala.collection.parallel.ParMap import scala.collection.parallel.ParMapLike import scala.collection.parallel.Combiner +import scala.collection.parallel.ParIterableIterator import scala.collection.parallel.EnvironmentPassingCombiner import scala.collection.generic.ParMapFactory import scala.collection.generic.CanCombineFrom @@ -25,26 +26,26 @@ import scala.collection.immutable.HashMap * * @author prokopec */ -class ParHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) +class ParHashMap[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) extends ParMap[K, V] - with GenericParMapTemplate[K, V, ParHashTrie] - with ParMapLike[K, V, ParHashTrie[K, V], HashMap[K, V]] + with GenericParMapTemplate[K, V, ParHashMap] + with ParMapLike[K, V, ParHashMap[K, V], HashMap[K, V]] { self => def this() = this(HashMap.empty[K, V]) - override def mapCompanion: GenericParMapCompanion[ParHashTrie] = ParHashTrie + override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap - override def empty: ParHashTrie[K, V] = new ParHashTrie[K, V] + override def empty: ParHashMap[K, V] = new ParHashMap[K, V] - def parallelIterator = new ParHashTrieIterator(trie) with SCPI + def parallelIterator: ParIterableIterator[(K, V)] = new ParHashMapIterator(trie.iterator, trie.size) with SCPI def seq = trie - def -(k: K) = new ParHashTrie(trie - k) + def -(k: K) = new ParHashMap(trie - k) - def +[U >: V](kv: (K, U)) = new ParHashTrie(trie + kv) + def +[U >: V](kv: (K, U)) = new ParHashMap(trie + kv) def get(k: K) = trie.get(k) @@ -55,59 +56,66 @@ self => case None => newc } - type SCPI = SignalContextPassingIterator[ParHashTrieIterator] + type SCPI = SignalContextPassingIterator[ParHashMapIterator] - class ParHashTrieIterator(val ht: HashMap[K, V]) + class ParHashMapIterator(val triter: Iterator[(K, V)], val sz: Int) extends super.ParIterator { - self: SignalContextPassingIterator[ParHashTrieIterator] => - // println("created iterator " + ht) + self: SignalContextPassingIterator[ParHashMapIterator] => var i = 0 - lazy val triter = ht.iterator - def split: Seq[ParIterator] = { - // println("splitting " + ht + " into " + ht.split.map(new ParHashTrieIterator(_) with SCPI).map(_.toList)) - ht.split.map(new ParHashTrieIterator(_) with SCPI) + def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match { + case t: HashMap.TrieIterator[_, _] => + val previousRemaining = remaining + val ((fst, fstlength), snd) = t.asInstanceOf[HashMap.TrieIterator[K, V]].split + val sndlength = previousRemaining - fstlength + Seq( + new ParHashMapIterator(fst, fstlength) with SCPI, + new ParHashMapIterator(snd, sndlength) with SCPI + ) + case _ => + // iterator of the collision map case + val buff = triter.toBuffer + val (fp, sp) = buff.splitAt(buff.length / 2) + Seq(fp, sp) map { b => new ParHashMapIterator(b.iterator, b.length) with SCPI } } def next: (K, V) = { - // println("taking next after " + i + ", in " + ht) i += 1 triter.next } def hasNext: Boolean = { - // println("hasNext: " + i + ", " + ht.size + ", " + ht) - i < ht.size + i < sz } - def remaining = ht.size - i + def remaining = sz - i } } -object ParHashTrie extends ParMapFactory[ParHashTrie] { - def empty[K, V]: ParHashTrie[K, V] = new ParHashTrie[K, V] +object ParHashMap extends ParMapFactory[ParHashMap] { + def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V] - def newCombiner[K, V]: Combiner[(K, V), ParHashTrie[K, V]] = HashTrieCombiner[K, V] + def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = HashMapCombiner[K, V] - implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashTrie[K, V]] = { + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = { new CanCombineFromMap[K, V] } - def fromTrie[K, V](t: HashMap[K, V]) = new ParHashTrie(t) + def fromTrie[K, V](t: HashMap[K, V]) = new ParHashMap(t) var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0) } -trait HashTrieCombiner[K, V] -extends Combiner[(K, V), ParHashTrie[K, V]] { -self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => - import HashTrieCombiner._ - var heads = new Array[Unrolled[K, V]](rootsize) - var lasts = new Array[Unrolled[K, V]](rootsize) +private[immutable] trait HashMapCombiner[K, V] +extends Combiner[(K, V), ParHashMap[K, V]] { +self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => + import HashMapCombiner._ + var heads = new Array[Unrolled[(K, V)]](rootsize) + var lasts = new Array[Unrolled[(K, V)]](rootsize) var size: Int = 0 def clear = { - heads = new Array[Unrolled[K, V]](rootsize) - lasts = new Array[Unrolled[K, V]](rootsize) + heads = new Array[Unrolled[(K, V)]](rootsize) + lasts = new Array[Unrolled[(K, V)]](rootsize) } def +=(elem: (K, V)) = { @@ -116,7 +124,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => val pos = hc & 0x1f if (lasts(pos) eq null) { // initialize bucket - heads(pos) = new Unrolled[K, V] + heads(pos) = new Unrolled[(K, V)] lasts(pos) = heads(pos) } // add to bucket @@ -124,10 +132,10 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => this } - def combine[N <: (K, V), NewTo >: ParHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { - // ParHashTrie.totalcombines.incrementAndGet - if (other.isInstanceOf[HashTrieCombiner[_, _]]) { - val that = other.asInstanceOf[HashTrieCombiner[K, V]] + def combine[N <: (K, V), NewTo >: ParHashMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { + // ParHashMap.totalcombines.incrementAndGet + if (other.isInstanceOf[HashMapCombiner[_, _]]) { + val that = other.asInstanceOf[HashMapCombiner[K, V]] var i = 0 while (i < rootsize) { if (lasts(i) eq null) { @@ -158,17 +166,17 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => } val sz = root.foldLeft(0)(_ + _.size) - if (sz == 0) new ParHashTrie[K, V] - else if (sz == 1) new ParHashTrie[K, V](root(0)) + if (sz == 0) new ParHashMap[K, V] + else if (sz == 1) new ParHashMap[K, V](root(0)) else { val trie = new HashMap.HashTrieMap(bitmap, root, sz) - new ParHashTrie[K, V](trie) + new ParHashMap[K, V](trie) } } /* tasks */ - class CreateTrie(buckets: Array[Unrolled[K, V]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { + class CreateTrie(buckets: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { var result = () def leaf(prev: Option[Unit]) = { var i = offset @@ -178,7 +186,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => i += 1 } } - private def createTrie(elems: Unrolled[K, V]): HashMap[K, V] = { + private def createTrie(elems: Unrolled[(K, V)]): HashMap[K, V] = { var trie = new HashMap[K, V] var unrolled = elems @@ -208,28 +216,11 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] => } -object HashTrieCombiner { - def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] {} +object HashMapCombiner { + def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] {} private[immutable] val rootbits = 5 private[immutable] val rootsize = 1 << 5 - private[immutable] val unrolledsize = 16 - - private[immutable] class Unrolled[K, V] { - var size = 0 - var array = new Array[(K, V)](unrolledsize) - var next: Unrolled[K, V] = null - // adds and returns itself or the new unrolled if full - def add(elem: (K, V)): Unrolled[K, V] = if (size < unrolledsize) { - array(size) = elem - size += 1 - this - } else { - next = new Unrolled[K, V] - next.add(elem) - } - override def toString = "Unrolled(" + array.mkString(", ") + ")" - } } @@ -246,3 +237,5 @@ object HashTrieCombiner { + + diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala new file mode 100644 index 000000000..4ffd04454 --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -0,0 +1,279 @@ +package scala.collection.parallel.immutable + + + + + + + +import scala.collection.parallel.ParSet +import scala.collection.parallel.ParSetLike +import scala.collection.parallel.Combiner +import scala.collection.parallel.ParIterableIterator +import scala.collection.parallel.EnvironmentPassingCombiner +import scala.collection.generic.ParSetFactory +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.GenericParTemplate +import scala.collection.generic.GenericParCompanion +import scala.collection.generic.GenericCompanion +import scala.collection.immutable.HashSet + + + + + + +/** Parallel hash trie set. + * + * @author prokopec + */ +class ParHashSet[T] private[immutable] (private[this] val trie: HashSet[T]) +extends ParSet[T] + with GenericParTemplate[T, ParHashSet] + with ParSetLike[T, ParHashSet[T], HashSet[T]] +{ +self => + + def this() = this(HashSet.empty[T]) + + override def companion: GenericCompanion[ParHashSet] with GenericParCompanion[ParHashSet] = ParHashSet + + override def empty: ParHashSet[T] = new ParHashSet[T] + + def parallelIterator: ParIterableIterator[T] = new ParHashSetIterator(trie.iterator, trie.size) with SCPI + + def seq = trie + + def -(e: T) = new ParHashSet(trie - e) + + def +(e: T) = new ParHashSet(trie + e) + + def contains(e: T): Boolean = trie.contains(e) + + override def size = trie.size + + protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match { + case Some(old) => old + case None => newc + } + + type SCPI = SignalContextPassingIterator[ParHashSetIterator] + + class ParHashSetIterator(val triter: Iterator[T], val sz: Int) + extends super.ParIterator { + self: SignalContextPassingIterator[ParHashSetIterator] => + var i = 0 + def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match { + case t: HashSet.TrieIterator[_] => + val previousRemaining = remaining + val ((fst, fstlength), snd) = t.asInstanceOf[HashSet.TrieIterator[T]].split + val sndlength = previousRemaining - fstlength + Seq( + new ParHashSetIterator(fst, fstlength) with SCPI, + new ParHashSetIterator(snd, sndlength) with SCPI + ) + case _ => + // iterator of the collision map case + val buff = triter.toBuffer + val (fp, sp) = buff.splitAt(buff.length / 2) + Seq(fp, sp) map { b => new ParHashSetIterator(b.iterator, b.length) with SCPI } + } + def next: T = { + i += 1 + triter.next + } + def hasNext: Boolean = { + i < sz + } + def remaining = sz - i + } + +} + + +object ParHashSet extends ParSetFactory[ParHashSet] { + def newCombiner[T]: Combiner[T, ParHashSet[T]] = HashSetCombiner[T] + + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParHashSet[T]] = + new GenericCanCombineFrom[T] + + def fromTrie[T](t: HashSet[T]) = new ParHashSet(t) +} + + +private[immutable] trait HashSetCombiner[T] +extends Combiner[T, ParHashSet[T]] { +self: EnvironmentPassingCombiner[T, ParHashSet[T]] => + import HashSetCombiner._ + var heads = new Array[Unrolled[Any]](rootsize) + var lasts = new Array[Unrolled[Any]](rootsize) + var size: Int = 0 + + def clear = { + heads = new Array[Unrolled[Any]](rootsize) + lasts = new Array[Unrolled[Any]](rootsize) + } + + def +=(elem: T) = { + size += 1 + val hc = elem.## + val pos = hc & 0x1f + if (lasts(pos) eq null) { + // initialize bucket + heads(pos) = new Unrolled[Any] + lasts(pos) = heads(pos) + } + // add to bucket + lasts(pos) = lasts(pos).add(elem) + this + } + + def combine[N <: T, NewTo >: ParHashSet[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { + if (other.isInstanceOf[HashSetCombiner[_]]) { + val that = other.asInstanceOf[HashSetCombiner[T]] + var i = 0 + while (i < rootsize) { + if (lasts(i) eq null) { + heads(i) = that.heads(i) + lasts(i) = that.lasts(i) + } else { + lasts(i).next = that.heads(i) + if (that.lasts(i) ne null) lasts(i) = that.lasts(i) + } + i += 1 + } + size = size + that.size + this + } else error("Unexpected combiner type.") + } else this + + def result = { + val buckets = heads.filter(_ != null) + val root = new Array[HashSet[T]](buckets.length) + + executeAndWait(new CreateTrie(buckets, root, 0, buckets.length)) + + var bitmap = 0 + var i = 0 + while (i < rootsize) { + if (heads(i) ne null) bitmap |= 1 << i + i += 1 + } + val sz = root.foldLeft(0)(_ + _.size) + + if (sz == 0) new ParHashSet[T] + else if (sz == 1) new ParHashSet[T](root(0)) + else { + val trie = new HashSet.HashTrieSet(bitmap, root, sz) + new ParHashSet[T](trie) + } + } + + /* tasks */ + + class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { + var result = () + def leaf(prev: Option[Unit]) = { + var i = offset + val until = offset + howmany + while (i < until) { + root(i) = createTrie(buckets(i)) + i += 1 + } + } + private def createTrie(elems: Unrolled[Any]): HashSet[T] = { + var trie = new HashSet[T] + + var unrolled = elems + var i = 0 + while (unrolled ne null) { + val chunkarr = unrolled.array + val chunksz = unrolled.size + while (i < chunksz) { + val v = chunkarr(i).asInstanceOf[T] + val hc = v.## + trie = trie.updated0(v, hc, rootbits) + i += 1 + } + i = 0 + unrolled = unrolled.next + } + + trie + } + def split = { + val fp = howmany / 2 + List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp)) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + } + +} + + +object HashSetCombiner { + def apply[T] = new HashSetCombiner[T] with EnvironmentPassingCombiner[T, ParHashSet[T]] {} + + private[immutable] val rootbits = 5 + private[immutable] val rootsize = 1 << 5 +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala index 8a968b68e..b18d78885 100644 --- a/src/library/scala/collection/parallel/immutable/package.scala +++ b/src/library/scala/collection/parallel/immutable/package.scala @@ -12,8 +12,29 @@ package scala.collection.parallel package object immutable { + /* package level methods */ def repetition[T](elem: T, len: Int) = new Repetition(elem, len) + /* properties */ + private[immutable] val unrolledsize = 16 + + /* classes */ + private[immutable] class Unrolled[T: ClassManifest] { + var size = 0 + var array = new Array[T](unrolledsize) + var next: Unrolled[T] = null + // adds and returns itself or the new unrolled if full + def add(elem: T): Unrolled[T] = if (size < unrolledsize) { + array(size) = elem + size += 1 + this + } else { + next = new Unrolled[T] + next.add(elem) + } + override def toString = "Unrolled(" + array.mkString(", ") + ")" + } + /** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method. * * @tparam T type of the elements diff --git a/src/library/scala/util/control/Breaks.scala b/src/library/scala/util/control/Breaks.scala index 5ca1aa973..664b01631 100644 --- a/src/library/scala/util/control/Breaks.scala +++ b/src/library/scala/util/control/Breaks.scala @@ -38,6 +38,16 @@ class Breaks { if (ex ne breakException) throw ex } } + + def tryBreakable(op: => Unit) = new { + def catchBreak(onBreak: => Unit) = try { + op + } catch { + case ex: BreakControl => + if (ex ne breakException) throw ex + onBreak + } + } /* Break from dynamically closest enclosing breakable block * @note this might be different than the statically closest enclosing diff --git a/test/files/scalacheck/HashTrieSplit.scala b/test/files/scalacheck/HashTrieSplit.scala new file mode 100644 index 000000000..cbf565095 --- /dev/null +++ b/test/files/scalacheck/HashTrieSplit.scala @@ -0,0 +1,47 @@ + + + + + +import collection._ + + + + +// checks whether hash tries split their iterators correctly +// even after some elements have been traversed +object Test { + def main(args: Array[String]) { + doesSplitOk + } + + def doesSplitOk = { + val sz = 2000 + var ht = new parallel.immutable.ParHashMap[Int, Int] + // println("creating trie") + for (i <- 0 until sz) ht += ((i + sz, i)) + // println("created trie") + for (n <- 0 until (sz - 1)) { + // println("---------> n = " + n) + val pit = ht.parallelIterator + val pit2 = ht.parallelIterator + var i = 0 + while (i < n) { + pit.next + pit2.next + i += 1 + } + // println("splitting") + val pits = pit.split + val fst = pits(0).toSet + val snd = pits(1).toSet + val orig = pit2.toSet + if (orig.size != (fst.size + snd.size) || orig != (fst ++ snd)) { + println("Original: " + orig) + println("First: " + fst) + println("Second: " + snd) + assert(false) + } + } + } +}