@@ -51,7 +51,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
5151
5252 class DriverActor (sparkProperties : Seq [(String , String )]) extends Actor {
5353 private val executorActor = new HashMap [String , ActorRef ]
54- private val workerOffers = new HashMap [String , WorkerOffer ]
54+ private val executorAddress = new HashMap [String , Address ]
55+ private val executorHost = new HashMap [String , String ]
5556 private val freeCores = new HashMap [String , Int ]
5657 private val totalCores = new HashMap [String , Int ]
5758 private val addressToExecutorId = new HashMap [Address , String ]
@@ -75,10 +76,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
7576 logInfo(" Registered executor: " + sender + " with ID " + executorId)
7677 sender ! RegisteredExecutor (sparkProperties)
7778 executorActor(executorId) = sender
78- workerOffers += (executorId ->
79- new WorkerOffer (executorId, Utils .parseHostPort(hostPort)._1, cores))
80- totalCores += (executorId -> cores)
81- freeCores += (executorId -> cores)
79+ executorHost(executorId) = Utils .parseHostPort(hostPort)._1
80+ totalCores (executorId) = cores
81+ freeCores (executorId) = cores
82+ executorAddress (executorId) = sender.path.address
8283 addressToExecutorId(sender.path.address) = executorId
8384 totalCoreCount.addAndGet(cores)
8485 makeOffers()
@@ -126,18 +127,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
126127
127128 // Make fake resource offers on all executors
128129 def makeOffers () {
129- // reconstruct workerOffers
130- workerOffers.keys.foreach { executorId =>
131- workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId))
132- }
133- launchTasks(scheduler.resourceOffers(workerOffers.values.toSeq))
130+ launchTasks(scheduler.resourceOffers(
131+ executorHost.toArray.map {case (id, host) => new WorkerOffer (id, host, freeCores(id))}))
134132 }
135133
136134 // Make fake resource offers on just one executor
137135 def makeOffers (executorId : String ) {
138- // update the workerOffer
139- workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId))
140- launchTasks(scheduler.resourceOffers(Seq (workerOffers(executorId))))
136+ launchTasks(scheduler.resourceOffers(
137+ Seq (new WorkerOffer (executorId, executorHost(executorId), freeCores(executorId)))))
141138 }
142139
143140 // Launch tasks returned by a set of resource offers
@@ -154,7 +151,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
154151 logInfo(" Executor " + executorId + " disconnected, so removing it" )
155152 val numCores = totalCores(executorId)
156153 executorActor -= executorId
157- workerOffers -= executorId
154+ executorHost -= executorId
155+ addressToExecutorId -= executorAddress(executorId)
156+ executorAddress -= executorId
158157 totalCores -= executorId
159158 freeCores -= executorId
160159 totalCoreCount.addAndGet(- numCores)
0 commit comments