@@ -122,6 +122,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
     }
+    override def zombieTasks(stageId: Int): Unit = {}
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
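
This hunk and the next one add a no-op `zombieTasks` override to the suite's `TaskScheduler` stubs, which implies the non-test half of the change declares the hook on the `TaskScheduler` trait roughly as sketched below. This is an inference from the overrides shown in this diff, not code copied from the PR; the doc comment and exact placement are assumptions.

```scala
// Sketch only: inferred from the test-suite overrides above, not copied from the PR.
package org.apache.spark.scheduler

private[spark] trait TaskScheduler {
  // ... existing members such as cancelTasks, setDAGScheduler, defaultParallelism ...

  /**
   * Mark every active task set of the given stage as a zombie: tasks that are already
   * running may still report their results, but no new tasks are launched for them.
   */
  def zombieTasks(stageId: Int): Unit
}
```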
@@ -480,6 +481,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       throw new UnsupportedOperationException
     }
+    override def zombieTasks(stageId: Int): Unit = {}
     override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
     override def defaultParallelism(): Int = 2
     override def executorHeartbeatReceived(
@@ -1272,13 +1274,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       Success,
       makeMapStatus("hostA", reduceRdd.partitions.length)))
 
-    // now that host goes down
-    runEvent(ExecutorLost("exec-hostA"))
-
     // so we resubmit those tasks
+    // note that these Resubmitted events arrive before the ExecutorLost event
     runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
     runEvent(makeCompletionEvent(taskSets(0).tasks(1), Resubmitted, null))
 
+    // now that host goes down
+    runEvent(ExecutorLost("exec-hostA"))
+
     // now complete everything on a different host
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
@@ -1304,6 +1307,72 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(stage1TaskSet.stageAttemptId == 0)
   }
 
+  test("Resubmit stage while partitions are lost in zombie or removed TaskSets") {
+    val firstRDD = new MyRDD(sc, 3, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
+    val firstShuffleId = firstShuffleDep.shuffleId
+    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    // things start out smoothly, stage 0 completes with no issues
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
+    ))
+
+    // then start running stage 1
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      Success,
+      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
+
+    // simulate a fetch failure so stage 1 gets resubmitted; note that in stage 1.0
+    // partition 0 (taskSets(1).tasks(0)) already finished on hostD, so the resubmitted
+    // stage 1.1 only needs tasks for the two partitions that are still missing
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
+    scheduler.resubmitFailedStages()
+
+    val stage1Resubmit1 = taskSets(2)
+    assert(stage1Resubmit1.stageId == 1)
+    assert(stage1Resubmit1.tasks.size == 2)
+
+    // now exec-hostD is lost, so the output location that stage 1.0 registered on hostD
+    // is lost as well
+    // runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))
+    runEvent(ExecutorLost("exec-hostD"))
+    scheduler.resubmitFailedStages()
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    assert(taskSets(3).tasks.size == 3) // all of stage 1's partitions 0/1/2
+
+    // let stage1Resubmit1 complete
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
+    ))
+
+    // and complete the tasks that are still running from stage 1's first attempt
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(2),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+
+    assert(scheduler.runningStages.head.isInstanceOf[ResultStage])
+  }
+
   /**
    * Makes sure that failures of stage used by multiple jobs are correctly handled.
    *
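
For readers following the new test's arithmetic, the sketch below is a tiny self-contained toy model (plain Scala; the object and variable names are invented, and this is not DAGScheduler code) of the bookkeeping the assertions rely on: a resubmitted shuffle map stage gets one task per partition that has no registered map output, and losing an executor drops every output location that executor hosted. That is why the first resubmit carries 2 tasks and the one after `ExecutorLost("exec-hostD")` carries 3.

```scala
// Toy model of missing-partition counting; not Spark code.
object MissingPartitionsToy {
  def main(args: Array[String]): Unit = {
    // partition -> executor currently hosting its map output (None = no output yet)
    var outputLocs: Map[Int, Option[String]] =
      Map(0 -> Some("exec-hostD"), 1 -> None, 2 -> None)

    def missing: Seq[Int] = outputLocs.collect { case (p, None) => p }.toSeq.sorted

    println(s"missing before executor loss: $missing") // List(1, 2) -> 2 resubmitted tasks

    // exec-hostD goes away, so partition 0 loses its only output location
    outputLocs = outputLocs.map { case (p, loc) => p -> loc.filterNot(_ == "exec-hostD") }

    println(s"missing after executor loss:  $missing") // List(0, 1, 2) -> 3 resubmitted tasks
  }
}
```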
@@ -1467,16 +1536,20 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // blockManagerMaster.removeExecutor("exec-hostA")
     // pretend we were told hostA went away
     runEvent(ExecutorLost("exec-hostA"))
+
     // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
     // rather than marking it is as failed and waiting.
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
+
+    // Previously, pendingPartitions -= expiredTask.partitionId made the stage look like it had
+    // no pending tasks and forced a stage resubmit; now the expired task's partition is
+    // ignored (kept pending), so the retry happens in the original task set.
     // have hostC complete the resubmitted task
-    complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostC", 1))))
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
-    complete(taskSets(2), Seq((Success, 42)))
+    complete(taskSets(1), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
   }
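
The comment added in this hunk refers to DAGScheduler's `pendingPartitions` bookkeeping for running stages. The following standalone snippet (hypothetical names, not the scheduler's real code) illustrates the before/after behaviour it describes: removing the expired task's partition makes the stage look finished even though its map output was never registered, while keeping it pending lets the same task set retry the partition.

```scala
import scala.collection.mutable

// Hypothetical illustration of the pendingPartitions change described above; not Spark code.
object PendingPartitionsToy {
  def main(args: Array[String]): Unit = {
    // Stage with two map partitions still pending.
    val pendingOld = mutable.HashSet(0, 1)
    val pendingNew = mutable.HashSet(0, 1)

    // A completion arrives for partition 0, but it comes from an executor lost in an
    // earlier epoch, so its map output is not registered ("expired").
    val expiredTaskPartition = 0

    // Old behaviour: the partition is removed anyway, so once partition 1 finishes the
    // stage looks done while partition 0's output is missing, forcing a stage resubmit.
    pendingOld -= expiredTaskPartition
    println(s"old: $pendingOld") // HashSet(1)

    // New behaviour exercised by this test: the expired completion is ignored, so
    // partition 0 stays pending and is simply re-run inside the same task set.
    println(s"new: $pendingNew") // HashSet(0, 1)
  }
}
```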
@@ -1927,8 +2000,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
     assert(results.size === 0) // Map stage job should not be complete yet
 
+
     // Pretend host A was lost
     val oldEpoch = mapOutputTracker.getEpoch
+    runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
     runEvent(ExecutorLost("exec-hostA"))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
@@ -1941,20 +2016,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
19412016 runEvent(makeCompletionEvent(oldTaskSet.tasks(2 ), Success , makeMapStatus(" hostB" , 2 )))
19422017 assert(results.size === 0 ) // Map stage job should not be complete yet
19432018
1944- // Now complete tasks in the second task set
1945- val newTaskSet = taskSets(1 )
1946- assert(newTaskSet.tasks.size === 2 ) // Both tasks 0 and 1 were on on hostA
1947- runEvent(makeCompletionEvent(newTaskSet.tasks(0 ), Success , makeMapStatus(" hostB" , 2 )))
2019+ assert(scheduler.runningStages.head.pendingPartitions.size === 2 ) // Both tasks 0 and 1
2020+ runEvent(makeCompletionEvent(oldTaskSet.tasks(0 ), Success , makeMapStatus(" hostB" , 2 )))
19482021 assert(results.size === 0 ) // Map stage job should not be complete yet
1949- runEvent(makeCompletionEvent(newTaskSet .tasks(1 ), Success , makeMapStatus(" hostB" , 2 )))
2022+ runEvent(makeCompletionEvent(oldTaskSet .tasks(1 ), Success , makeMapStatus(" hostB" , 2 )))
19502023 assert(results.size === 1 ) // Map stage job should now finally be complete
19512024 assertDataStructuresEmpty()
19522025
19532026 // Also test that a reduce stage using this shuffled data can immediately run
19542027 val reduceRDD = new MyRDD (sc, 2 , List (shuffleDep), tracker = mapOutputTracker)
19552028 results.clear()
19562029 submit(reduceRDD, Array (0 , 1 ))
1957- complete(taskSets(2 ), Seq ((Success , 42 ), (Success , 43 )))
2030+ complete(taskSets(1 ), Seq ((Success , 42 ), (Success , 43 )))
19582031 assert(results === Map (0 -> 42 , 1 -> 43 ))
19592032 results.clear()
19602033 assertDataStructuresEmpty()