@@ -1102,7 +1102,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("Completions in zombie tasksets update status of non-zombie taskset") {
+  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()
 
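The lines elided between this hunk and the next define the completeTaskSuccessfully helper that the new assertions below rely on. The following is only a plausible reconstruction, inferred from how the diff calls the helper and from the DirectTaskResult usage in the removed test further down; the exact body may differ:

    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
      // Find a live task attempt for this partition and feed it a serialized dummy
      // result, as if an executor had reported success.
      val indexInTsm = tsm.partitionToIndex(partition)
      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    }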
@@ -1114,9 +1114,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three active task sets for one stage. (For this to really happen,
-    // you'd need the previous stage to also get restarted, and then succeed, in between each
-    // attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three TaskSetManagers (2 zombie, 1 active) for one stage. (For this
+    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
+    // in between each attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -1133,30 +1133,51 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       assert(tsm.runningTasks === 9)
       tsm
     }
+    // We've now got 2 zombie attempts, each with 9 tasks still active, but no active attempt
+    // for the stage in the taskScheduler.
+
+    // Finish partitions 1 and 2 by completing their tasks before a new attempt for the same
+    // stage is submitted. This is possible because submitting a new attempt and handling a
+    // successful task happen on two different threads: "dag-scheduler-event-loop" and
+    // "task-result-getter" respectively.
+    (0 until 2).foreach { i =>
+      completeTaskSuccessfully(zombieAttempts(i), i + 1)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
+    }
 
-    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
-    // the stage, but this time with insufficient resources so not all tasks are active.
-
+    // Submit the 3rd attempt, still with 10 tasks. This can happen because of the race between
+    // the "task-result-getter" and "dag-scheduler-event-loop" threads, where a TaskSet gets
+    // submitted with already completed tasks. This time, offer insufficient resources so not
+    // all tasks are active.
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+    // Although finalTsm is submitted after some tasks have already succeeded, it learns about
+    // the finished partitions by looking into `stageIdToFinishedPartitions` when it is created,
+    // so it won't launch any duplicate tasks later.
+    (0 until 2).map(_ + 1).foreach { partitionId =>
+      val index = finalTsm.partitionToIndex(partitionId)
+      assert(finalTsm.successful(index))
+    }
+
     val offers = (0 until 5).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
     }.toSet
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We simulate late completions from our zombie tasksets, corresponding to all the pending
-    // partitions in our final attempt. This means we're only waiting on the tasks we've already
-    // launched.
+    // We keep simulating late completions from our zombie tasksets (though this time one active
+    // attempt exists in the taskScheduler), corresponding to all the pending partitions in our
+    // final attempt. This means we're only waiting on the tasks we've already launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
     // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
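The added assertions above all revolve around one mechanism: a scheduler-level record of finished partitions shared across all attempts of a stage. The following is a minimal sketch of that idea, not Spark's actual TaskSchedulerImpl code; the object and method names (FinishedPartitionsSketch, markPartitionFinished, finishedPartitions, stageFinished) are invented for illustration, and only the map name stageIdToFinishedPartitions mirrors the field the test asserts on:

    import scala.collection.mutable

    object FinishedPartitionsSketch {
      // stageId -> partitions already computed by any attempt of that stage, zombie or not
      private val stageIdToFinishedPartitions = mutable.Map.empty[Int, mutable.HashSet[Int]]

      // Result-handling path ("task-result-getter" thread in Spark): record a completion.
      def markPartitionFinished(stageId: Int, partition: Int): Unit = synchronized {
        stageIdToFinishedPartitions.getOrElseUpdate(stageId, mutable.HashSet.empty[Int]) += partition
      }

      // Submission path ("dag-scheduler-event-loop" thread in Spark): a newly created
      // attempt asks which partitions it can mark successful up front. Both paths
      // synchronize on the same lock, so a completion recorded by one thread can never
      // be missed by the other, whichever wins the race.
      def finishedPartitions(stageId: Int): Set[Int] = synchronized {
        stageIdToFinishedPartitions.get(stageId).map(_.toSet).getOrElse(Set.empty)
      }

      // Invoked when the last attempt for a stage ends, so the map does not accumulate
      // an entry for every stage the scheduler ever ran.
      def stageFinished(stageId: Int): Unit = synchronized {
        stageIdToFinishedPartitions -= stageId
      }
    }

The design point the test exercises is exactly this ordering independence: a new attempt either finds a partition already recorded when it is constructed, or receives the completion later through the normal path.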
@@ -1179,6 +1200,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
         zombieAttempts(partition % 2)
       }
       completeTaskSuccessfully(tsm, partition)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     assert(finalTsm.isZombie)
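To make the late-completion flow concrete, here is how the sketch from the previous note would record it (the stage and partition numbers are illustrative):

    // A zombie attempt finishes partition 7 of stage 0 after newer attempts exist.
    FinishedPartitionsSketch.markPartitionFinished(stageId = 0, partition = 7)
    // Any attempt, current or future, now sees partition 7 as done and won't relaunch it.
    assert(FinishedPartitionsSketch.finishedPartitions(0).contains(7))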
@@ -1204,67 +1226,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
       verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any())
     }
-  }
-
-  test("successful tasks from previous attempts could be learnt by later active taskset") {
-    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
-    val valueSer = SparkEnv.get.serializer.newInstance()
-    val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
-
-    // submit a taskset with 10 tasks to taskScheduler
-    val attempt0 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 0)
-    taskScheduler.submitTasks(attempt0)
-    // get the current active tsm
-    val tsm0 = taskScheduler.taskSetManagerForAttempt(0, 0).get
-    // offer sufficient resources
-    val offers0 = (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
-    taskScheduler.resourceOffers(offers0)
-    assert(tsm0.runningTasks === 10)
-    // fail task 0.0 and mark tsm0 as zombie
-    tsm0.handleFailedTask(tsm0.taskAttempts(0)(0).taskId, TaskState.FAILED,
-      FetchFailed(null, 0, 0, 0, "fetch failed"))
-    // the attempt0 is a zombie, but the tasks are still running (this could be true even if
-    // we actively killed those tasks, as killing is best-effort)
-    assert(tsm0.isZombie)
-    assert(tsm0.runningTasks === 9)
-
-
-    // succeed task 1.0, finishing partition 1. But now
-    // no active tsm exists in the taskScheduler for stage 0.
-    tsm0.handleSuccessfulTask(tsm0.taskAttempts(1)(0).taskId, result)
-    assert(tsm0.runningTasks === 8)
-    assert(taskScheduler.stageIdToFinishedPartitions(0).contains(1))
-
-    // submit a new taskset with 10 tasks after a previous task attempt succeeded
-    val attempt1 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
-    taskScheduler.submitTasks(attempt1)
-    // get the current active tsm
-    val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get
-    // tsm1 learns about the finished partition 1 during construction, so it only needs
-    // to execute the other 9 tasks
-    assert(tsm1.taskSet.tasks.length == 9)
-    // offer one resource
-    val offers1 = (10 until 11).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
-    taskScheduler.resourceOffers(offers1)
-    assert(tsm1.runningTasks === 1)
-    // succeed task 0.0 in tsm1, finishing partition 0
-    tsm1.handleSuccessfulTask(tsm1.taskAttempts(0)(0).taskId, result)
-    assert(taskScheduler.stageIdToFinishedPartitions(0).contains(0))
-
-
-    val runningTasks = tsm0.taskSet.tasks.filterNot { t =>
-      taskScheduler.stageIdToFinishedPartitions(0).contains(t.partitionId)
-    }
-    // finish tsm1 via previous task attempts from tsm0; this keeps the same behavior as SPARK-23433
-    runningTasks.foreach { t =>
-      val attempt = tsm0.taskAttempts(tsm0.partitionToIndex(t.partitionId)).head
-      tsm0.handleSuccessfulTask(attempt.taskId, result)
-    }
-
-    assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)
-    assert(taskScheduler.taskSetManagerForAttempt(0, 1).isEmpty)
     assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
-
   }
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
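One last note on the assert(taskScheduler.stageIdToFinishedPartitions.isEmpty) check that now closes the surviving test: it verifies that the bookkeeping map is cleaned up once a stage's last attempt ends. In terms of the hypothetical sketch above:

    // After every attempt of stage 0 has completed, the cleanup hook runs and the
    // bookkeeping map is empty again, matching the test's final assertion.
    FinishedPartitionsSketch.stageFinished(0)
    assert(FinishedPartitionsSketch.finishedPartitions(0).isEmpty)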