@@ -30,9 +30,9 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
3030import org .apache .spark .sql .types .{IntegerType , StructType }
3131
3232class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
33- private val sortMergeJoin = " sortMergeJoin"
34- private val shuffledHashJoin = " shuffledHashJoin"
35- private val broadcastHashJoin = " broadcastHashJoin"
33+ private val SORT_MERGE_JOIN = " sortMergeJoin"
34+ private val SHUFFLED_HASH_JOIN = " shuffledHashJoin"
35+ private val BROADCAST_HASH_JOIN = " broadcastHashJoin"
3636
3737 case class RelationSetting (
3838 cols : Seq [Attribute ],
@@ -58,7 +58,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
5858 def apply (
5959 l : RelationSetting ,
6060 r : RelationSetting ,
61- joinOperator : String = sortMergeJoin ,
61+ joinOperator : String = SORT_MERGE_JOIN ,
6262 shjBuildSide : Option [BuildSide ] = None ): JoinSetting = {
6363 JoinSetting (l.cols, r.cols, l, r, joinOperator, shjBuildSide)
6464 }
@@ -82,17 +82,17 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
8282 leftRelation = setting.rightRelation,
8383 rightRelation = setting.leftRelation)
8484
85- val settings = if (setting.joinOperator != shuffledHashJoin ) {
85+ val settings = if (setting.joinOperator != SHUFFLED_HASH_JOIN ) {
8686 Seq (setting, swappedSetting)
8787 } else {
8888 Seq (setting)
8989 }
9090 settings.foreach { s =>
9191 val lScan = newFileSourceScanExec(s.leftRelation)
9292 val rScan = newFileSourceScanExec(s.rightRelation)
93- val join = if (s.joinOperator == sortMergeJoin ) {
93+ val join = if (s.joinOperator == SORT_MERGE_JOIN ) {
9494 SortMergeJoinExec (s.leftKeys, s.rightKeys, Inner , None , lScan, rScan)
95- } else if (s.joinOperator == shuffledHashJoin ) {
95+ } else if (s.joinOperator == SHUFFLED_HASH_JOIN ) {
9696 ShuffledHashJoinExec (s.leftKeys, s.rightKeys, Inner , s.shjBuildSide.get, None , lScan, rScan)
9797 } else {
9898 BroadcastHashJoinExec (
@@ -121,47 +121,59 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
121121 test(" bucket coalescing - basic" ) {
122122 withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
123123 run(JoinSetting (
124- RelationSetting (4 , None ), RelationSetting (8 , Some (4 )), joinOperator = sortMergeJoin ))
124+ RelationSetting (4 , None ), RelationSetting (8 , Some (4 )), joinOperator = SORT_MERGE_JOIN ))
125125 run(JoinSetting (
126- RelationSetting (4 , None ), RelationSetting (8 , Some (4 )), joinOperator = shuffledHashJoin ,
126+ RelationSetting (4 , None ), RelationSetting (8 , Some (4 )), joinOperator = SHUFFLED_HASH_JOIN ,
127127 shjBuildSide = Some (BuildLeft )))
128- // Coalescing bucket should not happen when the target is on shuffled hash join
129- // build side.
130- run(JoinSetting (
131- RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = shuffledHashJoin,
132- shjBuildSide = Some (BuildRight )))
133128 }
129+
134130 withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " false" ) {
135131 run(JoinSetting (
136- RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = broadcastHashJoin))
132+ RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = BROADCAST_HASH_JOIN ))
133+ run(JoinSetting (
134+ RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = SORT_MERGE_JOIN ))
135+ run(JoinSetting (
136+ RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
137+ shjBuildSide = Some (BuildLeft )))
138+ run(JoinSetting (
139+ RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
140+ shjBuildSide = Some (BuildRight )))
137141 }
138142 }
139143
140144 test(" bucket coalescing should work only for sort merge join and shuffled hash join" ) {
141145 Seq (true , false ).foreach { enabled =>
142146 withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> enabled.toString) {
143147 run(JoinSetting (
144- RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = broadcastHashJoin ))
148+ RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = BROADCAST_HASH_JOIN ))
145149 }
146150 }
147151 }
148152
153+ test(" bucket coalescing shouldn't be applied to shuffled hash join build side" ) {
154+ withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
155+ run(JoinSetting (
156+ RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
157+ shjBuildSide = Some (BuildRight )))
158+ }
159+ }
160+
149161 test(" bucket coalescing shouldn't be applied when the number of buckets are the same" ) {
150162 withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
151163 run(JoinSetting (
152- RelationSetting (8 , None ), RelationSetting (8 , None ), joinOperator = sortMergeJoin ))
164+ RelationSetting (8 , None ), RelationSetting (8 , None ), joinOperator = SORT_MERGE_JOIN ))
153165 run(JoinSetting (
154- RelationSetting (8 , None ), RelationSetting (8 , None ), joinOperator = shuffledHashJoin ,
166+ RelationSetting (8 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
155167 shjBuildSide = Some (BuildLeft )))
156168 }
157169 }
158170
159171 test(" number of bucket is not divisible by other number of bucket" ) {
160172 withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
161173 run(JoinSetting (
162- RelationSetting (3 , None ), RelationSetting (8 , None ), joinOperator = sortMergeJoin ))
174+ RelationSetting (3 , None ), RelationSetting (8 , None ), joinOperator = SORT_MERGE_JOIN ))
163175 run(JoinSetting (
164- RelationSetting (3 , None ), RelationSetting (8 , None ), joinOperator = shuffledHashJoin ,
176+ RelationSetting (3 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
165177 shjBuildSide = Some (BuildLeft )))
166178 }
167179 }
@@ -170,11 +182,11 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
170182 withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
171183 withSQLConf(SQLConf .COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO .key -> " 2" ) {
172184 run(JoinSetting (
173- RelationSetting (4 , None ), RelationSetting (16 , None ), joinOperator = sortMergeJoin ))
185+ RelationSetting (4 , None ), RelationSetting (16 , None ), joinOperator = SORT_MERGE_JOIN ))
174186 }
175187 withSQLConf(SQLConf .COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO .key -> " 2" ) {
176188 run(JoinSetting (
177- RelationSetting (4 , None ), RelationSetting (16 , None ), joinOperator = shuffledHashJoin ,
189+ RelationSetting (4 , None ), RelationSetting (16 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
178190 shjBuildSide = Some (BuildLeft )))
179191 }
180192 }
@@ -199,15 +211,15 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
199211 rightKeys = Seq (rCols.head),
200212 leftRelation = lRel,
201213 rightRelation = rRel,
202- joinOperator = sortMergeJoin ,
214+ joinOperator = SORT_MERGE_JOIN ,
203215 shjBuildSide = None ))
204216
205217 run(JoinSetting (
206218 leftKeys = Seq (lCols.head),
207219 rightKeys = Seq (rCols.head),
208220 leftRelation = lRel,
209221 rightRelation = rRel,
210- joinOperator = shuffledHashJoin ,
222+ joinOperator = SHUFFLED_HASH_JOIN ,
211223 shjBuildSide = Some (BuildLeft )))
212224
213225 // The following should not be coalesced because join keys do not match with output
@@ -217,15 +229,15 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
217229 rightKeys = rCols :+ AttributeReference (" r3" , IntegerType )(),
218230 leftRelation = lRel,
219231 rightRelation = rRel,
220- joinOperator = sortMergeJoin ,
232+ joinOperator = SORT_MERGE_JOIN ,
221233 shjBuildSide = None ))
222234
223235 run(JoinSetting (
224236 leftKeys = lCols :+ AttributeReference (" l3" , IntegerType )(),
225237 rightKeys = rCols :+ AttributeReference (" r3" , IntegerType )(),
226238 leftRelation = lRel,
227239 rightRelation = rRel,
228- joinOperator = shuffledHashJoin ,
240+ joinOperator = SHUFFLED_HASH_JOIN ,
229241 shjBuildSide = Some (BuildLeft )))
230242
231243 // The following will be coalesced since ordering should not matter because it will be
@@ -235,16 +247,24 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
235247 rightKeys = rCols.reverse,
236248 leftRelation = lRel,
237249 rightRelation = RelationSetting (rCols, 8 , Some (4 )),
238- joinOperator = sortMergeJoin ,
250+ joinOperator = SORT_MERGE_JOIN ,
239251 shjBuildSide = None ))
240252
241253 run(JoinSetting (
242254 leftKeys = lCols.reverse,
243255 rightKeys = rCols.reverse,
244256 leftRelation = lRel,
245257 rightRelation = RelationSetting (rCols, 8 , Some (4 )),
246- joinOperator = shuffledHashJoin ,
258+ joinOperator = SHUFFLED_HASH_JOIN ,
247259 shjBuildSide = Some (BuildLeft )))
260+
261+ run(JoinSetting (
262+ leftKeys = rCols.reverse,
263+ rightKeys = lCols.reverse,
264+ leftRelation = RelationSetting (rCols, 8 , Some (4 )),
265+ rightRelation = lRel,
266+ joinOperator = SHUFFLED_HASH_JOIN ,
267+ shjBuildSide = Some (BuildRight )))
248268 }
249269 }
250270
0 commit comments