Skip to content

Commit 7896899

Browse files
committed
Fix SPARK-6737 by informing OutputCommitCoordinator of all stage end events.
1 parent 4ead1dc commit 7896899

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,7 @@ class DAGScheduler(
715715
val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
716716
runningStages.foreach { stage =>
717717
stage.latestInfo.stageFailed(stageFailedMessage)
718+
outputCommitCoordinator.stageEnd(stage.id)
718719
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
719720
}
720721
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
@@ -984,6 +985,7 @@ class DAGScheduler(
984985
stage.latestInfo.stageFailed(errorMessage.get)
985986
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
986987
}
988+
outputCommitCoordinator.stageEnd(stage.id)
987989
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
988990
runningStages -= stage
989991
}
@@ -1268,6 +1270,7 @@ class DAGScheduler(
12681270
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
12691271
taskScheduler.cancelTasks(stageId, shouldInterruptThread)
12701272
stage.latestInfo.stageFailed(failureReason)
1273+
outputCommitCoordinator.stageEnd(stage.id)
12711274
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
12721275
} catch {
12731276
case e: UnsupportedOperationException =>

0 commit comments

Comments
 (0)