Skip to content

Commit 0c0e409

Browse files
committed
simplify the implementation of CoarseGrainedSchedulerBackend
1 parent 55a4f11 commit 0c0e409

File tree

2 files changed

+11
-17
lines changed

2 files changed

+11
-17
lines changed

core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ package org.apache.spark.scheduler
2121
* Represents free resources available on an executor.
2222
*/
2323
private[spark]
24-
class WorkerOffer(val executorId: String, val host: String, val cores: Int)
24+
class WorkerOffer(val executorId: String, val host: String, var cores: Int) {
25+
@transient val totalcores = cores
26+
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
5151

5252
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
5353
private val executorActor = new HashMap[String, ActorRef]
54-
private val executorAddress = new HashMap[String, Address]
55-
private val executorHost = new HashMap[String, String]
56-
private val freeCores = new HashMap[String, Int]
54+
private val workerOffers = new HashMap[String, WorkerOffer]
5755
private val addressToExecutorId = new HashMap[Address, String]
5856

5957
override def preStart() {
@@ -75,9 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
7573
logInfo("Registered executor: " + sender + " with ID " + executorId)
7674
sender ! RegisteredExecutor(sparkProperties)
7775
executorActor(executorId) = sender
78-
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
79-
freeCores(executorId) = cores
80-
executorAddress(executorId) = sender.path.address
76+
workerOffers += (executorId -> new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores))
8177
addressToExecutorId(sender.path.address) = executorId
8278
totalCoreCount.addAndGet(cores)
8379
makeOffers()
@@ -87,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
8783
scheduler.statusUpdate(taskId, state, data.value)
8884
if (TaskState.isFinished(state)) {
8985
if (executorActor.contains(executorId)) {
90-
freeCores(executorId) += 1
86+
workerOffers(executorId).cores += 1
9187
makeOffers(executorId)
9288
} else {
9389
// Ignoring the update since we don't know about the executor.
@@ -125,20 +121,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
125121

126122
// Make fake resource offers on all executors
127123
def makeOffers() {
128-
launchTasks(scheduler.resourceOffers(
129-
executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
124+
launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq))
130125
}
131126

132127
// Make fake resource offers on just one executor
133128
def makeOffers(executorId: String) {
134-
launchTasks(scheduler.resourceOffers(
135-
Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
129+
launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId))))
136130
}
137131

138132
// Launch tasks returned by a set of resource offers
139133
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
140134
for (task <- tasks.flatten) {
141-
freeCores(task.executorId) -= 1
135+
workerOffers(task.executorId).cores -= 1
142136
executorActor(task.executorId) ! LaunchTask(task)
143137
}
144138
}
@@ -147,11 +141,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
147141
def removeExecutor(executorId: String, reason: String) {
148142
if (executorActor.contains(executorId)) {
149143
logInfo("Executor " + executorId + " disconnected, so removing it")
150-
val numCores = freeCores(executorId)
151-
addressToExecutorId -= executorAddress(executorId)
144+
val numCores = workerOffers(executorId).totalcores
152145
executorActor -= executorId
153-
executorHost -= executorId
154-
freeCores -= executorId
146+
workerOffers -= executorId
155147
totalCoreCount.addAndGet(-numCores)
156148
scheduler.executorLost(executorId, SlaveLost(reason))
157149
}

0 commit comments

Comments
 (0)