Skip to content

Commit dca6a8a

Browse files
committed
Move if(info.tellMaster) check out of reportBlockStatus.
1 parent f3890a5 commit dca6a8a

File tree

1 file changed

+12
-18
lines changed

1 file changed

+12
-18
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ private[spark] class BlockManager(
217217
logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
218218
for ((blockId, info) <- blockInfoManager.entries) {
219219
val status = getCurrentBlockStatus(blockId, info)
220-
if (!tryToReportBlockStatus(blockId, info, status)) {
220+
if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
221221
logError(s"Failed to report $blockId to master; giving up.")
222222
return
223223
}
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
333333
*/
334334
private def reportBlockStatus(
335335
blockId: BlockId,
336-
info: BlockInfo,
337336
status: BlockStatus,
338337
droppedMemorySize: Long = 0L): Unit = {
339-
val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
338+
val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
340339
if (needReregister) {
341340
logInfo(s"Got told to re-register updating block $blockId")
342341
// Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
352351
*/
353352
private def tryToReportBlockStatus(
354353
blockId: BlockId,
355-
info: BlockInfo,
356354
status: BlockStatus,
357355
droppedMemorySize: Long = 0L): Boolean = {
358-
if (info.tellMaster) {
359-
val storageLevel = status.storageLevel
360-
val inMemSize = Math.max(status.memSize, droppedMemorySize)
361-
val onDiskSize = status.diskSize
362-
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
363-
} else {
364-
true
365-
}
356+
val storageLevel = status.storageLevel
357+
val inMemSize = Math.max(status.memSize, droppedMemorySize)
358+
val onDiskSize = status.diskSize
359+
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
366360
}
367361

368362
/**
@@ -807,8 +801,8 @@ private[spark] class BlockManager(
807801
// Now that the block is in either the memory or disk store,
808802
// tell the master about it.
809803
info.size = size
810-
if (tellMaster) {
811-
reportBlockStatus(blockId, info, putBlockStatus)
804+
if (tellMaster && info.tellMaster) {
805+
reportBlockStatus(blockId, putBlockStatus)
812806
}
813807
Option(TaskContext.get()).foreach { c =>
814808
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
@@ -964,8 +958,8 @@ private[spark] class BlockManager(
964958
// Now that the block is in either the memory, externalBlockStore, or disk store,
965959
// tell the master about it.
966960
info.size = size
967-
if (tellMaster) {
968-
reportBlockStatus(blockId, info, putBlockStatus)
961+
if (tellMaster && info.tellMaster) {
962+
reportBlockStatus(blockId, putBlockStatus)
969963
}
970964
Option(TaskContext.get()).foreach { c =>
971965
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
@@ -1271,7 +1265,7 @@ private[spark] class BlockManager(
12711265

12721266
val status = getCurrentBlockStatus(blockId, info)
12731267
if (info.tellMaster) {
1274-
reportBlockStatus(blockId, info, status, droppedMemorySize)
1268+
reportBlockStatus(blockId, status, droppedMemorySize)
12751269
}
12761270
if (blockIsUpdated) {
12771271
Option(TaskContext.get()).foreach { c =>
@@ -1334,7 +1328,7 @@ private[spark] class BlockManager(
13341328
}
13351329
blockInfoManager.removeBlock(blockId)
13361330
if (tellMaster && info.tellMaster) {
1337-
reportBlockStatus(blockId, info, BlockStatus.empty)
1331+
reportBlockStatus(blockId, BlockStatus.empty)
13381332
}
13391333
Option(TaskContext.get()).foreach { c =>
13401334
c.taskMetrics().incUpdatedBlockStatuses(blockId -> BlockStatus.empty)

0 commit comments

Comments
 (0)