File tree Expand file tree Collapse file tree 2 files changed +6
-6
lines changed
core/src/main/scala/org/apache/spark/scheduler Expand file tree Collapse file tree 2 files changed +6
-6
lines changed Original file line number Diff line number Diff line change @@ -858,12 +858,12 @@ private[spark] class TaskSchedulerImpl(
858858 private [scheduler] def markPartitionCompletedInAllTaskSets (
859859 stageId : Int ,
860860 partitionId : Int ,
861- taskInfo : Option [ TaskInfo ] ) = {
861+ taskInfo : TaskInfo ) = {
862862 val finishedPartitions =
863863 stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet )
864864 finishedPartitions += partitionId
865865 taskSetsByStageIdAndAttempt.getOrElse(stageId, Map ()).values.foreach { tsm =>
866- tsm.markPartitionCompleted(partitionId, taskInfo)
866+ tsm.markPartitionCompleted(partitionId, Some ( taskInfo) )
867867 }
868868 }
869869
Original file line number Diff line number Diff line change @@ -196,9 +196,9 @@ private[spark] class TaskSetManager(
196196 // those tasks as finished here to avoid launching duplicate tasks, while
197197 // holding the TaskSchedulerImpl lock.
198198 // See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
199- sched.stageIdToFinishedPartitions
200- .getOrElseUpdate(taskSet.stageId, new BitSet )
201- .foreach(markPartitionCompleted(_, None ))
199+ sched.stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
200+ finishedPartitions => finishedPartitions.foreach(markPartitionCompleted(_, None ) )
201+ }
202202 }
203203
204204 /**
@@ -798,7 +798,7 @@ private[spark] class TaskSetManager(
798798 }
799799 // There may be multiple tasksets for this stage -- we let all of them know that the partition
800800 // was completed. This may result in some of the tasksets getting completed.
801- sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, Some ( info) )
801+ sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
802802 // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
803803 // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
804804 // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
You can’t perform that action at this time.
0 commit comments