Commit bfc2b26 (2 parents: 8e4f1f5 + ce8ec54)

merge master
43 files changed: +412 additions, -239 deletions

bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 12 additions & 8 deletions
@@ -220,27 +220,31 @@ object Bagel extends Logging {
    */
   private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
     sc: SparkContext,
-    grouped: RDD[(K, (Seq[C], Seq[V]))],
+    grouped: RDD[(K, (Iterable[C], Iterable[V]))],
     compute: (V, Option[C]) => (V, Array[M]),
     storageLevel: StorageLevel
   ): (RDD[(K, (V, Array[M]))], Int, Int) = {
     var numMsgs = sc.accumulator(0)
     var numActiveVerts = sc.accumulator(0)
-    val processed = grouped.flatMapValues {
-      case (_, vs) if vs.size == 0 => None
-      case (c, vs) =>
+    val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
+      .flatMapValues {
+      case (_, vs) if !vs.hasNext => None
+      case (c, vs) => {
         val (newVert, newMsgs) =
-          compute(vs(0), c match {
-            case Seq(comb) => Some(comb)
-            case Seq() => None
-          })
+          compute(vs.next,
+            c.hasNext match {
+              case true => Some(c.next)
+              case false => None
+            }
+          )
 
         numMsgs += newMsgs.size
         if (newVert.active) {
           numActiveVerts += 1
         }
 
         Some((newVert, newMsgs))
+      }
     }.persist(storageLevel)
 
     // Force evaluation of processed RDD for accurate performance measurements
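After this change the grouped values arrive as Iterable, which guarantees traversal but not cheap `.size` or indexed access, so the loop now converts each pair to iterators and tests `hasNext` instead of calling `vs.size` and `vs(0)`. A minimal standalone sketch of the same pattern; the collections and names here are illustrative placeholders, not values from the commit:

// Sketch of the iterator-based consumption used above, outside Spark.
val msgs: Iterable[Int] = Seq(42)        // combined messages for one key
val verts: Iterable[String] = Seq("v1")  // at most one vertex per key

val (c, vs) = (msgs.iterator, verts.iterator)
if (vs.hasNext) {
  val vertex = vs.next()                                   // replaces vs(0)
  val combined = if (c.hasNext) Some(c.next()) else None   // replaces the Seq match
  // compute(vertex, combined) would be invoked here
}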

bin/compute-classpath.sh

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ fi
 # built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
 # assembly is built for Hive, before actually populating the CLASSPATH with the jars.
 # Note that this check order is faster (by up to half a second) in the case where Hive is not used.
-num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ | grep "datanucleus-.*\\.jar" | wc -l)
+num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ 2>/dev/null | grep "datanucleus-.*\\.jar" | wc -l)
 if [ $num_datanucleus_jars -gt 0 ]; then
   AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
   num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 1 addition & 1 deletion
@@ -141,7 +141,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   private def awaitResult(): Try[T] = {
     jobWaiter.awaitResult() match {
       case JobSucceeded => scala.util.Success(resultFunc)
-      case JobFailed(e: Exception, _) => scala.util.Failure(e)
+      case JobFailed(e: Exception) => scala.util.Failure(e)
     }
   }
 }

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 19 additions & 17 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList}
+import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    */
-  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
+  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with into `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
+  def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
 
   /**
@@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
    */
-  def groupByKey(): JavaPairRDD[K, JList[V]] =
+  def groupByKey(): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey()))
 
   /**
@@ -462,55 +463,55 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * list of values for that key in `this` as well as `other`.
    */
   def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (JList[V], JList[W])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
 
   /**
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
-      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+      partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other)))
 
   /**
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
   def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
-  : JavaPairRDD[K, (JList[V], JList[W])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
 
   /**
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
 
   /** Alias for cogroup. */
-  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.groupWith(other)))
 
   /** Alias for cogroup. */
   def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
 
   /**
@@ -695,21 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 
 object JavaPairRDD {
   private[spark]
-  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = {
-    rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList)
+  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
+    rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
   }
 
   private[spark]
   def cogroupResultToJava[K: ClassTag, V, W](
-      rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = {
-    rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+      rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
+    rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2)))
   }
 
   private[spark]
   def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
-      rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = {
+      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
+      : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
     rddToPairRDDFunctions(rdd)
-      .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3)))
+      .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
   }
 
   def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
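The converter helpers swap `seqAsJavaList` for `asJavaIterable`; both come from `scala.collection.JavaConversions`, which the file already imports via the wildcard. A standalone sketch of the conversion, outside any RDD:

import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions.asJavaIterable

val values: Iterable[Int] = Seq(1, 2, 3)
// Wraps the Scala Iterable in a java.lang.Iterable view without copying;
// seqAsJavaList, by contrast, required a fully materialized Seq.
val jValues: JIterable[Int] = asJavaIterable(values)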

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, List => JList, Iterator => JIterator}
 import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConversions._
@@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
+  def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
     JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
+  def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
     JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
 
   def bind() {
     try {
-      serverInfo = Some(startJettyServer(host, port, handlers, master.conf))
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
       logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 1 addition & 1 deletion
@@ -60,7 +60,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
   def bind() {
     try {
-      serverInfo = Some(JettyUtils.startJettyServer(host, port, handlers, worker.conf))
+      serverInfo = Some(JettyUtils.startJettyServer("0.0.0.0", port, handlers, worker.conf))
       logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 21 additions & 18 deletions
@@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    */
-  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
@@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
     val bufs = combineByKey[ArrayBuffer[V]](
       createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
-    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+    bufs.mapValues(_.toIterable)
   }
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with into `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
+  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
     groupByKey(new HashPartitioner(numPartitions))
   }
 
@@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+      for (v <- vs; w <- ws) yield (v, w)
     }
   }
 
@@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
       if (ws.isEmpty) {
-        vs.iterator.map(v => (v, None))
+        vs.map(v => (v, None))
       } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+        for (v <- vs; w <- ws) yield (v, Some(w))
       }
     }
   }
@@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     : RDD[(K, (Option[V], W))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
       if (vs.isEmpty) {
-        ws.iterator.map(w => (None, w))
+        ws.map(w => (None, w))
       } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+        for (v <- vs; w <- ws) yield (Some(v), w)
       }
     }
   }
@@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
    */
-  def groupByKey(): RDD[(K, Seq[V])] = {
+  def groupByKey(): RDD[(K, Iterable[V])] = {
     groupByKey(defaultPartitioner(self))
   }
 
@@ -453,7 +453,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Iterable[V], Iterable[W]))] = {
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
@@ -468,21 +469,23 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+      (vs.asInstanceOf[Seq[V]],
+       w1s.asInstanceOf[Seq[W1]],
+       w2s.asInstanceOf[Seq[W2]])
     }
   }
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, defaultPartitioner(self, other))
   }
 
@@ -491,15 +494,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, new HashPartitioner(numPartitions))
   }
 
@@ -508,18 +511,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
     cogroup(other1, other2, new HashPartitioner(numPartitions))
   }
 
   /** Alias for cogroup. */
-  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, defaultPartitioner(self, other))
   }
 
   /** Alias for cogroup. */
   def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
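The net effect of this file is that groupByKey, cogroup, and groupWith now declare `Iterable` values instead of `Seq`. A minimal sketch of the new caller-facing signature, assuming a running SparkContext `sc`; the variable names are illustrative:

import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions
import org.apache.spark.rdd.RDD

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val grouped: RDD[(String, Iterable[Int])] = pairs.groupByKey()

// Iterable supports traversal but not indexed access, so code that
// previously wrote values(0) must materialize the group first:
val firstPerKey = grouped.mapValues(_.toSeq.head)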

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 3 deletions
@@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD of grouped items.
    */
-  def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
+  def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
     groupBy[K](f, defaultPartitioner(this))
 
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
     groupBy(f, new HashPartitioner(numPartitions))
 
   /**
    * Return an RDD of grouped items.
    */
-  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
     val cleanF = sc.clean(f)
     this.map(t => (cleanF(t), t)).groupByKey(p)
   }
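groupBy is defined in terms of groupByKey, so it inherits the same return-type change with no behavioral difference. A sketch under the same assumptions as the previous example:

val words = sc.parallelize(Seq("apple", "avocado", "banana"))
// Groups still shuffle exactly as before; only the declared value type changed.
val byInitial: RDD[(Char, Iterable[String])] = words.groupBy(_.head)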

0 commit comments
