File tree Expand file tree Collapse file tree 1 file changed +8
-3
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange Expand file tree Collapse file tree 1 file changed +8
-3
lines changed Original file line number Diff line number Diff line change @@ -83,13 +83,18 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
8383 numPartitionsSet.headOption
8484 }
8585
86- // Reading bucketed tables always obeys numShufflePartitions because maxNumPostShufflePartitions
87- // is usually much larger than numShufflePartitions,
88- // which causes some bucket map joins to lose efficacy after enabling adaptive execution.
86+ // If there are non-shuffle children that satisfy the required distribution, we have
87+ // some tradeoffs when picking the expected number of shuffle partitions:
88+ // 1. We should avoid shuffling these children.
89+ // 2. We should have a reasonable parallelism.
8990 val nonShuffleChildrenNumPartitions =
9091 childrenIndexes.map(children).filterNot(_.isInstanceOf [ShuffleExchangeExec ])
9192 .map(_.outputPartitioning.numPartitions)
9293 val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
94+ // Here we pick the max number of partitions among these non-shuffle children as the
95+ // expected number of shuffle partitions. However, if it's smaller than
96+ // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
97+ // expected number of shuffle partitions.
9398 math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
9499 } else {
95100 childrenNumPartitions.max
You can’t perform that action at this time.
0 commit comments