
Commit 6698186

Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy"
This reverts commit df9afbec7e9fb558cf75d4e8dc94d8f44f101301.
1 parent: fe992fe

2 files changed, +17 -34 lines

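Before the per-file diffs, a quick orientation: the revert takes CoGroupedRDD back to yielding fully materialized groups, one Seq of values per parent RDD, instead of the single-pass iterators the reverted commit had begun to introduce. A tiny sketch of what a record looks like under the restored element type (K, Seq[Seq[_]]); the key and values are made-up example data, not from the commit:

// Hypothetical cogroup record under the restored element type (K, Seq[Seq[_]]):
// one materialized group per parent RDD, traversable any number of times.
object CoGroupRecordSketch {
  def main(args: Array[String]): Unit = {
    val record: (String, Seq[Seq[_]]) = ("k", Seq(Seq("a1", "a2"), Seq("b1")))
    val (key, groups) = record
    groups.zipWithIndex.foreach { case (group, depNum) =>
      println(s"$key <- dependency $depNum: ${group.mkString(", ")}")
    }
  }
}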

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 10 additions & 17 deletions
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import org.apache.spark.util.collection.{FlexibleExternalAppendOnlyMap, AppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
 import org.apache.spark.serializer.Serializer
 
 private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -58,14 +58,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
  * @param part partitioner used to partition the shuffle output.
  */
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
-  extends RDD[(K, Seq[Iterator[_]])](rdds.head.context, Nil) {
+  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
   // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
   // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
   // CoGroupValue is the intermediate state of each value before being merged in compute.
   private type CoGroup = ArrayBuffer[Any]
   private type CoGroupValue = (Any, Int) // Int is dependency number
-  private type CoGroupCombiner = Array[CoGroup]
+  private type CoGroupCombiner = Seq[CoGroup]
 
   private var serializer: Serializer = null
 
@@ -105,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override val partitioner: Some[Partitioner] = Some(part)
 
-  override def compute(s: Partition, context: TaskContext): Iterator[(K, Iterator[CoGroup])] = {
+  override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
@@ -141,12 +141,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           getCombiner(kv._1)(depNum) += kv._2
         }
       }
-      // Convert to iterators
-      val finalMap = new AppendOnlyMap[K, Iterator[CoGroup]](math.max(map.size, 64))
-      map.foreach { case (it, k) =>
-        finalMap.update(it, k.iterator)
-      }
-      new InterruptibleIterator(context, finalMap.iterator)
+      new InterruptibleIterator(context, map.iterator)
     } else {
       val map = createExternalMap(numRdds)
       rddIterators.foreach { case (it, depNum) =>
@@ -162,7 +157,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   }
 
   private def createExternalMap(numRdds: Int)
-    : FlexibleExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner, Iterator[CoGroup]] = {
+    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
 
     val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
@@ -174,14 +169,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       value match { case (v, depNum) => combiner(depNum) += v }
       combiner
     }
-    val mergeCombiners: (CoGroupCombiner, Iterator[CoGroup]) => Iterator[CoGroup] =
+    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
      (combiner1, combiner2) => {
-        combiner1.toIterator.zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
+        combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
      }
-    val returnCombiner: (CoGroupCombiner) => Iterator[CoGroup] =
-      (combiner) => combiner.toIterator
-    new FlexibleExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner, Iterator[CoGroup]](
-      createCombiner, mergeValue, mergeCombiners, returnCombiner)
+    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
+      createCombiner, mergeValue, mergeCombiners)
   }
 
   override def clearDependencies() {
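To make the restored wiring concrete, here is a minimal, self-contained sketch of the combiner functions that createExternalMap passes to ExternalAppendOnlyMap after this revert (createCombiner, mergeValue, mergeCombiners). The object name and the small driver in main are illustrative only; the types follow the diff above.

import scala.collection.mutable.ArrayBuffer

// Standalone sketch of the combiner logic this revert restores in
// CoGroupedRDD.createExternalMap: a CoGroupCombiner is again a materialized
// Seq[CoGroup] (one buffer per parent RDD) rather than an Iterator[CoGroup].
object CoGroupCombinerSketch {
  type CoGroup = ArrayBuffer[Any]
  type CoGroupValue = (Any, Int) // Int is dependency number
  type CoGroupCombiner = Seq[CoGroup]

  def createCombiner(numRdds: Int)(value: CoGroupValue): CoGroupCombiner = {
    val newCombiner = Array.fill(numRdds)(new CoGroup)
    value match { case (v, depNum) => newCombiner(depNum) += v }
    newCombiner
  }

  def mergeValue(combiner: CoGroupCombiner, value: CoGroupValue): CoGroupCombiner = {
    value match { case (v, depNum) => combiner(depNum) += v }
    combiner
  }

  // Merge two combiners by concatenating the buffers for each dependency.
  def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner =
    c1.zip(c2).map { case (v1, v2) => v1 ++ v2 }

  def main(args: Array[String]): Unit = {
    // Two parent RDDs (dependencies 0 and 1) contributing values for one key.
    val left  = mergeValue(createCombiner(2)(("a1", 0)), ("a2", 0))
    val right = createCombiner(2)(("b1", 1))
    println(mergeCombiners(left, right)) // one merged buffer per dependency: [a1, a2] and [b1]
  }
}

With the FlexibleExternalAppendOnlyMap variant gone, mergeCombiners is again symmetric in its argument types, so no separate returnCombiner step is needed.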

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 7 additions & 17 deletions
@@ -55,25 +55,16 @@ import org.apache.spark.storage.{BlockId, BlockManager}
  * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
  * this threshold, in case map size estimation is not sufficiently accurate.
  */
+
 private[spark] class ExternalAppendOnlyMap[K, V, C](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C,
     serializer: Serializer = SparkEnv.get.serializer,
     blockManager: BlockManager = SparkEnv.get.blockManager)
-  extends FlexibleExternalAppendOnlyMap[K, V, C, C](createCombiner, mergeValue, mergeCombiners, (x => x),
-    serializer, blockManager) {
-}
-private[spark] class FlexibleExternalAppendOnlyMap[K, V, C, T](
-    createCombiner: V => C,
-    mergeValue: (C, V) => C,
-    mergeCombiners: (C, T) => T,
-    returnCombiner: C => T,
-    serializer: Serializer = SparkEnv.get.serializer,
-    blockManager: BlockManager = SparkEnv.get.blockManager)
-  extends Iterable[(K, T)] with Serializable with Logging {
+  extends Iterable[(K, C)] with Serializable with Logging {
 
-  import FlexibleExternalAppendOnlyMap._
+  import ExternalAppendOnlyMap._
 
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
@@ -272,13 +263,13 @@ private[spark] class FlexibleExternalAppendOnlyMap[K, V, C, T](
      * If the given buffer contains a value for the given key, merge that value into
      * baseCombiner and remove the corresponding (K, C) pair from the buffer.
      */
-    private def mergeIfKeyExists(key: K, baseCombiner: T, buffer: StreamBuffer): T = {
+    private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
      var i = 0
      while (i < buffer.pairs.length) {
        val (k, c) = buffer.pairs(i)
        if (k == key) {
          buffer.pairs.remove(i)
-          return mergeCombiners(c, baseCombiner)
+          return mergeCombiners(baseCombiner, c)
        }
        i += 1
      }
@@ -301,8 +292,7 @@ private[spark] class FlexibleExternalAppendOnlyMap[K, V, C, T](
       // Select a key from the StreamBuffer that holds the lowest key hash
       val minBuffer = mergeHeap.dequeue()
       val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
-      var (minKey, minCombinerC) = minPairs.remove(0)
-      var minCombiner = returnCombiner(minCombinerC)
+      var (minKey, minCombiner) = minPairs.remove(0)
       assert(minKey.hashCode() == minHash)
 
       // For all other streams that may have this key (i.e. have the same minimum key hash),
@@ -428,7 +418,7 @@ private[spark] class FlexibleExternalAppendOnlyMap[K, V, C, T](
   }
 }
 
-private[spark] object FlexibleExternalAppendOnlyMap {
+private[spark] object ExternalAppendOnlyMap {
   private class KCComparator[K, C] extends Comparator[(K, C)] {
     def compare(kc1: (K, C), kc2: (K, C)): Int = {
       kc1._1.hashCode().compareTo(kc2._1.hashCode())
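The mergeIfKeyExists hunk above goes back to a combiner of the same type C on both sides and to calling mergeCombiners(baseCombiner, c). A simplified standalone sketch of that loop, with a plain ArrayBuffer[(K, C)] standing in for StreamBuffer.pairs; the object name and the driver in main are made up for illustration:

import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the lookup-and-merge step restored above: scan a
// buffer of (K, C) pairs, and if the key is present, remove that pair and
// fold its combiner into baseCombiner via mergeCombiners(baseCombiner, c).
object MergeIfKeyExistsSketch {
  def mergeIfKeyExists[K, C](
      key: K,
      baseCombiner: C,
      pairs: ArrayBuffer[(K, C)],
      mergeCombiners: (C, C) => C): C = {
    var i = 0
    while (i < pairs.length) {
      val (k, c) = pairs(i)
      if (k == key) {
        pairs.remove(i)
        return mergeCombiners(baseCombiner, c)
      }
      i += 1
    }
    baseCombiner // key not found: nothing to merge
  }

  def main(args: Array[String]): Unit = {
    val pairs = ArrayBuffer("x" -> 3, "y" -> 5)
    // Fold the buffered count for "x" into a running count of 2.
    println(mergeIfKeyExists("x", 2, pairs, (a: Int, b: Int) => a + b)) // 5
    println(pairs) // ArrayBuffer((y,5)) -- the matched pair was removed
  }
}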
