2 files changed: +19 −10 lines changed
main/scala/org/apache/spark/executor
test/scala/org/apache/spark
main/scala/org/apache/spark/executor/Executor.scala

@@ -432,7 +432,8 @@ private[spark] class Executor(
         setTaskFinishedAndClearInterruptStatus()
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

-      case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
+      case _: InterruptedException | NonFatal(_) if
+          task != null && task.reasonIfKilled.isDefined =>
         val killReason = task.reasonIfKilled.getOrElse("unknown reason")
         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
         setTaskFinishedAndClearInterruptStatus()
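Background on the pattern change in this hunk: scala.util.control.NonFatal deliberately classifies InterruptedException as fatal, so the old `case NonFatal(_)` guard never matched when a task kill surfaced as a thread interrupt. The following is a stand-alone sketch of the combined pattern only; it is not Spark code, and the names KilledPatternSketch and classify are invented for illustration.

import scala.util.control.NonFatal

// Hypothetical sketch (not Spark code): NonFatal excludes InterruptedException,
// so only the combined alternative below treats an interrupt and an ordinary
// runtime failure the same way.
object KilledPatternSketch {
  def classify(work: => Unit): String =
    try { work; "completed" }
    catch { case _: InterruptedException | NonFatal(_) => "handled as killed" }

  def main(args: Array[String]): Unit = {
    println(NonFatal(new InterruptedException))            // false: fatal as far as NonFatal is concerned
    println(classify(throw new InterruptedException))      // handled as killed
    println(classify(throw new RuntimeException("boom")))  // handled as killed
  }
}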
test/scala/org/apache/spark/SparkContextSuite.scala

@@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }

-  // Launches one task that will run forever. Once the SparkListener detects the task has
+  testCancellingTasks("that raise interrupted exception on cancel") {
+    Thread.sleep(9999999)
+  }
+
+  // SPARK-20217 should not fail stage if task throws non-interrupted exception
+  testCancellingTasks("that raise runtime exception on cancel") {
+    try {
+      Thread.sleep(9999999)
+    } catch {
+      case t: Throwable =>
+        throw new RuntimeException("killed")
+    }
+  }
+
+  // Launches one task that will block forever. Once the SparkListener detects the task has
   // started, kill and re-schedule it. The second run of the task will complete immediately.
   // If this test times out, then the first version of the task wasn't killed successfully.
-  test("Killing tasks") {
+  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

     SparkContextSuite.isTaskStarted = false
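The hunk above turns the single hard-coded test into a small generator: each testCancellingTasks call registers one test case whose only difference is the blocking behaviour passed as the by-name blockFn parameter. Below is a minimal, self-contained sketch of that pattern outside ScalaTest; the object name, registered, and runAll are invented for illustration and are not part of Spark.

// Hypothetical sketch of the test-generator pattern (not Spark or ScalaTest code).
object CancellingTestsSketch {
  private var registered: List[(String, () => Unit)] = Nil

  // Analogue of testCancellingTasks: the blocking behaviour is captured lazily,
  // so each call defines one named case without running it yet.
  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit =
    registered = (s"Killing tasks $desc", () => blockFn) :: registered

  def runAll(): Unit =
    registered.reverse.foreach { case (name, body) =>
      println(s"would run: $name") // a real framework would invoke body() here
    }

  def main(args: Array[String]): Unit = {
    testCancellingTasks("that raise interrupted exception on cancel") { Thread.sleep(10) }
    testCancellingTasks("that raise runtime exception on cancel") { throw new RuntimeException("killed") }
    runAll()
  }
}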
@@ -572,13 +586,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       // first attempt will hang
       if (!SparkContextSuite.isTaskStarted) {
         SparkContextSuite.isTaskStarted = true
-        try {
-          Thread.sleep(9999999)
-        } catch {
-          case t: Throwable =>
-            // SPARK-20217 should not fail stage if task throws non-interrupted exception
-            throw new RuntimeException("killed")
-        }
+        blockFn
       }
       // second attempt succeeds immediately
     }
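The two generated tests differ only in what escapes blockFn when the executor interrupts the task thread: a bare Thread.sleep lets the InterruptedException propagate, while the try/catch variant rethrows it as RuntimeException("killed"), the SPARK-20217 case in which a killed task fails with a non-interrupted exception and must not fail the stage. The following stand-alone snippet, with invented names (InterruptSketch, run, convertToRuntime), illustrates those two behaviours outside Spark.

// Hypothetical illustration (not Spark code) of the two cancellation behaviours.
object InterruptSketch {
  def run(convertToRuntime: Boolean): Unit = {
    val worker = new Thread(() =>
      if (!convertToRuntime) {
        Thread.sleep(9999999) // the interrupt escapes as an InterruptedException
      } else {
        try Thread.sleep(9999999)
        catch { case _: Throwable => throw new RuntimeException("killed") } // surfaces as a non-interrupted failure
      })
    // Report whatever escapes the worker thread instead of dumping a stack trace.
    worker.setUncaughtExceptionHandler((_, e) => println(s"escaped: ${e.getClass.getSimpleName}"))
    worker.start()
    Thread.sleep(100)   // give the worker time to reach its sleep
    worker.interrupt()  // analogue of the executor killing the task
    worker.join()
  }

  def main(args: Array[String]): Unit = {
    run(convertToRuntime = false) // prints "escaped: InterruptedException"
    run(convertToRuntime = true)  // prints "escaped: RuntimeException"
  }
}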