
Commit 74ca88b

Author: Sital Kedia
[SPARK-19753][CORE] All shuffle files on a host should be removed in case of fetch failure or slave lost
Parent: eff7b40

File tree

4 files changed (+63, -6 lines)


core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 2 additions & 1 deletion
@@ -289,7 +289,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,

   // HashMaps for storing mapStatuses and cached serialized statuses in the driver.
   // Statuses are dropped only by explicit de-registering.
-  protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
+  // Exposed for testing
+  val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
   private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala

   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
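For context, the only functional change here is dropping the protected modifier so that tests can inspect mapStatuses directly; the new test in DAGSchedulerSuite below relies on it. A minimal standalone sketch of the same pattern, with the class and method names (TrackerSketch, register) made up for illustration rather than taken from Spark:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

class TrackerSketch {
  // Public (rather than protected) so a test can assert on registered
  // statuses directly, mirroring MapOutputTrackerMaster.mapStatuses above.
  val statuses = new ConcurrentHashMap[Int, Array[String]]().asScala

  def register(shuffleId: Int, locs: Array[String]): Unit = {
    statuses(shuffleId) = locs
  }
}

// In a test:
//   val tracker = new TrackerSketch
//   tracker.register(0, Array("hostA", "hostB"))
//   assert(tracker.statuses(0).length == 2)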

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 14 additions & 5 deletions
@@ -149,6 +149,7 @@ class DAGScheduler(
    */
   private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
   private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
+  private[scheduler] val execIdToHost = new HashMap[String, String]

   // Stages we need to run whose parents aren't done
   private[scheduler] val waitingStages = new HashSet[Stage]

@@ -1331,7 +1332,7 @@ class DAGScheduler(

         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
+          handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch))
         }
       }

@@ -1365,19 +1366,26 @@ class DAGScheduler(
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      filesLost: Boolean,
+      slaveLost: Boolean,
       maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)

-      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
-        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+      if (slaveLost || !env.blockManager.externalShuffleServiceEnabled) {
         // TODO: This will be really slow if we keep accumulating shuffle map stages
         for ((shuffleId, stage) <- shuffleIdToMapStage) {
-          stage.removeOutputsOnExecutor(execId)
+          if (slaveLost) {
+            val host = execIdToHost.get(execId).get
+            logInfo(("Shuffle files lost for executor: %s (epoch %d)," +
+              " removing shuffle files on host: %s").format(execId, currentEpoch, host))
+            stage.removeOutputsOnHost(host)
+          } else {
+            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+            stage.removeOutputsOnExecutor(execId)
+          }
           mapOutputTracker.registerMapOutputs(
             shuffleId,
             stage.outputLocInMapOutputTrackerFormat(),

@@ -1400,6 +1408,7 @@ class DAGScheduler(
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
+    execIdToHost.put(execId, host)
   }

   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
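To read the new control flow in isolation: when the whole slave is lost, the cleanup widens from one executor's outputs to every output on that host, because the host's external shuffle service (which serves files for all executors on the machine) is gone as well. Below is a hedged sketch of that branching with simplified stand-in types; removeOnHost and removeOnExec are placeholders for stage.removeOutputsOnHost and stage.removeOutputsOnExecutor, not real Spark APIs:

import scala.collection.mutable

object LostHandlerSketch {
  // Populated on executor registration, as in the final hunk above.
  val execIdToHost = new mutable.HashMap[String, String]()

  def handleExecutorLost(
      execId: String,
      slaveLost: Boolean,
      removeOnHost: String => Unit,
      removeOnExec: String => Unit): Unit = {
    if (slaveLost) {
      // The machine and its shuffle service are gone: files written by
      // every executor on the host are unreachable.
      removeOnHost(execIdToHost(execId))
    } else {
      // Only this executor's own files are lost.
      removeOnExec(execId)
    }
  }
}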

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala

Lines changed: 22 additions & 0 deletions
@@ -132,6 +132,28 @@ private[spark] class ShuffleMapStage(
     outputLocs.map(_.headOption.orNull)
   }

+  /**
+   * Removes all shuffle outputs associated with this host. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists), as they are still
+   * registered with this execId.
+   */
+  def removeOutputsOnHost(host: String): Unit = {
+    var becameUnavailable = false
+    for (partition <- 0 until numPartitions) {
+      val prevList = outputLocs(partition)
+      val newList = prevList.filterNot(_.location.host == host)
+      outputLocs(partition) = newList
+      if (prevList != Nil && newList == Nil) {
+        becameUnavailable = true
+        _numAvailableOutputs -= 1
+      }
+    }
+    if (becameUnavailable) {
+      logInfo("%s is now unavailable on host %s (%d/%d, %s)".format(
+        this, host, _numAvailableOutputs, numPartitions, isAvailable))
+    }
+  }
+
   /**
    * Removes all shuffle outputs associated with this executor. Note that this will also remove
    * outputs which are served by an external shuffle server (if one exists), as they are still
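The bookkeeping above can be read in a self-contained form. The sketch below is not Spark's class: it swaps MapStatus for plain host strings and uses made-up names, but the filtering and availability accounting follow the diff, and in particular a partition's available-output count only drops when its last surviving location is removed:

object RemoveOnHostSketch {
  val numPartitions = 3
  // outputLocs(p): hosts currently holding partition p's map output.
  val outputLocs = Array(List("hostA", "hostB"), List("hostA"), List("hostB"))
  var numAvailableOutputs = outputLocs.count(_.nonEmpty)

  def removeOutputsOnHost(host: String): Unit = {
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(_ == host)
      outputLocs(partition) = newList
      // The partition becomes unavailable only when its last location goes.
      if (prevList.nonEmpty && newList.isEmpty) {
        numAvailableOutputs -= 1
      }
    }
  }

  def main(args: Array[String]): Unit = {
    removeOutputsOnHost("hostA")
    assert(numAvailableOutputs == 2) // partition 1 lost its only copy
  }
}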

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -394,6 +394,31 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }

+  test("All shuffle files on the slave should be cleaned up when slave lost") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set("spark.shuffle.service.enabled", "true")
+    init(conf)
+    runEvent(ExecutorAdded("exec-hostA1", "hostA"))
+    runEvent(ExecutorAdded("exec-hostA2", "hostA"))
+    runEvent(ExecutorAdded("exec-hostB", "hostB"))
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+    runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true)))
+    val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_ != null)
+    assert(mapStatus.size === 1)
+    assert(mapStatus(0).location.executorId === "exec-hostB")
+    assert(mapStatus(0).location.host === "hostB")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
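For readers skimming the assertions: after ExecutorLost("exec-hostA1", SlaveLost("", true)), both hostA map outputs should be gone even though exec-hostA2 itself was never lost; only the hostB status survives. A standalone rendering of that final check, with (executorId, host) tuples standing in for MapStatus:

object TestAssertionSketch extends App {
  // State expected after the slave-lost event: both hostA entries nulled out.
  val statuses: Array[(String, String)] =
    Array(null, null, ("exec-hostB", "hostB"))
  val remaining = statuses.filter(_ != null)
  assert(remaining.length == 1)
  assert(remaining(0)._1 == "exec-hostB")
  assert(remaining(0)._2 == "hostB")
}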
