Commit 0ca13bb
SPARK-1700: Close socket file descriptors on task completion
This will ensure that sockets do not build up over the course of a job, and that cancellation successfully cleans up sockets. Tested in standalone mode. More file descriptors are spawned than expected (around 1000 rather than the expected ~8), but they do not pile up between runs, nor climb as high as before (previously around 5000).

1 parent f25ebed

File tree: 1 file changed (+10, -1)


core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 10 additions & 1 deletion
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
 override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
   val startTime = System.currentTimeMillis
   val env = SparkEnv.get
-  val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+  val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+  // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+  context.addOnCompleteCallback(() =>
+    try {
+      worker.close()
+    } catch {
+      case e: Exception => logWarning("Failed to close worker socket", e)
+    }
+  )
 
   @volatile var readerException: Exception = null
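The change relies on two properties: the task context runs registered callbacks when a task finishes or is cancelled, and Socket.close() is idempotent, so a double close is harmless. Below is a minimal standalone sketch of that pattern. CompletionCallbacks is a hypothetical stand-in for Spark's TaskContext, not the real API; only the cleanup logic mirrors the diff above.

import java.net.Socket

// Hypothetical stand-in for Spark's TaskContext: collects callbacks and
// runs them once the "task" completes (normally or via cancellation).
class CompletionCallbacks {
  private var callbacks: List[() => Unit] = Nil
  def addOnCompleteCallback(f: () => Unit): Unit = { callbacks = f :: callbacks }
  def markTaskCompleted(): Unit = callbacks.foreach(f => f())
}

object SocketCleanupSketch {
  def main(args: Array[String]): Unit = {
    val context = new CompletionCallbacks
    val worker = new Socket() // unconnected socket standing in for the Python worker

    // Register cleanup the same way the diff does: close the socket, and turn
    // any failure into a warning so cleanup never fails the task itself.
    context.addOnCompleteCallback(() =>
      try {
        worker.close() // Socket.close() is idempotent; closing twice is safe
      } catch {
        case e: Exception => Console.err.println(s"Failed to close worker socket: $e")
      }
    )

    context.markTaskCompleted()
    println(s"worker closed: ${worker.isClosed}") // prints: worker closed: true
  }
}

Because the callback fires on both normal completion and cancellation, the socket is released in either case, which is what keeps descriptors from accumulating across runs.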

0 commit comments