Commit 74f285a

Rebase with master

Author: Sital Kedia
1 parent: b26e99d

File tree

4 files changed: +61 additions, -15 deletions

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 30 additions & 3 deletions
@@ -55,7 +55,8 @@ private class ShuffleStatus(numPartitions: Int) {
    * locations is so small that we choose to ignore that case and store only a single location
    * for each output.
    */
-  private[this] val mapStatuses = new Array[MapStatus](numPartitions)
+  // Exposed for testing
+  val mapStatuses = new Array[MapStatus](numPartitions)
 
   /**
    * The cached result of serializing the map statuses array. This cache is lazily populated when
@@ -105,14 +106,30 @@ private class ShuffleStatus(numPartitions: Int) {
     }
   }
 
+  /**
+   * Removes all shuffle outputs associated with this host. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists).
+   */
+  def removeOutputsOnHost(host: String): Unit = {
+    removeOutputsByFilter(x => x.host == host)
+  }
+
   /**
    * Removes all map outputs associated with the specified executor. Note that this will also
    * remove outputs which are served by an external shuffle server (if one exists), as they are
    * still registered with that execId.
    */
   def removeOutputsOnExecutor(execId: String): Unit = synchronized {
+    removeOutputsByFilter(x => x.executorId == execId)
+  }
+
+  /**
+   * Removes all shuffle outputs which satisfies the filter. Note that this will also
+   * remove outputs which are served by an external shuffle server (if one exists).
+   */
+  def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized {
     for (mapId <- 0 until mapStatuses.length) {
-      if (mapStatuses(mapId) != null && mapStatuses(mapId).location.executorId == execId) {
+      if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) {
         _numAvailableOutputs -= 1
         mapStatuses(mapId) = null
         invalidateSerializedMapOutputStatusCache()
@@ -317,7 +334,8 @@ private[spark] class MapOutputTrackerMaster(
 
   // HashMap for storing shuffleStatuses in the driver.
   // Statuses are dropped only by explicit de-registering.
-  private val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala
+  // Exposed for testing
+  val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala
 
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
 
@@ -415,6 +433,15 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
+  /**
+   * Removes all shuffle outputs associated with this host. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists).
+   */
+  def removeOutputsOnHost(host: String): Unit = {
+    shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnHost(host) }
+    incrementEpoch()
+  }
+
   /**
    * Removes all shuffle outputs associated with this executor. Note that this will also remove
    * outputs which are served by an external shuffle server (if one exists), as they are still

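Taken together, the MapOutputTracker changes replace the executor-only removal loop with a single predicate-driven helper, so host-level and executor-level invalidation share one code path. Below is a minimal standalone sketch of that pattern; Location and OutputRegistry are hypothetical stand-ins for Spark's BlockManagerId and ShuffleStatus, and the serialized-cache invalidation is omitted:

// Minimal sketch of the filter-based removal pattern from the diff above.
// Location and OutputRegistry are hypothetical stand-ins, not Spark code.
case class Location(host: String, executorId: String)

class OutputRegistry(numPartitions: Int) {
  private val statuses = new Array[Location](numPartitions)
  private var numAvailable = 0

  def add(partition: Int, loc: Location): Unit = synchronized {
    if (statuses(partition) == null) numAvailable += 1
    statuses(partition) = loc
  }

  // Single removal path: null out every entry whose location matches.
  def removeOutputsByFilter(f: Location => Boolean): Unit = synchronized {
    for (i <- statuses.indices) {
      if (statuses(i) != null && f(statuses(i))) {
        statuses(i) = null
        numAvailable -= 1
      }
    }
  }

  // Both public entry points reduce to the same helper, as in the diff.
  def removeOutputsOnHost(host: String): Unit =
    removeOutputsByFilter(_.host == host)
  def removeOutputsOnExecutor(execId: String): Unit =
    removeOutputsByFilter(_.executorId == execId)
}
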
core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 8 additions & 0 deletions
@@ -151,6 +151,14 @@ package object config {
       .createOptional
   // End blacklist confs
 
+  private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
+    ConfigBuilder("spark.files.fetchFailure.unRegisterOutputOnHost")
+      .doc("Whether to un-register all the outputs on the host in condition that we receive " +
+        " a FetchFailure. This is set default to false, which means, we only un-register the " +
+        " outputs related to the exact executor(instead of the host) on a FetchFailure.")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
       .withAlternative("spark.scheduler.listenerbus.eventqueue.size")

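The config migration above gives the flag a typed definition with a documented default, but it is set like any other Spark property. A usage sketch, assuming a driver-side SparkConf (the app name is hypothetical; the key and its false default come from the diff):

import org.apache.spark.SparkConf

// Opt in to host-wide output unregistration on FetchFailure; per the
// definition above, the default is false (executor-level only).
val conf = new SparkConf()
  .setAppName("fetch-failure-demo") // hypothetical app name
  .set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")

The same flag can equally be passed on the command line with spark-submit, e.g. --conf spark.files.fetchFailure.unRegisterOutputOnHost=true.
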
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 14 additions & 7 deletions
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.SerializationUtils
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
@@ -193,7 +194,7 @@ class DAGScheduler(
    * executor(instead of the host) on a FetchFailure.
    */
   private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
-    sc.getConf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", false)
+    sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE)
 
   /**
    * Number of consecutive stage attempts allowed before a stage is aborted.
@@ -1414,14 +1415,20 @@ class DAGScheduler(
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
-
-      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
-        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-        mapOutputTracker.removeOutputsOnExecutor(execId)
+      if (fileLost) {
+        hostToUnregisterOutputs match {
+          case Some(host) =>
+            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
+            mapOutputTracker.removeOutputsOnHost(host)
+          case None =>
+            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+            mapOutputTracker.removeOutputsOnExecutor(execId)
+        }
         clearCacheLocs()
+
+      } else {
+        logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
       }
-    } else {
-      logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
     }
   }
 

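The rewritten branch consults two values bound outside this hunk, fileLost and hostToUnregisterOutputs, to decide between host-wide and executor-only invalidation. The sketch below shows one plausible way a caller could derive them; this wiring is an assumption for illustration, not code from the commit:

// Assumed helper, not part of the diff: derive (fileLost,
// hostToUnregisterOutputs) from the shuffle-service setup and the new config.
def unregisterParams(
    failedHost: String,
    externalShuffleServiceEnabled: Boolean,
    unRegisterOutputOnHostOnFetchFailure: Boolean): (Boolean, Option[String]) = {
  // Host-wide removal only makes sense when an external shuffle service may
  // keep serving a dead executor's files and the user has opted in.
  val hostToUnregisterOutputs =
    if (externalShuffleServiceEnabled && unRegisterOutputOnHostOnFetchFailure) {
      Some(failedHost)
    } else {
      None
    }
  // Without an external shuffle service, losing an executor always means its
  // shuffle files are gone; host-wide removal treats them as lost as well.
  val fileLost = !externalShuffleServiceEnabled || hostToUnregisterOutputs.isDefined
  (fileLost, hostToUnregisterOutputs)
}
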
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 9 additions & 5 deletions
@@ -396,7 +396,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
-  test("All shuffle files should on the slave should be cleaned up when slave lost") {
+  test("All shuffle files on the slave should be cleaned up when slave lost") {
     // reset the test context with the right shuffle service config
     afterEach()
     val conf = new SparkConf()
@@ -411,6 +411,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     val firstShuffleId = firstShuffleDep.shuffleId
     val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val secondShuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
     submit(reduceRdd, Array(0))
     // map stage1 completes successfully, with one task on each executor
@@ -430,12 +431,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       (Success, makeMapStatus("hostB", 1))
     ))
     // make sure our test setup is correct
-    val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
+    val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
+    // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
     assert(initialMapStatus1.count(_ != null) === 3)
     assert(initialMapStatus1.map{_.location.executorId}.toSet ===
       Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
 
-    val initialMapStatus2 = mapOutputTracker.mapStatuses.get(0).get
+    val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
+    // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
     assert(initialMapStatus2.count(_ != null) === 3)
     assert(initialMapStatus2.map{_.location.executorId}.toSet ===
       Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
@@ -448,12 +451,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
     // Here is the main assertion -- make sure that we de-register
     // the map outputs for both map stage from both executors on hostA
-    val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get
+
+    val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
     assert(mapStatus1.count(_ != null) === 1)
     assert(mapStatus1(2).location.executorId === "exec-hostB")
     assert(mapStatus1(2).location.host === "hostB")
 
-    val mapStatus2 = mapOutputTracker.mapStatuses.get(1).get
+    val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
     assert(mapStatus2.count(_ != null) === 1)
     assert(mapStatus2(2).location.executorId === "exec-hostB")
     assert(mapStatus2(2).location.host === "hostB")

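Since mapStatuses and shuffleStatuses are now exposed for testing, suites can assert directly on the surviving outputs of each shuffle. The helper below condenses the repeated assertions into one hypothetical function (a sketch in the suite's style, not part of the commit):

// Hypothetical test helper: after hostA's executors are lost, exactly one
// output should survive per shuffle, and it should live on hostB.
def assertOnlyHostBOutputsRemain(
    tracker: org.apache.spark.MapOutputTrackerMaster,
    shuffleId: Int): Unit = {
  val statuses = tracker.shuffleStatuses(shuffleId).mapStatuses
  assert(statuses.count(_ != null) == 1)
  assert(statuses.filter(_ != null).forall(_.location.host == "hostB"))
}

Inside the test this would be called as assertOnlyHostBOutputsRemain(mapOutputTracker, firstShuffleId), and likewise for secondShuffleId.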