Skip to content

Commit c53961f

Browse files
committed
address comments
1 parent 76bb765 commit c53961f

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -858,12 +858,12 @@ private[spark] class TaskSchedulerImpl(
858858
private[scheduler] def markPartitionCompletedInAllTaskSets(
859859
stageId: Int,
860860
partitionId: Int,
861-
taskInfo: Option[TaskInfo]) = {
861+
taskInfo: TaskInfo) = {
862862
val finishedPartitions =
863863
stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet)
864864
finishedPartitions += partitionId
865865
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
866-
tsm.markPartitionCompleted(partitionId, taskInfo)
866+
tsm.markPartitionCompleted(partitionId, Some(taskInfo))
867867
}
868868
}
869869

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,9 @@ private[spark] class TaskSetManager(
196196
// those tasks as finished here to avoid launching duplicate tasks, while
197197
// holding the TaskSchedulerImpl lock.
198198
// See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
199-
sched.stageIdToFinishedPartitions
200-
.getOrElseUpdate(taskSet.stageId, new BitSet)
201-
.foreach(markPartitionCompleted(_, None))
199+
sched.stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
200+
finishedPartitions => finishedPartitions.foreach(markPartitionCompleted(_, None))
201+
}
202202
}
203203

204204
/**
@@ -798,7 +798,7 @@ private[spark] class TaskSetManager(
798798
}
799799
// There may be multiple tasksets for this stage -- we let all of them know that the partition
800800
// was completed. This may result in some of the tasksets getting completed.
801-
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, Some(info))
801+
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
802802
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
803803
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
804804
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call

0 commit comments

Comments
 (0)