@@ -114,9 +114,14 @@ private[spark] class TaskSetManager(
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
-  // back at the head of the stack. They are also only cleaned up lazily;
-  // when a task is launched, it remains in all the pending lists except
-  // the one that it was launched from, but gets removed from them later.
+  // back at the head of the stack. These collections may contain duplicates
+  // for two reasons:
+  // (1): Tasks are only removed lazily; when a task is launched, it remains
+  // in all the pending lists except the one that it was launched from.
+  // (2): Tasks may be re-added to these lists multiple times as a result
+  // of failures.
+  // Duplicates are handled in dequeueTaskFromList, which ensures that a
+  // task hasn't already started running before launching it.
   private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

   // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
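The new comment delegates duplicate handling to `dequeueTaskFromList`. For context, the filtering it describes can be sketched as below; the field names `copiesRunning` and `successful` match bookkeeping in `TaskSetManager`, but this is a simplified stand-alone sketch rather than the actual Spark implementation (exclusion/blacklist checks are omitted):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the duplicate-tolerant dequeue described in the comment
// above. Not the real TaskSetManager: per-task state is modeled with two arrays.
class DequeueSketch(numTasks: Int) {
  val copiesRunning = new Array[Int](numTasks)   // how many copies of each task are running
  val successful = new Array[Boolean](numTasks)  // which task indices have finished

  // The pending list is used as a stack: scan from the end and pop entries until
  // one is found whose task is neither already running nor already finished.
  // Stale duplicates left behind by lazy removal or re-adds are simply dropped.
  def dequeueTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      list.remove(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return Some(index)
      }
    }
    None
  }
}
```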
@@ -179,23 +184,16 @@ private[spark] class TaskSetManager(
 
   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {
-    // Utility method that adds `index` to a list only if it's not already there
-    def addTo(list: ArrayBuffer[Int]) {
-      if (!list.contains(index)) {
-        list += index
-      }
-    }
-
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
-          addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
+          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
         case e: HDFSCacheTaskLocation => {
           val exe = sched.getExecutorsAliveOnHost(loc.host)
           exe match {
             case Some(set) => {
               for (e <- set) {
-                addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
+                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
               }
               logInfo(s"Pending task $index has a cached location at ${e.host} " +
                 ", where there are executors " + set.mkString(","))
@@ -206,14 +204,14 @@ private[spark] class TaskSetManager(
         }
         case _ => Unit
       }
-      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
       for (rack <- sched.getRackForHost(loc.host)) {
-        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
       }
     }

     if (tasks(index).preferredLocations == Nil) {
-      addTo(pendingTasksWithNoPrefs)
+      pendingTasksWithNoPrefs += index
     }

     allPendingTasks += index  // No point scanning this whole list to find the old task there
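With the `addTo` helper gone, `addPendingTask` can append the same index more than once. Below is a toy demonstration of why that is acceptable; the object name, the `host` parameter, and the trimmed-down `addPendingTask` signature are illustrative, not the real Spark API:

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

object PendingDuplicatesDemo extends App {
  // Illustrative stand-in for one of the TaskSetManager pending-list maps.
  val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

  // Mirrors the new code path: no contains() check, so re-adding a failed task
  // simply appends another entry for its index.
  def addPendingTask(index: Int, host: String): Unit = {
    pendingTasksForHost.getOrElseUpdate(host, new ArrayBuffer) += index
  }

  addPendingTask(3, "host1")  // initial add
  addPendingTask(3, "host1")  // re-added after a failure: duplicate entry
  println(pendingTasksForHost("host1"))  // ArrayBuffer(3, 3)

  // The duplicate is harmless: dequeue-time filtering skips entries whose task
  // is already running or finished, so index 3 is launched at most once. In
  // return, addPendingTask avoids a linear contains() scan on every re-add.
}
```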