Skip to content

Commit d620940

Browse files
committed
Address all comments beside the separate configs discussion
1 parent f0ef970 commit d620940

File tree

2 files changed

+65
-40
lines changed

2 files changed

+65
-40
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,30 @@ import org.apache.spark.sql.internal.SQLConf
4040
* COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
4141
*/
4242
case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
43+
private def updateNumCoalescedBucketsInScan(
44+
plan: SparkPlan,
45+
numCoalescedBuckets: Int): SparkPlan = {
46+
plan transformUp {
47+
case f: FileSourceScanExec =>
48+
f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
49+
}
50+
}
51+
4352
private def updateNumCoalescedBuckets(
4453
join: BaseJoinExec,
4554
numLeftBuckets: Int,
4655
numRightBucket: Int,
4756
numCoalescedBuckets: Int): BaseJoinExec = {
4857
if (numCoalescedBuckets != numLeftBuckets) {
49-
val leftCoalescedChild = join.left transformUp {
50-
case f: FileSourceScanExec =>
51-
f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
52-
}
58+
val leftCoalescedChild =
59+
updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
5360
join match {
5461
case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
5562
case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
5663
}
5764
} else {
58-
val rightCoalescedChild = join.right transformUp {
59-
case f: FileSourceScanExec =>
60-
f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
61-
}
65+
val rightCoalescedChild =
66+
updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
6267
join match {
6368
case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
6469
case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
@@ -160,12 +165,12 @@ object ExtractJoinWithBuckets {
160165

161166
def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = {
162167
plan match {
163-
case s: BaseJoinExec if isApplicable(s) =>
164-
val leftBucket = getBucketSpec(s.left)
165-
val rightBucket = getBucketSpec(s.right)
168+
case j: BaseJoinExec if isApplicable(j) =>
169+
val leftBucket = getBucketSpec(j.left)
170+
val rightBucket = getBucketSpec(j.right)
166171
if (leftBucket.isDefined && rightBucket.isDefined &&
167172
isDivisible(leftBucket.get.numBuckets, rightBucket.get.numBuckets)) {
168-
Some(s, leftBucket.get.numBuckets, rightBucket.get.numBuckets)
173+
Some(j, leftBucket.get.numBuckets, rightBucket.get.numBuckets)
169174
} else {
170175
None
171176
}

sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
3030
import org.apache.spark.sql.types.{IntegerType, StructType}
3131

3232
class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
33-
private val sortMergeJoin = "sortMergeJoin"
34-
private val shuffledHashJoin = "shuffledHashJoin"
35-
private val broadcastHashJoin = "broadcastHashJoin"
33+
private val SORT_MERGE_JOIN = "sortMergeJoin"
34+
private val SHUFFLED_HASH_JOIN = "shuffledHashJoin"
35+
private val BROADCAST_HASH_JOIN = "broadcastHashJoin"
3636

3737
case class RelationSetting(
3838
cols: Seq[Attribute],
@@ -58,7 +58,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
5858
def apply(
5959
l: RelationSetting,
6060
r: RelationSetting,
61-
joinOperator: String = sortMergeJoin,
61+
joinOperator: String = SORT_MERGE_JOIN,
6262
shjBuildSide: Option[BuildSide] = None): JoinSetting = {
6363
JoinSetting(l.cols, r.cols, l, r, joinOperator, shjBuildSide)
6464
}
@@ -82,17 +82,17 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
8282
leftRelation = setting.rightRelation,
8383
rightRelation = setting.leftRelation)
8484

85-
val settings = if (setting.joinOperator != shuffledHashJoin) {
85+
val settings = if (setting.joinOperator != SHUFFLED_HASH_JOIN) {
8686
Seq(setting, swappedSetting)
8787
} else {
8888
Seq(setting)
8989
}
9090
settings.foreach { s =>
9191
val lScan = newFileSourceScanExec(s.leftRelation)
9292
val rScan = newFileSourceScanExec(s.rightRelation)
93-
val join = if (s.joinOperator == sortMergeJoin) {
93+
val join = if (s.joinOperator == SORT_MERGE_JOIN) {
9494
SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan)
95-
} else if (s.joinOperator == shuffledHashJoin) {
95+
} else if (s.joinOperator == SHUFFLED_HASH_JOIN) {
9696
ShuffledHashJoinExec(s.leftKeys, s.rightKeys, Inner, s.shjBuildSide.get, None, lScan, rScan)
9797
} else {
9898
BroadcastHashJoinExec(
@@ -121,47 +121,59 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
121121
test("bucket coalescing - basic") {
122122
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
123123
run(JoinSetting(
124-
RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = sortMergeJoin))
124+
RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN))
125125
run(JoinSetting(
126-
RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = shuffledHashJoin,
126+
RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN,
127127
shjBuildSide = Some(BuildLeft)))
128-
// Coalescing bucket should not happen when the target is on shuffled hash join
129-
// build side.
130-
run(JoinSetting(
131-
RelationSetting(4, None), RelationSetting(8, None), joinOperator = shuffledHashJoin,
132-
shjBuildSide = Some(BuildRight)))
133128
}
129+
134130
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") {
135131
run(JoinSetting(
136-
RelationSetting(4, None), RelationSetting(8, None), joinOperator = broadcastHashJoin))
132+
RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN))
133+
run(JoinSetting(
134+
RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN))
135+
run(JoinSetting(
136+
RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
137+
shjBuildSide = Some(BuildLeft)))
138+
run(JoinSetting(
139+
RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
140+
shjBuildSide = Some(BuildRight)))
137141
}
138142
}
139143

140144
test("bucket coalescing should work only for sort merge join and shuffled hash join") {
141145
Seq(true, false).foreach { enabled =>
142146
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> enabled.toString) {
143147
run(JoinSetting(
144-
RelationSetting(4, None), RelationSetting(8, None), joinOperator = broadcastHashJoin))
148+
RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN))
145149
}
146150
}
147151
}
148152

153+
test("bucket coalescing shouldn't be applied to shuffled hash join build side") {
154+
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
155+
run(JoinSetting(
156+
RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
157+
shjBuildSide = Some(BuildRight)))
158+
}
159+
}
160+
149161
test("bucket coalescing shouldn't be applied when the number of buckets are the same") {
150162
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
151163
run(JoinSetting(
152-
RelationSetting(8, None), RelationSetting(8, None), joinOperator = sortMergeJoin))
164+
RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN))
153165
run(JoinSetting(
154-
RelationSetting(8, None), RelationSetting(8, None), joinOperator = shuffledHashJoin,
166+
RelationSetting(8, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
155167
shjBuildSide = Some(BuildLeft)))
156168
}
157169
}
158170

159171
test("number of bucket is not divisible by other number of bucket") {
160172
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
161173
run(JoinSetting(
162-
RelationSetting(3, None), RelationSetting(8, None), joinOperator = sortMergeJoin))
174+
RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN))
163175
run(JoinSetting(
164-
RelationSetting(3, None), RelationSetting(8, None), joinOperator = shuffledHashJoin,
176+
RelationSetting(3, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
165177
shjBuildSide = Some(BuildLeft)))
166178
}
167179
}
@@ -170,11 +182,11 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
170182
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
171183
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
172184
run(JoinSetting(
173-
RelationSetting(4, None), RelationSetting(16, None), joinOperator = sortMergeJoin))
185+
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN))
174186
}
175187
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO.key -> "2") {
176188
run(JoinSetting(
177-
RelationSetting(4, None), RelationSetting(16, None), joinOperator = shuffledHashJoin,
189+
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN,
178190
shjBuildSide = Some(BuildLeft)))
179191
}
180192
}
@@ -199,15 +211,15 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
199211
rightKeys = Seq(rCols.head),
200212
leftRelation = lRel,
201213
rightRelation = rRel,
202-
joinOperator = sortMergeJoin,
214+
joinOperator = SORT_MERGE_JOIN,
203215
shjBuildSide = None))
204216

205217
run(JoinSetting(
206218
leftKeys = Seq(lCols.head),
207219
rightKeys = Seq(rCols.head),
208220
leftRelation = lRel,
209221
rightRelation = rRel,
210-
joinOperator = shuffledHashJoin,
222+
joinOperator = SHUFFLED_HASH_JOIN,
211223
shjBuildSide = Some(BuildLeft)))
212224

213225
// The following should not be coalesced because join keys do not match with output
@@ -217,15 +229,15 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
217229
rightKeys = rCols :+ AttributeReference("r3", IntegerType)(),
218230
leftRelation = lRel,
219231
rightRelation = rRel,
220-
joinOperator = sortMergeJoin,
232+
joinOperator = SORT_MERGE_JOIN,
221233
shjBuildSide = None))
222234

223235
run(JoinSetting(
224236
leftKeys = lCols :+ AttributeReference("l3", IntegerType)(),
225237
rightKeys = rCols :+ AttributeReference("r3", IntegerType)(),
226238
leftRelation = lRel,
227239
rightRelation = rRel,
228-
joinOperator = shuffledHashJoin,
240+
joinOperator = SHUFFLED_HASH_JOIN,
229241
shjBuildSide = Some(BuildLeft)))
230242

231243
// The following will be coalesced since ordering should not matter because it will be
@@ -235,16 +247,24 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
235247
rightKeys = rCols.reverse,
236248
leftRelation = lRel,
237249
rightRelation = RelationSetting(rCols, 8, Some(4)),
238-
joinOperator = sortMergeJoin,
250+
joinOperator = SORT_MERGE_JOIN,
239251
shjBuildSide = None))
240252

241253
run(JoinSetting(
242254
leftKeys = lCols.reverse,
243255
rightKeys = rCols.reverse,
244256
leftRelation = lRel,
245257
rightRelation = RelationSetting(rCols, 8, Some(4)),
246-
joinOperator = shuffledHashJoin,
258+
joinOperator = SHUFFLED_HASH_JOIN,
247259
shjBuildSide = Some(BuildLeft)))
260+
261+
run(JoinSetting(
262+
leftKeys = rCols.reverse,
263+
rightKeys = lCols.reverse,
264+
leftRelation = RelationSetting(rCols, 8, Some(4)),
265+
rightRelation = lRel,
266+
joinOperator = SHUFFLED_HASH_JOIN,
267+
shjBuildSide = Some(BuildRight)))
248268
}
249269
}
250270

0 commit comments

Comments
 (0)