Skip to content

Commit 45109c7

Browse files
committed
Ignores Thrift server ThriftServerPageSuite
1 parent 1ba9edf commit 45109c7

File tree

3 files changed

+59
-13
lines changed

3 files changed

+59
-13
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,14 @@ object SQLConf {
349349
.checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
350350
.createWithDefault(200)
351351

352+
val SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO =
353+
buildConf("spark.sql.shuffle.withoutShuffleSideRatio")
354+
.doc("If the ratio of the maximum partition number among children without shuffle " +
355+
"exchange to the maximum partition number of all children is not less than this " +
"config, no extra shuffle exchange is added for those children.")
356+
.doubleConf
357+
.checkValue(ratio => ratio > 0 && ratio <= 1, "The ratio value must be in (0, 1].")
358+
.createWithDefault(1.0)
359+
352360
val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
353361
.doc("When true, enable adaptive query execution.")
354362
.booleanConf
@@ -2162,6 +2170,8 @@ class SQLConf extends Serializable with Logging {
21622170

21632171
def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
21642172

2173+
def withoutShuffleSideRatio: Double = getConf(SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO)
2174+
21652175
def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
21662176

21672177
def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,22 +83,19 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
8383
numPartitionsSet.headOption
8484
}
8585

86-
// maxNumPostShufflePartitions is usually larger than numShufflePartitions,
87-
// which causes some bucket map join lose efficacy after enabling adaptive execution.
88-
// Please see SPARK-29655 for more details.
89-
val expectedChildrenNumPartitions = if (conf.adaptiveExecutionEnabled) {
90-
val withoutShuffleChildrenNumPartitions =
91-
childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec])
92-
.map(children(_).outputPartitioning.numPartitions).toSet
93-
if (withoutShuffleChildrenNumPartitions.nonEmpty &&
94-
conf.maxNumPostShufflePartitions > conf.numShufflePartitions) {
95-
math.min(math.max(withoutShuffleChildrenNumPartitions.max, conf.numShufflePartitions),
96-
conf.maxNumPostShufflePartitions)
86+
val maxNumPartition = childrenNumPartitions.max
87+
val withoutShuffleChildrenNumPartitions =
88+
childrenIndexes.filterNot(children(_).isInstanceOf[ShuffleExchangeExec])
89+
.map(children(_).outputPartitioning.numPartitions).toSet
90+
val expectedChildrenNumPartitions = if (withoutShuffleChildrenNumPartitions.nonEmpty) {
91+
val withoutShuffleMaxNumPartition = withoutShuffleChildrenNumPartitions.max
92+
if (withoutShuffleMaxNumPartition * 1.0 / maxNumPartition >= conf.withoutShuffleSideRatio) {
93+
withoutShuffleMaxNumPartition
9794
} else {
98-
childrenNumPartitions.max
95+
maxNumPartition
9996
}
10097
} else {
101-
childrenNumPartitions.max
98+
maxNumPartition
10299
}
103100

104101
val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)

sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,4 +795,43 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
795795
}
796796
}
797797

798+
test("Support spark.sql.shuffle.withoutShuffleSideRatio") {
799+
// numBuckets >= spark.sql.shuffle.partitions
800+
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
801+
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
802+
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
803+
val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
804+
testBucketing(
805+
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
806+
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
807+
joinCondition = joinCondition(Seq("i", "j"))
808+
)
809+
}
810+
811+
// numBuckets < spark.sql.shuffle.partitions
812+
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
813+
val bucketSpec = Some(BucketSpec(4, Seq("i", "j"), Nil))
814+
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
815+
val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
816+
testBucketing(
817+
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
818+
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
819+
joinCondition = joinCondition(Seq("i", "j"))
820+
)
821+
}
822+
823+
// numBuckets < spark.sql.shuffle.partitions and withoutShuffleSideRatio = 0.1
824+
withSQLConf(
825+
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
826+
SQLConf.SHUFFLE_WITHOUT_SHUFFLE_SIDE_RATIO.key -> "0.1") {
827+
val bucketSpec = Some(BucketSpec(4, Seq("i", "j"), Nil))
828+
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
829+
val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
830+
testBucketing(
831+
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
832+
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
833+
joinCondition = joinCondition(Seq("i", "j"))
834+
)
835+
}
836+
}
798837
}

0 commit comments

Comments
 (0)