@@ -28,13 +28,14 @@ import org.apache.spark.scheduler._
  */
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
+  // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
   private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
 
   def storageStatusList = executorIdToStorageStatus.values.toSeq
 
   /** Update storage status list to reflect updated block statuses */
-  def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
+  private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
+    val filteredStatus = executorIdToStorageStatus.get(execId)
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
         if (updatedStatus.storageLevel == StorageLevel.NONE) {
@@ -47,7 +48,7 @@ class StorageStatusListener extends SparkListener {
   }
 
   /** Update storage status list to reflect the removal of an RDD from the cache */
-  def updateStorageStatus(unpersistedRDDId: Int) {
+  private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
       val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
       unpersistedBlocksIds.foreach { blockId =>
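
For context, the changes above make the two updateStorageStatus helpers private to the listener and replace a linear scan of storageStatusList with a direct lookup in the executor-keyed map. Below is a minimal standalone sketch of that lookup change; the StorageStatus here is a simplified stand-in with a plain executorId field rather than the real blockManagerId.executorId, so treat the names as illustrative only.

    import scala.collection.mutable

    // Illustrative sketch, not the real Spark classes: the old code rebuilt the status
    // sequence and scanned it linearly, the new code looks the executor up in the map.
    object LookupSketch {
      case class StorageStatus(executorId: String)

      val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()

      // Before: O(n) scan over every executor's status.
      def findByScan(execId: String): Option[StorageStatus] =
        executorIdToStorageStatus.values.toSeq.find(_.executorId == execId)

      // After: O(1) lookup keyed directly by executor ID.
      def findByLookup(execId: String): Option[StorageStatus] =
        executorIdToStorageStatus.get(execId)
    }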