Skip to content

Commit 8773b01

Browse files
committed
Update comment / minor changes
1 parent 3afde3f commit 8773b01

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@ import org.apache.spark.scheduler._
2828
*/
2929
@DeveloperApi
3030
class StorageStatusListener extends SparkListener {
31+
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
3132
private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
3233

3334
def storageStatusList = executorIdToStorageStatus.values.toSeq
3435

3536
/** Update storage status list to reflect updated block statuses */
36-
def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
37-
val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
37+
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
38+
val filteredStatus = executorIdToStorageStatus.get(execId)
3839
filteredStatus.foreach { storageStatus =>
3940
updatedBlocks.foreach { case (blockId, updatedStatus) =>
4041
if (updatedStatus.storageLevel == StorageLevel.NONE) {
@@ -47,7 +48,7 @@ class StorageStatusListener extends SparkListener {
4748
}
4849

4950
/** Update storage status list to reflect the removal of an RDD from the cache */
50-
def updateStorageStatus(unpersistedRDDId: Int) {
51+
private def updateStorageStatus(unpersistedRDDId: Int) {
5152
storageStatusList.foreach { storageStatus =>
5253
val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
5354
unpersistedBlocksIds.foreach { blockId =>

0 commit comments

Comments
 (0)