Skip to content

Commit f6bf93f

Browse files
committed
code clean
1 parent 19c2bb4 commit f6bf93f

File tree

2 files changed

+14
-15
lines changed

2 files changed

+14
-15
lines changed

core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
2121
* Represents free resources available on an executor.
2222
*/
2323
private[spark]
24-
case class WorkerOffer(executorId: String, host: String, cores: Int);
24+
case class WorkerOffer(executorId: String, host: String, cores: Int)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
5151

5252
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
5353
private val executorActor = new HashMap[String, ActorRef]
54-
private val workerOffers = new HashMap[String, WorkerOffer]
54+
private val executorAddress = new HashMap[String, Address]
55+
private val executorHost = new HashMap[String, String]
5556
private val freeCores = new HashMap[String, Int]
5657
private val totalCores = new HashMap[String, Int]
5758
private val addressToExecutorId = new HashMap[Address, String]
@@ -75,10 +76,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
7576
logInfo("Registered executor: " + sender + " with ID " + executorId)
7677
sender ! RegisteredExecutor(sparkProperties)
7778
executorActor(executorId) = sender
78-
workerOffers += (executorId ->
79-
new WorkerOffer(executorId, Utils.parseHostPort(hostPort)._1, cores))
80-
totalCores += (executorId -> cores)
81-
freeCores += (executorId -> cores)
79+
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
80+
totalCores(executorId) = cores
81+
freeCores(executorId) = cores
82+
executorAddress(executorId) = sender.path.address
8283
addressToExecutorId(sender.path.address) = executorId
8384
totalCoreCount.addAndGet(cores)
8485
makeOffers()
@@ -126,18 +127,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
126127

127128
// Make fake resource offers on all executors
128129
def makeOffers() {
129-
// reconstruct workerOffers
130-
workerOffers.keys.foreach { executorId =>
131-
workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId))
132-
}
133-
launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq))
130+
launchTasks(scheduler.resourceOffers(
131+
executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
134132
}
135133

136134
// Make fake resource offers on just one executor
137135
def makeOffers(executorId: String) {
138-
// update the workerOffer
139-
workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId))
140-
launchTasks(scheduler.resourceOffers(Seq(workerOffers(executorId))))
136+
launchTasks(scheduler.resourceOffers(
137+
Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
141138
}
142139

143140
// Launch tasks returned by a set of resource offers
@@ -154,7 +151,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
154151
logInfo("Executor " + executorId + " disconnected, so removing it")
155152
val numCores = totalCores(executorId)
156153
executorActor -= executorId
157-
workerOffers -= executorId
154+
executorHost -= executorId
155+
addressToExecutorId -= executorAddress(executorId)
156+
executorAddress -= executorId
158157
totalCores -= executorId
159158
freeCores -= executorId
160159
totalCoreCount.addAndGet(-numCores)

0 commit comments

Comments
 (0)