Skip to content

Commit d64b261

Browse files
committed
[SPARK-15155][MESOS] Optionally ignore default role resources
Add a new boolean property, `spark.mesos.ignoreDefaultRoleResources`. When this property is set, Spark will only accept resources offered with the role specified by the `spark.mesos.role` property. If `spark.mesos.role` has not been set, `spark.mesos.ignoreDefaultRoleResources` has no effect. Additional unit tests were added to `MesosSchedulerBackendSuite`, extending the original multi-role test suite.
1 parent c8c0906 commit d64b261

File tree

8 files changed

+366
-68
lines changed

8 files changed

+366
-68
lines changed

docs/running-on-mesos.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,13 @@ See the [configuration page](configuration.html) for information on Spark config
428428
and resource weight sharing.
429429
</td>
430430
</tr>
431+
<tr>
432+
<td><code>spark.mesos.ignoreDefaultRoleResources</code></td>
433+
<td>false</td>
434+
<td>
435+
If <code>spark.mesos.role</code> has been set, ignore Mesos resources offered with the default role <code>*</code>; otherwise this setting has no effect.
436+
</td>
437+
</tr>
431438
<tr>
432439
<td><code>spark.mesos.constraints</code></td>
433440
<td>(none)</td>

mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ private[spark] class MesosDriverDescription(
4141
val cores: Double,
4242
val supervise: Boolean,
4343
val command: Command,
44-
schedulerProperties: Map[String, String],
44+
val schedulerProperties: Map[String, String],
4545
val submissionId: String,
4646
val submissionDate: Date,
4747
val retryState: Option[MesosClusterRetryState] = None)

mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,12 +551,16 @@ private[spark] class MesosClusterScheduler(
551551
currentOffers: List[ResourceOffer],
552552
tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
553553
for (submission <- candidates) {
554+
val acceptedResourceRoles = getAcceptedResourceRoles(submission.schedulerProperties)
554555
val driverCpu = submission.cores
555556
val driverMem = submission.mem
556557
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
557558
val offerOption = currentOffers.find { o =>
558-
getResource(o.resources, "cpus") >= driverCpu &&
559-
getResource(o.resources, "mem") >= driverMem
559+
val acceptableResources = o.resources.asScala
560+
.filter((r: Resource) => acceptedResourceRoles(r.getRole))
561+
.asJava
562+
getResource(acceptableResources, "cpus") >= driverCpu &&
563+
getResource(acceptableResources, "mem") >= driverMem
560564
}
561565
if (offerOption.isEmpty) {
562566
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +

mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
382382
val remainingResources = mutable.Map(offers.map(offer =>
383383
(offer.getId.getValue, offer.getResourcesList)): _*)
384384

385+
val acceptedResourceRoles = getAcceptedResourceRoles(sc.conf)
386+
385387
var launchTasks = true
386388

387389
// TODO(mgummelt): combine offers for a single slave
@@ -393,15 +395,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
393395
for (offer <- offers) {
394396
val slaveId = offer.getSlaveId.getValue
395397
val offerId = offer.getId.getValue
396-
val resources = remainingResources(offerId)
398+
val resources =
399+
remainingResources(offerId).asScala
400+
.filter((r: Resource) => acceptedResourceRoles(r.getRole))
401+
.asJava
397402

398-
if (canLaunchTask(slaveId, resources)) {
403+
if (canLaunchTask(slaveId, resources, acceptedResourceRoles)) {
399404
// Create a task
400405
launchTasks = true
401406
val taskId = newMesosTaskId()
402407
val offerCPUs = getResource(resources, "cpus").toInt
403408
val taskGPUs = Math.min(
404-
Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)
409+
Math.max(0, maxGpus - totalGpusAcquired),
410+
getResource(resources, "gpus").toInt)
405411

406412
val taskCPUs = executorCores(offerCPUs)
407413
val taskMemory = executorMemory(sc)
@@ -466,7 +472,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
466472
cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
467473
}
468474

469-
private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
475+
private def canLaunchTask(
476+
slaveId: String,
477+
resources: JList[Resource],
478+
acceptedResourceRoles: Set[String]): Boolean = {
470479
val offerMem = getResource(resources, "mem")
471480
val offerCPUs = getResource(resources, "cpus").toInt
472481
val cpus = executorCores(offerCPUs)

mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,15 @@ private[spark] class MesosFineGrainedSchedulerBackend(
249249
.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
250250
}
251251

252+
val acceptedResourceRoles = getAcceptedResourceRoles(sc.conf)
253+
252254
// Of the matching constraints, see which ones give us enough memory and cores
253255
val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
254-
val mem = getResource(o.getResourcesList, "mem")
255-
val cpus = getResource(o.getResourcesList, "cpus")
256+
val acceptableResources = o.getResourcesList.asScala
257+
.filter((r: Resource) => acceptedResourceRoles(r.getRole))
258+
.asJava
259+
val mem = getResource(acceptableResources, "mem")
260+
val cpus = getResource(acceptableResources, "cpus")
256261
val slaveId = o.getSlaveId.getValue
257262
val offerAttributes = toAttributeMap(o.getAttributesList)
258263

mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,44 @@ trait MesosSchedulerUtils extends Logging {
4949
// Driver for talking to Mesos
5050
protected var mesosDriver: SchedulerDriver = null
5151

52+
/**
53+
* Returns the configured set of roles that an offer can be selected from
54+
* @param conf Spark configuration
55+
*/
56+
protected def getAcceptedResourceRoles(conf: SparkConf): Set[String] = {
57+
getAcceptedResourceRoles(
58+
conf.getBoolean("spark.mesos.ignoreDefaultRoleResources", false),
59+
conf.getOption("spark.mesos.role"))
60+
}
61+
/**
62+
* Returns the configured set of roles that an offer can be selected from
63+
* @param props Mesos driver description schedulerProperties map
64+
*/
65+
protected def getAcceptedResourceRoles(props: Map[String, String]): Set[String] = {
66+
getAcceptedResourceRoles(
67+
props.get("spark.mesos.ignoreDefaultRoleResources") match {
68+
case Some(truth) => truth.toBoolean
69+
case None => false
70+
},
71+
props.get("spark.mesos.role"))
72+
}
73+
/**
74+
* Internal version of getAcceptedResourceRoles
75+
* @param ignoreDefaultRoleResources user specified property
76+
* @param role user specified property
77+
*/
78+
private def getAcceptedResourceRoles(
79+
ignoreDefaultRoleResources: Boolean,
80+
role: Option[String]) = {
81+
val roles = ignoreDefaultRoleResources match {
82+
case true if role.isDefined => Set(role)
83+
case _ => Set(Some("*"), role)
84+
}
85+
val acceptedRoles = roles.flatten
86+
logDebug(s"Accepting resources from role(s): ${acceptedRoles.mkString(",")}")
87+
acceptedRoles
88+
}
89+
5290
/**
5391
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
5492
*

0 commit comments

Comments
 (0)