@@ -697,13 +697,7 @@ private[spark] class TaskSetManager(
697697 val index = info.index
698698 info.markFinished(TaskState.FINISHED)
699699 removeRunningTask(tid)
700- // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
701- // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
702- // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
703- // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
704- // Note: "result.value()" only deserializes the value when it's called at the first time, so
705- // here "result.value()" just returns the value and won't block other threads.
706- sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
700+
707701 // Kill any other attempts for the same task (since those are unnecessary now that one
708702 // attempt completed successfully).
709703 for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -726,6 +720,13 @@ private[spark] class TaskSetManager(
726720 logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
727721 " because task " + index + " has already completed successfully")
728722 }
723+ // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
724+ // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
725+ // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
726+ // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
727+ // Note: "result.value()" only deserializes the value when it's called at the first time, so
728+ // here "result.value()" just returns the value and won't block other threads.
729+ sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
729730 maybeFinishTaskSet()
730731 }
731732
0 commit comments