Skip to content

Commit f4696ba

Browse files
JkSelfcloud-fan
authored andcommitted
[SPARK-30922][SQL] remove the max splits config in skewed join
### What changes were proposed in this pull request? When skewed join optimization split more skewed readers, the plan may be very large and can not be shown in ui quickly. The config `spark.sql.adaptive.skewedJoinOptimization.skewedPartitionMaxSplits` is to resolve the above ui shown issue. And after [PR#27493](#27493) combined the skewed readers into one, we not need this config. ### Why are the changes needed? remove the unnecessary config ### Does this PR introduce any user-facing change? No ### How was this patch tested? existing test Closes #27673 from JkSelf/removeMaxSplitNum. Authored-by: jiake <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent c41ef39 commit f4696ba

File tree

3 files changed

+18
-29
lines changed

3 files changed

+18
-29
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -447,14 +447,6 @@ object SQLConf {
447447
.intConf
448448
.createWithDefault(10)
449449

450-
val ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS =
451-
buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionMaxSplits")
452-
.doc("Configures the maximum number of task to handle a skewed partition in adaptive skewed" +
453-
"join.")
454-
.intConf
455-
.checkValue( _ >= 1, "The split size at least be 1")
456-
.createWithDefault(5)
457-
458450
val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
459451
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
460452
.doc("The relation with a non-empty partition ratio lower than this config will not be " +

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
7575
private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
7676
val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
7777
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
78-
val maxSplits = math.min(conf.getConf(
79-
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
80-
val avgPartitionSize = mapPartitionSizes.sum / maxSplits
78+
val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
8179
val advisoryPartitionSize = math.max(avgPartitionSize,
8280
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
8381
val partitionStartIndices = ArrayBuffer[Int]()
@@ -95,9 +93,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
9593
i += 1
9694
}
9795

98-
if (partitionStartIndices.size > maxSplits) {
99-
partitionStartIndices.take(maxSplits).toArray
100-
} else partitionStartIndices.toArray
96+
partitionStartIndices.toArray
10197
}
10298

10399
private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,7 @@ class AdaptiveQueryExecSuite
609609
withSQLConf(
610610
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
611611
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
612-
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
612+
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "2000",
613613
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
614614
withTempView("skewData1", "skewData2") {
615615
spark
@@ -636,42 +636,43 @@ class AdaptiveQueryExecSuite
636636
"SELECT * FROM skewData1 join skewData2 ON key1 = key2")
637637
// left stats: [3496, 0, 0, 0, 4014]
638638
// right stats:[6292, 0, 0, 0, 0]
639-
// Partition 0: both left and right sides are skewed, and divide into 5 splits, so
640-
// 5 x 5 sub-partitions.
639+
// Partition 0: both left and right sides are skewed, left side is divided
640+
// into 2 splits and right side is divided into 4 splits, so
641+
// 2 x 4 sub-partitions.
641642
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
642-
// Partition 4: only left side is skewed, and divide into 5 splits, so
643-
// 5 sub-partitions.
644-
// So total (25 + 1 + 5) partitions.
643+
// Partition 4: only left side is skewed, and divide into 3 splits, so
644+
// 3 sub-partitions.
645+
// So total (8 + 1 + 3) partitions.
645646
val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
646-
checkSkewJoin(innerSmj, 25 + 1 + 5)
647+
checkSkewJoin(innerSmj, 8 + 1 + 3)
647648

648649
// skewed left outer join optimization
649650
val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
650651
"SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
651652
// left stats: [3496, 0, 0, 0, 4014]
652653
// right stats:[6292, 0, 0, 0, 0]
653654
// Partition 0: both left and right sides are skewed, but left join can't split right side,
654-
// so only left side is divided into 5 splits, and thus 5 sub-partitions.
655+
// so only left side is divided into 2 splits, and thus 2 sub-partitions.
655656
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
656-
// Partition 4: only left side is skewed, and divide into 5 splits, so
657-
// 5 sub-partitions.
658-
// So total (5 + 1 + 5) partitions.
657+
// Partition 4: only left side is skewed, and divide into 3 splits, so
658+
// 3 sub-partitions.
659+
// So total (2 + 1 + 3) partitions.
659660
val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
660-
checkSkewJoin(leftSmj, 5 + 1 + 5)
661+
checkSkewJoin(leftSmj, 2 + 1 + 3)
661662

662663
// skewed right outer join optimization
663664
val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
664665
"SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
665666
// left stats: [3496, 0, 0, 0, 4014]
666667
// right stats:[6292, 0, 0, 0, 0]
667668
// Partition 0: both left and right sides are skewed, but right join can't split left side,
668-
// so only right side is divided into 5 splits, and thus 5 sub-partitions.
669+
// so only right side is divided into 4 splits, and thus 4 sub-partitions.
669670
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
670671
// Partition 4: only left side is skewed, but right join can't split left side, so just
671672
// 1 partition.
672-
// So total (5 + 1 + 1) partitions.
673+
// So total (4 + 1 + 1) partitions.
673674
val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
674-
checkSkewJoin(rightSmj, 5 + 1 + 1)
675+
checkSkewJoin(rightSmj, 4 + 1 + 1)
675676
}
676677
}
677678
}

0 commit comments

Comments
 (0)