@@ -217,7 +217,7 @@ private[spark] class BlockManager(
217217 logInfo(s " Reporting ${blockInfoManager.size} blocks to the master. " )
218218 for ((blockId, info) <- blockInfoManager.entries) {
219219 val status = getCurrentBlockStatus(blockId, info)
220- if (! tryToReportBlockStatus(blockId, info , status)) {
220+ if (info.tellMaster && ! tryToReportBlockStatus(blockId, status)) {
221221 logError(s " Failed to report $blockId to master; giving up. " )
222222 return
223223 }
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
333333 */
334334 private def reportBlockStatus (
335335 blockId : BlockId ,
336- info : BlockInfo ,
337336 status : BlockStatus ,
338337 droppedMemorySize : Long = 0L ): Unit = {
339- val needReregister = ! tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
338+ val needReregister = ! tryToReportBlockStatus(blockId, status, droppedMemorySize)
340339 if (needReregister) {
341340 logInfo(s " Got told to re-register updating block $blockId" )
342341 // Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
352351 */
353352 private def tryToReportBlockStatus (
354353 blockId : BlockId ,
355- info : BlockInfo ,
356354 status : BlockStatus ,
357355 droppedMemorySize : Long = 0L ): Boolean = {
358- if (info.tellMaster) {
359- val storageLevel = status.storageLevel
360- val inMemSize = Math .max(status.memSize, droppedMemorySize)
361- val onDiskSize = status.diskSize
362- master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
363- } else {
364- true
365- }
356+ val storageLevel = status.storageLevel
357+ val inMemSize = Math .max(status.memSize, droppedMemorySize)
358+ val onDiskSize = status.diskSize
359+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
366360 }
367361
368362 /**
@@ -807,8 +801,8 @@ private[spark] class BlockManager(
807801 // Now that the block is in either the memory or disk store,
808802 // tell the master about it.
809803 info.size = size
810- if (tellMaster) {
811- reportBlockStatus(blockId, info, putBlockStatus)
804+ if (tellMaster && info.tellMaster ) {
805+ reportBlockStatus(blockId, putBlockStatus)
812806 }
813807 Option (TaskContext .get()).foreach { c =>
814808 c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
@@ -964,8 +958,8 @@ private[spark] class BlockManager(
964958 // Now that the block is in either the memory, externalBlockStore, or disk store,
965959 // tell the master about it.
966960 info.size = size
967- if (tellMaster) {
968- reportBlockStatus(blockId, info, putBlockStatus)
961+ if (tellMaster && info.tellMaster ) {
962+ reportBlockStatus(blockId, putBlockStatus)
969963 }
970964 Option (TaskContext .get()).foreach { c =>
971965 c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
@@ -1271,7 +1265,7 @@ private[spark] class BlockManager(
12711265
12721266 val status = getCurrentBlockStatus(blockId, info)
12731267 if (info.tellMaster) {
1274- reportBlockStatus(blockId, info, status, droppedMemorySize)
1268+ reportBlockStatus(blockId, status, droppedMemorySize)
12751269 }
12761270 if (blockIsUpdated) {
12771271 Option (TaskContext .get()).foreach { c =>
@@ -1334,7 +1328,7 @@ private[spark] class BlockManager(
13341328 }
13351329 blockInfoManager.removeBlock(blockId)
13361330 if (tellMaster && info.tellMaster) {
1337- reportBlockStatus(blockId, info, BlockStatus .empty)
1331+ reportBlockStatus(blockId, BlockStatus .empty)
13381332 }
13391333 Option (TaskContext .get()).foreach { c =>
13401334 c.taskMetrics().incUpdatedBlockStatuses(blockId -> BlockStatus .empty)
0 commit comments