@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
    taskScheduler.initialize(new FakeSchedulerBackend)
  }
}
920+
921+ test(" Completions in zombie tasksets update status of non-zombie taskset" ) {
922+ val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
923+ val valueSer = SparkEnv .get.serializer.newInstance()
924+
925+ def completeTaskSuccessfully (tsm : TaskSetManager , partition : Int ): Unit = {
926+ val indexInTsm = tsm.partitionToIndex(partition)
927+ val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
928+ val result = new DirectTaskResult [Int ](valueSer.serialize(1 ), Seq ())
929+ tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
930+ }
931+
932+ // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
933+ // two times, so we have three active task sets for one stage. (For this to really happen,
934+ // you'd need the previous stage to also get restarted, and then succeed, in between each
935+ // attempt, but that happens outside what we're mocking here.)
936+ val zombieAttempts = (0 until 2 ).map { stageAttempt =>
937+ val attempt = FakeTask .createTaskSet(10 , stageAttemptId = stageAttempt)
938+ taskScheduler.submitTasks(attempt)
939+ val tsm = taskScheduler.taskSetManagerForAttempt(0 , stageAttempt).get
940+ val offers = (0 until 10 ).map{ idx => WorkerOffer (s " exec- $idx" , s " host- $idx" , 1 ) }
941+ taskScheduler.resourceOffers(offers)
942+ assert(tsm.runningTasks === 10 )
943+ if (stageAttempt < 2 ) {
944+ // fail attempt
945+ tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState .FAILED ,
946+ FetchFailed (null , 0 , 0 , 0 , " fetch failed" ))
947+ // the attempt is a zombie, but the tasks are still running (this could be true even if
948+ // we actively killed those tasks, as killing is best-effort)
949+ assert(tsm.isZombie)
950+ assert(tsm.runningTasks === 9 )
951+ }
952+ tsm
953+ }
954+
955+ // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
956+ // the stage, but this time with insufficient resources so not all tasks are active.
957+
958+ val finalAttempt = FakeTask .createTaskSet(10 , stageAttemptId = 2 )
959+ taskScheduler.submitTasks(finalAttempt)
960+ val finalTsm = taskScheduler.taskSetManagerForAttempt(0 , 2 ).get
961+ val offers = (0 until 5 ).map{ idx => WorkerOffer (s " exec- $idx" , s " host- $idx" , 1 ) }
962+ val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
963+ finalAttempt.tasks(task.index).partitionId
964+ }.toSet
965+ assert(finalTsm.runningTasks === 5 )
966+ assert(! finalTsm.isZombie)
967+
968+ // We simulate late completions from our zombie tasksets, corresponding to all the pending
969+ // partitions in our final attempt. This means we're only waiting on the tasks we've already
970+ // launched.
971+ val finalAttemptPendingPartitions = (0 until 10 ).toSet.diff(finalAttemptLaunchedPartitions)
972+ finalAttemptPendingPartitions.foreach { partition =>
973+ completeTaskSuccessfully(zombieAttempts(0 ), partition)
974+ }
975+
976+ // If there is another resource offer, we shouldn't run anything. Though our final attempt
977+ // used to have pending tasks, now those tasks have been completed by zombie attempts. The
978+ // remaining tasks to compute are already active in the non-zombie attempt.
979+ assert(
980+ taskScheduler.resourceOffers(IndexedSeq (WorkerOffer (" exec-1" , " host-1" , 1 ))).flatten.isEmpty)
981+
982+ val allTaskSets = zombieAttempts ++ Seq (finalTsm)
983+ val remainingTasks = (0 until 10 ).toSet.diff(finalAttemptPendingPartitions)
984+
985+ // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
986+ // marked as zombie.
987+ // for each of the remaining tasks, find the tasksets with an active copy of the task, and
988+ // finish the task.
989+ remainingTasks.foreach { partition =>
990+ val tsm = if (partition == 0 ) {
991+ // we failed this task on both zombie attempts, this one is only present in the latest
992+ // taskset
993+ finalTsm
994+ } else {
995+ // should be active in every taskset. We choose a zombie taskset just to make sure that
996+ // we transition the active taskset correctly even if the final completion comes
997+ // from a zombie.
998+ zombieAttempts(partition % 2 )
999+ }
1000+ completeTaskSuccessfully(tsm, partition)
1001+ }
1002+
1003+ assert(finalTsm.isZombie)
1004+
1005+ // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
1006+ verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
1007+
1008+ // finally, lets complete all the tasks. We simulate failures in attempt 1, but everything
1009+ // else succeeds, to make sure we get the right updates to the blacklist in all cases.
1010+ (zombieAttempts ++ Seq (finalTsm)).foreach { tsm =>
1011+ val stageAttempt = tsm.taskSet.stageAttemptId
1012+ tsm.runningTasksSet.foreach { index =>
1013+ if (stageAttempt == 1 ) {
1014+ tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState .FAILED , TaskResultLost )
1015+ } else {
1016+ val result = new DirectTaskResult [Int ](valueSer.serialize(1 ), Seq ())
1017+ tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
1018+ }
1019+ }
1020+
1021+ // we update the blacklist for the stage attempts with all successful tasks. Even though
1022+ // some tasksets had failures, we still consider them all successful from a blacklisting
1023+ // perspective, as the failures weren't from a problem w/ the tasks themselves.
1024+ verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0 ), meq(stageAttempt), anyObject())
1025+ }
1026+ }
}