File tree Expand file tree Collapse file tree 1 file changed +13
-3
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange Expand file tree Collapse file tree 1 file changed +13
-3
lines changed Original file line number Diff line number Diff line change @@ -79,10 +79,20 @@ case class BroadcastExchangeExec(
7979 SQLExecution .withExecutionId(sparkContext, executionId) {
8080 try {
8181 val broadcasted = if (executorBroadcast) {
82+ val moreThanOnePartition = child.execute().partitions.size > 0
8283 val before = System .nanoTime()
83- val res = child.execute().coalesce(1 ).mapPartitions { iter =>
84- Seq (mode.transform(iter.map(_.copy()).toArray)).iterator
85- }.broadcast()
84+ val res =
85+ if (moreThanOnePartition) {
86+ child.execute().coalesce(1 ).mapPartitions { iter =>
87+ Seq (mode.transform(iter.map(_.copy()).toArray)).iterator
88+ }.broadcast()
89+ } else {
90+ // handle the corner case that the child rdd have no partitions
91+ // in this case rdd.broadcast will throw a assert error.
92+ sparkContext.makeRDD(Array (1 ), 1 ).mapPartitions { iter =>
93+ Seq (mode.transform(Array .empty[InternalRow ])).iterator
94+ }.broadcast()
95+ }
8696 longMetric(" collect_build_broadcastTime" ) += (System .nanoTime() - before) / 1000000
8797 res
8898 } else {
You can’t perform that action at this time.
0 commit comments