Skip to content

Commit 05a4514

Browse files
eatoncyssrowen
authored andcommitted
[SPARK-20386][SPARK CORE] modify the log info if the block exists on the slave already
## What changes were proposed in this pull request? Modify the added memory size to memSize-originalMemSize if the block exists on the slave already since if the block exists, the added memory size should be memSize-originalMemSize; if originalMemSize is bigger than memSize ,then the log info should be Removed memory, removed size should be originalMemSize-memSize ## How was this patch tested? Multiple runs on existing unit tests (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: eatoncys <[email protected]> Closes #17683 from eatoncys/SPARK-20386.
1 parent ad29040 commit 05a4514

File tree

1 file changed

+35
-17
lines changed

1 file changed

+35
-17
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -497,11 +497,17 @@ private[spark] class BlockManagerInfo(
497497

498498
updateLastSeenMs()
499499

500-
if (_blocks.containsKey(blockId)) {
500+
val blockExists = _blocks.containsKey(blockId)
501+
var originalMemSize: Long = 0
502+
var originalDiskSize: Long = 0
503+
var originalLevel: StorageLevel = StorageLevel.NONE
504+
505+
if (blockExists) {
501506
// The block exists on the slave already.
502507
val blockStatus: BlockStatus = _blocks.get(blockId)
503-
val originalLevel: StorageLevel = blockStatus.storageLevel
504-
val originalMemSize: Long = blockStatus.memSize
508+
originalLevel = blockStatus.storageLevel
509+
originalMemSize = blockStatus.memSize
510+
originalDiskSize = blockStatus.diskSize
505511

506512
if (originalLevel.useMemory) {
507513
_remainingMem += originalMemSize
@@ -520,32 +526,44 @@ private[spark] class BlockManagerInfo(
520526
blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
521527
_blocks.put(blockId, blockStatus)
522528
_remainingMem -= memSize
523-
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
524-
blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
525-
Utils.bytesToString(_remainingMem)))
529+
if (blockExists) {
530+
logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
531+
s" (current size: ${Utils.bytesToString(memSize)}," +
532+
s" original size: ${Utils.bytesToString(originalMemSize)}," +
533+
s" free: ${Utils.bytesToString(_remainingMem)})")
534+
} else {
535+
logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
536+
s" (size: ${Utils.bytesToString(memSize)}," +
537+
s" free: ${Utils.bytesToString(_remainingMem)})")
538+
}
526539
}
527540
if (storageLevel.useDisk) {
528541
blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
529542
_blocks.put(blockId, blockStatus)
530-
logInfo("Added %s on disk on %s (size: %s)".format(
531-
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
543+
if (blockExists) {
544+
logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
545+
s" (current size: ${Utils.bytesToString(diskSize)}," +
546+
s" original size: ${Utils.bytesToString(originalDiskSize)})")
547+
} else {
548+
logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
549+
s" (size: ${Utils.bytesToString(diskSize)})")
550+
}
532551
}
533552
if (!blockId.isBroadcast && blockStatus.isCached) {
534553
_cachedBlocks += blockId
535554
}
536-
} else if (_blocks.containsKey(blockId)) {
555+
} else if (blockExists) {
537556
// If isValid is not true, drop the block.
538-
val blockStatus: BlockStatus = _blocks.get(blockId)
539557
_blocks.remove(blockId)
540558
_cachedBlocks -= blockId
541-
if (blockStatus.storageLevel.useMemory) {
542-
logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
543-
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
544-
Utils.bytesToString(_remainingMem)))
559+
if (originalLevel.useMemory) {
560+
logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
561+
s" (size: ${Utils.bytesToString(originalMemSize)}," +
562+
s" free: ${Utils.bytesToString(_remainingMem)})")
545563
}
546-
if (blockStatus.storageLevel.useDisk) {
547-
logInfo("Removed %s on %s on disk (size: %s)".format(
548-
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
564+
if (originalLevel.useDisk) {
565+
logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
566+
s" (size: ${Utils.bytesToString(originalDiskSize)})")
549567
}
550568
}
551569
}

0 commit comments

Comments
 (0)