File tree Expand file tree Collapse file tree 1 file changed +3
-0
lines changed
core/src/main/scala/org/apache/spark/scheduler Expand file tree Collapse file tree 1 file changed +3
-0
lines changed Original file line number Diff line number Diff line change @@ -715,6 +715,7 @@ class DAGScheduler(
715715 val stageFailedMessage = " Stage cancelled because SparkContext was shut down"
716716 runningStages.foreach { stage =>
717717 stage.latestInfo.stageFailed(stageFailedMessage)
718+ outputCommitCoordinator.stageEnd(stage.id)
718719 listenerBus.post(SparkListenerStageCompleted (stage.latestInfo))
719720 }
720721 listenerBus.post(SparkListenerJobEnd (job.jobId, clock.getTimeMillis(), JobFailed (error)))
@@ -984,6 +985,7 @@ class DAGScheduler(
984985 stage.latestInfo.stageFailed(errorMessage.get)
985986 logInfo(" %s (%s) failed in %s s" .format(stage, stage.name, serviceTime))
986987 }
988+ outputCommitCoordinator.stageEnd(stage.id)
987989 listenerBus.post(SparkListenerStageCompleted (stage.latestInfo))
988990 runningStages -= stage
989991 }
@@ -1268,6 +1270,7 @@ class DAGScheduler(
12681270 try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
12691271 taskScheduler.cancelTasks(stageId, shouldInterruptThread)
12701272 stage.latestInfo.stageFailed(failureReason)
1273+ outputCommitCoordinator.stageEnd(stage.id)
12711274 listenerBus.post(SparkListenerStageCompleted (stage.latestInfo))
12721275 } catch {
12731276 case e : UnsupportedOperationException =>
You can’t perform that action at this time.
0 commit comments