@@ -150,8 +150,7 @@ private[spark] class TaskSetManager(
   // of task index so that tasks with low indices get launched first.
   val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true)
   for (i <- (0 until numTasks).reverse) {
-    // if delay scheduling is enabled, we shouldn't enforce this check, since executors may not have registered yet
-    addPendingTask(i, enforceCheck = !delaySchedule)
+    addPendingTask(i)
   }
 
   // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
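With delay scheduling on, the liveness check at initialization is skipped entirely: every task is queued by its stated preference, even before any executor has registered. The flag itself is an ordinary boolean property; a minimal sketch of turning the new behavior off, assuming the usual SparkConf-style configuration (only the key name comes from this patch, everything else is illustrative):

import org.apache.spark.SparkConf

// Hedged sketch, not part of the patch: opting out of the new behavior.
// "spark.schedule.delaySchedule" is the key read above (default: true);
// the SparkConf import and API are assumptions about the surrounding codebase.
val conf = new SparkConf().set("spark.schedule.delaySchedule", "false")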
@@ -171,10 +170,8 @@ private[spark] class TaskSetManager(
   /**
    * Add a task to all the pending-task lists that it should be on. If readding is set, we are
    * re-adding the task, so only include it in each list if it's not already there.
-   * If enforceCheck is set, we'll check the availability of executors/hosts before adding a task
-   * to the pending list; otherwise, we simply add the task according to its preference.
    */
-  private def addPendingTask(index: Int, readding: Boolean = false, enforceCheck: Boolean = true) {
+  private def addPendingTask(index: Int, readding: Boolean = false) {
     // Utility method that adds `index` to a list only if readding=false or it's not already there
     def addTo(list: ArrayBuffer[Int]) {
       if (!readding || !list.contains(index)) {
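Re-adding is safe because the addTo helper is idempotent when readding is set: an index is appended only if the list doesn't already contain it. A small self-contained sketch of that behavior (the helper's predicate is copied from above; the demo scaffolding is mine):

import scala.collection.mutable.ArrayBuffer

object ReaddDemo extends App {
  val pending = ArrayBuffer(3, 7)

  // Same predicate as the addTo helper above, lifted out for demonstration.
  def addTo(list: ArrayBuffer[Int], index: Int, readding: Boolean): Unit = {
    if (!readding || !list.contains(index)) {
      list += index
    }
  }

  addTo(pending, 7, readding = true)   // skipped: 7 is already pending
  addTo(pending, 5, readding = true)   // appended: 5 is not in the list yet
  addTo(pending, 7, readding = false)  // appended unconditionally (initial add path)
  println(pending)                     // ArrayBuffer(3, 7, 5, 7)
}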
@@ -185,12 +182,12 @@ private[spark] class TaskSetManager(
     var hadAliveLocations = false
     for (loc <- tasks(index).preferredLocations) {
       for (execId <- loc.executorId) {
-        if (!enforceCheck || sched.isExecutorAlive(execId)) {
+        if (sched.isExecutorAlive(execId)) {
           addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
           hadAliveLocations = true
         }
       }
-      if (!enforceCheck || sched.hasExecutorsAliveOnHost(loc.host)) {
+      if (sched.hasExecutorsAliveOnHost(loc.host)) {
         addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
         for (rack <- sched.getRackForHost(loc.host)) {
           addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
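With enforceCheck gone, the liveness checks are unconditional again: a preferred location lands in the executor-, host-, and rack-level maps only if the scheduler currently knows a live executor there. A simplified, self-contained model of that fan-out, with stand-ins for sched.isExecutorAlive and sched.hasExecutorsAliveOnHost (the map names mirror the fields above; the rest is mine):

import scala.collection.mutable.{ArrayBuffer, HashMap}

object LocalityFanOutDemo extends App {
  // Stand-ins for the scheduler's liveness queries.
  val aliveExecutors = Set("exec-1")
  val aliveHosts     = Set("host-a")

  val pendingTasksForExecutor = HashMap.empty[String, ArrayBuffer[Int]]
  val pendingTasksForHost     = HashMap.empty[String, ArrayBuffer[Int]]

  // Mirrors, in simplified form, how hadAliveLocations is computed above:
  // true only if the preference resolved to at least one live location.
  def addByPreference(index: Int, execId: Option[String], host: String): Boolean = {
    var hadAlive = false
    for (e <- execId if aliveExecutors(e)) {
      pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
      hadAlive = true
    }
    if (aliveHosts(host)) {
      pendingTasksForHost.getOrElseUpdate(host, new ArrayBuffer) += index
      hadAlive = true
    }
    hadAlive
  }

  println(addByPreference(0, Some("exec-1"), "host-a"))  // true: both maps updated
  println(addByPreference(1, None, "host-b"))            // false: no live location
}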
@@ -199,7 +196,8 @@ private[spark] class TaskSetManager(
       }
     }
 
-    if (!hadAliveLocations) {
+    if (tasks(index).preferredLocations.isEmpty ||
+        (!delaySchedule && !hadAliveLocations)) {
       // Even though the task might've had preferred locations, all of those hosts or executors
       // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
       addTo(pendingTasksWithNoPrefs)
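This is the behavioral core of the patch: a task with no preferences at all still goes straight to pendingTasksWithNoPrefs, but a task whose preferred locations merely happen to be dead is demoted only when delay scheduling is off. A hedged restatement of the new guard as a pure predicate (the helper name is mine):

// Hypothetical restatement of the guard above, extracted for clarity.
def goesToNoPrefs(hasPrefs: Boolean, delaySchedule: Boolean, hadAliveLocations: Boolean): Boolean =
  !hasPrefs || (!delaySchedule && !hadAliveLocations)

// With delay scheduling on, a task whose preferred locations are all dead is
// *not* demoted to the no-prefs list; it waits for reAddPendingTasks() instead.
assert(!goesToNoPrefs(hasPrefs = true, delaySchedule = true, hadAliveLocations = false))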
@@ -742,4 +740,12 @@ private[spark] class TaskSetManager(
     logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
     levels.toArray
   }
+
+  // Re-compute the pending lists. This should be called when a new executor is added.
+  def reAddPendingTasks() {
+    logInfo("Re-computing pending task lists.")
+    for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && !successful(index))) {
+      addPendingTask(i, readding = true)
+    }
+  }
 }
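reAddPendingTasks re-queues only tasks that are neither running nor already finished, and relies on readding = true to avoid duplicate entries. A hypothetical call site, assuming the cluster scheduler exposes an executor-registration hook and tracks its live TaskSetManagers (neither is shown in this patch):

// Hypothetical sketch, not part of this patch. Both names below are
// assumptions: a registry of live managers and an executor-registration hook.
val activeTaskSetManagers = Seq.empty[TaskSetManager]

def executorGained(execId: String, host: String): Unit = {
  for (manager <- activeTaskSetManagers) {
    manager.reAddPendingTasks()  // requeue waiting tasks now that new locality exists
  }
}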