Skip to content

Commit 67ba875

Browse files
committed
java to python, and python to java
1 parent bcc0f23 commit 67ba875

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,12 @@ private[spark] object PythonRDD {
289289
def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = {
290290
pyRDD.rdd.mapPartitions { iter =>
291291
val unpickle = new Unpickler
292-
iter.map { row =>
293-
unpickle.loads(row)
292+
iter.flatMap { row =>
293+
unpickle.loads(row) match {
294+
case objs: java.util.ArrayList[Any] => objs
295+
// Incase the partition doesn't have a collection
296+
case obj => Seq(obj)
297+
}
294298
}
295299
}
296300
}

python/pyspark/context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ def _ensure_initialized(cls, instance=None, gateway=None):
174174
SparkContext._gateway = gateway or launch_gateway()
175175
SparkContext._jvm = SparkContext._gateway.jvm
176176
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
177+
SparkContext._pythonToJava = SparkContext._jvm.PythonRDD.pythonToJava
178+
SparkContext._javaToPython = SparkContext._jvm.PythonRDD.javaToPython
177179

178180
if instance:
179181
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:

0 commit comments

Comments
 (0)