Skip to content

Commit cdab885

Browse files
committed
improve and fix compile error
1 parent 1bb8a44 commit cdab885

File tree

4 files changed

+9
-4
lines changed

4 files changed

+9
-4
lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
107107
def getNumBlocks: Int = numBlocks
108108

109109
/** Total number of blocks this broadcast variable contains. */
110-
private val numBlocks: Int = if (!isExecutorSide) writeBlocks(obj) else nBlocks.getOrElse(-1)
110+
private val numBlocks: Int = nBlocks.getOrElse(writeBlocks(obj))
111111

112112
/** Whether to generate checksum for blocks or not. */
113113
private var checksumEnabled: Boolean = false
@@ -154,7 +154,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](
154154
checksums(i) = calcChecksum(block)
155155
}
156156
val pieceId = BroadcastBlockId(id, "piece" + i)
157-
blockManager.persistBroadcastPiece(pieceId, block)
157+
if (isExecutorSide) {
158+
blockManager.persistBroadcastPiece(pieceId, block)
159+
}
158160
val bytes = new ChunkedByteBuffer(block.duplicate())
159161
if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
160162
throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
4242
}
4343

4444
override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Int = {
45-
new TorrentBroadcast[T](value_, id).getNumBlocks
45+
new TorrentBroadcast[T](value_, id, true).getNumBlocks
4646
}
4747

4848
override def stop() { }

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
205205
}
206206
try {
207207
val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf)
208+
if (!fs.exists(dirPath)) {
209+
return
210+
}
208211
val files = fs.listStatus(dirPath, fileFilter).map(_.getPath)
209212
for (file <- files) {
210213
if (fs.exists(file)) {

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
10711071

10721072
agg.queryExecution.executedPlan.collectFirst {
10731073
case ShuffleExchange(_, _: RDDScanExec, _) =>
1074-
case BroadcastExchangeExec(_, _: RDDScanExec) =>
1074+
case BroadcastExchangeExec(_, _: RDDScanExec, _) =>
10751075
}.foreach { _ =>
10761076
fail(
10771077
"No Exchange should be inserted above RDDScanExec since the checkpointed Dataset " +

0 commit comments

Comments
 (0)