@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
2323
2424import org .apache .spark .{InterruptibleIterator , Partition , Partitioner , SparkEnv , TaskContext }
2525import org .apache .spark .{Dependency , OneToOneDependency , ShuffleDependency }
26- import org .apache .spark .util .collection .{FlexibleExternalAppendOnlyMap , AppendOnlyMap }
26+ import org .apache .spark .util .collection .{ExternalAppendOnlyMap , AppendOnlyMap }
2727import org .apache .spark .serializer .Serializer
2828
2929private [spark] sealed trait CoGroupSplitDep extends Serializable
@@ -58,14 +58,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
5858 * @param part partitioner used to partition the shuffle output.
5959 */
6060class CoGroupedRDD [K ](@ transient var rdds : Seq [RDD [_ <: Product2 [K , _]]], part : Partitioner )
61- extends RDD [(K , Seq [Iterator [_]])](rdds.head.context, Nil ) {
61+ extends RDD [(K , Seq [Seq [_]])](rdds.head.context, Nil ) {
6262
6363 // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
6464 // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
6565 // CoGroupValue is the intermediate state of each value before being merged in compute.
6666 private type CoGroup = ArrayBuffer [Any ]
6767 private type CoGroupValue = (Any , Int ) // Int is dependency number
68- private type CoGroupCombiner = Array [CoGroup ]
68+ private type CoGroupCombiner = Seq [CoGroup ]
6969
7070 private var serializer : Serializer = null
7171
@@ -105,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
105105
106106 override val partitioner : Some [Partitioner ] = Some (part)
107107
108- override def compute (s : Partition , context : TaskContext ): Iterator [(K , Iterator [ CoGroup ] )] = {
108+ override def compute (s : Partition , context : TaskContext ): Iterator [(K , CoGroupCombiner )] = {
109109 val sparkConf = SparkEnv .get.conf
110110 val externalSorting = sparkConf.getBoolean(" spark.shuffle.spill" , true )
111111 val split = s.asInstanceOf [CoGroupPartition ]
@@ -141,12 +141,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
141141 getCombiner(kv._1)(depNum) += kv._2
142142 }
143143 }
144- // Convert to iterators
145- val finalMap = new AppendOnlyMap [K , Iterator [CoGroup ]](math.max(map.size, 64 ))
146- map.foreach { case (it, k) =>
147- finalMap.update(it, k.iterator)
148- }
149- new InterruptibleIterator (context, finalMap.iterator)
144+ new InterruptibleIterator (context, map.iterator)
150145 } else {
151146 val map = createExternalMap(numRdds)
152147 rddIterators.foreach { case (it, depNum) =>
@@ -162,7 +157,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
162157 }
163158
164159 private def createExternalMap (numRdds : Int )
165- : FlexibleExternalAppendOnlyMap [K , CoGroupValue , CoGroupCombiner , Iterator [ CoGroup ] ] = {
160+ : ExternalAppendOnlyMap [K , CoGroupValue , CoGroupCombiner ] = {
166161
167162 val createCombiner : (CoGroupValue => CoGroupCombiner ) = value => {
168163 val newCombiner = Array .fill(numRdds)(new CoGroup )
@@ -174,14 +169,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
174169 value match { case (v, depNum) => combiner(depNum) += v }
175170 combiner
176171 }
177- val mergeCombiners : (CoGroupCombiner , Iterator [ CoGroup ] ) => Iterator [ CoGroup ] =
172+ val mergeCombiners : (CoGroupCombiner , CoGroupCombiner ) => CoGroupCombiner =
178173 (combiner1, combiner2) => {
179- combiner1.toIterator. zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
174+ combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
180175 }
181- val returnCombiner : (CoGroupCombiner ) => Iterator [CoGroup ] =
182- (combiner) => combiner.toIterator
183- new FlexibleExternalAppendOnlyMap [K , CoGroupValue , CoGroupCombiner , Iterator [CoGroup ]](
184- createCombiner, mergeValue, mergeCombiners, returnCombiner)
176+ new ExternalAppendOnlyMap [K , CoGroupValue , CoGroupCombiner ](
177+ createCombiner, mergeValue, mergeCombiners)
185178 }
186179
187180 override def clearDependencies () {
0 commit comments