@@ -51,9 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
5151
5252 class DriverActor (sparkProperties : Seq [(String , String )]) extends Actor {
5353 private val executorActor = new HashMap [String , ActorRef ]
54- private val executorAddress = new HashMap [String , Address ]
55- private val executorHost = new HashMap [String , String ]
56- private val freeCores = new HashMap [String , Int ]
54+ private val workerOffers = new HashMap [String , WorkerOffer ]
5755 private val addressToExecutorId = new HashMap [Address , String ]
5856
5957 override def preStart () {
@@ -75,9 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
7573 logInfo(" Registered executor: " + sender + " with ID " + executorId)
7674 sender ! RegisteredExecutor (sparkProperties)
7775 executorActor(executorId) = sender
78- executorHost(executorId) = Utils .parseHostPort(hostPort)._1
79- freeCores(executorId) = cores
80- executorAddress(executorId) = sender.path.address
76+ workerOffers += (executorId -> new WorkerOffer (executorId, Utils .parseHostPort(hostPort)._1, cores))
8177 addressToExecutorId(sender.path.address) = executorId
8278 totalCoreCount.addAndGet(cores)
8379 makeOffers()
@@ -87,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
8783 scheduler.statusUpdate(taskId, state, data.value)
8884 if (TaskState .isFinished(state)) {
8985 if (executorActor.contains(executorId)) {
90- freeCores (executorId) += 1
86+ workerOffers (executorId).cores += 1
9187 makeOffers(executorId)
9288 } else {
9389 // Ignoring the update since we don't know about the executor.
@@ -125,20 +121,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
125121
126122 // Make fake resource offers on all executors
127123 def makeOffers () {
128- launchTasks(scheduler.resourceOffers(
129- executorHost.toArray.map {case (id, host) => new WorkerOffer (id, host, freeCores(id))}))
124+ launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq))
130125 }
131126
132127 // Make fake resource offers on just one executor
133128 def makeOffers (executorId : String ) {
134- launchTasks(scheduler.resourceOffers(
135- Seq (new WorkerOffer (executorId, executorHost(executorId), freeCores(executorId)))))
129+ launchTasks(scheduler.resourceOffers(Seq (workerOffers(executorId))))
136130 }
137131
138132 // Launch tasks returned by a set of resource offers
139133 def launchTasks (tasks : Seq [Seq [TaskDescription ]]) {
140134 for (task <- tasks.flatten) {
141- freeCores (task.executorId) -= 1
135+ workerOffers (task.executorId).cores -= 1
142136 executorActor(task.executorId) ! LaunchTask (task)
143137 }
144138 }
@@ -147,11 +141,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
147141 def removeExecutor (executorId : String , reason : String ) {
148142 if (executorActor.contains(executorId)) {
149143 logInfo(" Executor " + executorId + " disconnected, so removing it" )
150- val numCores = freeCores(executorId)
151- addressToExecutorId -= executorAddress(executorId)
144+ val numCores = workerOffers(executorId).totalcores
152145 executorActor -= executorId
153- executorHost -= executorId
154- freeCores -= executorId
146+ workerOffers -= executorId
155147 totalCoreCount.addAndGet(- numCores)
156148 scheduler.executorLost(executorId, SlaveLost (reason))
157149 }
0 commit comments