Commit 8be3dcf

SPARK-1259 Make RDD locally iterable
1 parent: 33ecb17

4 files changed: +32 −5 lines changed


core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 17 additions & 1 deletion
@@ -17,7 +17,8 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.lang.{Iterable => JIterable}
 
 import scala.Tuple2
 import scala.collection.JavaConversions._
@@ -281,6 +282,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
+  /**
+   * Return an iterable that contains all of the elements in this RDD.
+   *
+   * Iterating over it consumes at most as much memory as the largest partition in the cluster.
+   */
+  def toLocallyIterable(): JIterable[T] = {
+    new JIterable[T]() {
+      def iterator(): JIterator[T] = {
+        import scala.collection.JavaConversions._
+        asJavaIterator(rdd.toLocallyIterable.iterator)
+      }
+    }
+  }
+
+
   /**
    * Return an array that contains all of the elements in this RDD.
    */
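
The Java API method above adapts the lazy Scala Stream to a java.lang.Iterable whose iterator() starts a fresh traversal on every call. A minimal standalone sketch of that wrapper pattern follows; the wrap helper, the by-name elems parameter, and the IterableWrapperSketch object are illustrative stand-ins for rdd.toLocallyIterable, not part of the commit:

import java.lang.{Iterable => JIterable}
import java.util.{Iterator => JIterator}
import scala.collection.JavaConversions.asJavaIterator

object IterableWrapperSketch {
  // `elems` is by-name, so nothing is computed until a caller
  // actually requests an iterator, mirroring the lazy Stream above.
  def wrap[T](elems: => Stream[T]): JIterable[T] =
    new JIterable[T] {
      override def iterator(): JIterator[T] = asJavaIterator(elems.iterator)
    }

  def main(args: Array[String]): Unit = {
    val nums = wrap(Stream(1, 2, 3))
    val it = nums.iterator()
    while (it.hasNext) println(it.next())  // prints 1, 2, 3
  }
}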

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 3 deletions
@@ -668,11 +668,13 @@ abstract class RDD[T: ClassTag](
    *
    * In case of iterating it consumes memory as the biggest partition in cluster.
    */
-  def toStream(): Stream[T] = {
-    def collectPartition(p: Int): Array[T] = sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+  def toLocallyIterable: Stream[T] = {
+    def collectPartition(p: Int): Array[T] = {
+      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+    }
     var buffer = Stream.empty[T]
     for (p <- 0 until this.partitions.length) {
-      buffer = buffer #::: {
+      buffer = buffer append {
         collectPartition(p).toStream
       }
     }
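
Because Stream.append takes its suffix by-name, collectPartition(p) only runs when iteration reaches partition p, so partitions are shipped to the driver one at a time rather than all at once as with collect(). A short driver-side usage sketch, assuming a running SparkContext named sc (not shown in the commit):

// One job per partition is submitted lazily as the Stream is forced,
// so at most one partition's elements are collected at a time.
val rdd = sc.parallelize(1 to 1000, numSlices = 10)
rdd.toLocallyIterable.foreach(println)

Note that a Scala Stream memoizes: holding a reference to its head while traversing keeps already-visited elements, and hence earlier partitions, in driver memory.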

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 9 additions & 0 deletions
@@ -24,6 +24,7 @@
 
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
@@ -149,6 +150,14 @@ public void call(String s) {
     Assert.assertEquals(2, foreachCalls);
   }
 
+  @Test
+  public void toLocallyIterable() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> rdd = sc.parallelize(correct);
+    List<Integer> result = Lists.newArrayList(rdd.toLocallyIterable());
+    Assert.assertTrue(correct.equals(result));
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void lookup() {

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
-    assert(nums.toStream().toList === List(1, 2, 3, 4))
+    assert(nums.toLocallyIterable.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
