[SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join #41398
|
Thank you for making a PR, @szehon-ho . |
|
Also, cc @viirya , @huaxingao , @sunchao , too. |
|
@szehon-ho Thanks for the PR! The change looks reasonable to me. I have left a few minor comments. |
|
cc @maryannxue |
override def outputOrdering: Seq[SortOrder] = joinType match {
  case FullOuter => Nil
  case LeftOuter if buildSide == BuildLeft => Nil
  case RightOuter if buildSide == BuildRight => Nil
Can we add a comment explaining why the ordering can't be preserved?
Sure, I added a comment based on my understanding (please let me know if I have misunderstood something).
My thinking is that the second iteration over the build side (needed for outer-join semantics) runs over a HashedRelation, so the result cannot be in order.
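A minimal sketch of that reasoning, using plain Scala collections rather than Spark's classes. The object `OrderingSketch` and the method `outputKeys` are hypothetical names introduced only for illustration, and rows are reduced to integer join keys. It assumes matched rows are emitted during the stream-side pass and unmatched build rows are appended by the second pass. In the real operator the second pass follows the hash table's internal layout, which is the reason given above; this sketch shows that the output is unordered even when the second pass is deterministic.

```scala
object OrderingSketch {
  // Hypothetical helper: returns the join keys in the order a two-pass,
  // build-side outer hash join would emit them. Not Spark code.
  def outputKeys(buildKeys: Seq[Int], streamKeys: Seq[Int]): Seq[Int] = {
    val buildSet = buildKeys.toSet
    // Pass 1: stream rows that find a match are emitted in stream order.
    val matchedInStreamOrder = streamKeys.filter(buildSet.contains)
    val matchedSet = matchedInStreamOrder.toSet
    // Pass 2: build rows that were never matched are appended afterwards.
    val unmatchedBuild = buildKeys.filterNot(matchedSet.contains)
    matchedInStreamOrder ++ unmatchedBuild
  }
}
```

With sorted inputs `Seq(1, 2, 3)` on the build side and `Seq(2)` on the stream side, the keys come out as 2, 1, 3: the unmatched build rows land after the matched ones, so neither input ordering survives.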
|
LGTM |
|
@szehon-ho Could you re-trigger the failed CI pipeline? |
|
Yeah, I couldn't reproduce the errors; trying again. |
    sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft)
  assertShuffleHashJoin(
-   sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight)
+   sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft)
In this situation, t1 is smaller than t2, so it now picks t1. Before it was not possible to pick t1 and so t2 was picked.
Yes, I meant that the original test coverage (BuildRight) is removed and lost.
val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter")
assert(collect(shjDF.queryExecution.executedPlan) {
  case _: ShuffledHashJoinExec => true
}.size === 1)
Do we need to verify that the build side of ShuffledHashJoinExec is BuildLeft here? Or does the hint always take effect?
viirya
left a comment
Only two minor comments left, otherwise looks good to me.
@viirya Thanks a lot for taking a look! Since these are minor comments on the tests, I will merge this PR first; we will follow up after @szehon-ho comes back from vacation. |
|
Merged to master. Thanks, @szehon-ho et al. |
|
cc @c21 too |
|
Thanks everyone for the warm welcome to Spark, and for the really fast reviews! As I'm out of town, I will look at any follow-up improvements when I'm back.
…join build right in shuffled hash join
### What changes were proposed in this pull request?
Add support for shuffle-hash join in the following scenarios:
* left outer join with left-side build
* right outer join with right-side build
The algorithm is similar to SPARK-32399, which added shuffle-hash join support for full outer join.
The same methods, fullOuterJoinWithUniqueKey and fullOuterJoinWithNonUniqueKey, are extended to support the new cases. These methods are called after the HashedRelation has been constructed from the build side, and they perform two iterations:
1. Iterate the stream side.
   a. If a match is found on the build side, mark it.
   b. If no match is found on the build side, join with a null build-side row and add it to the result.
2. Iterate the build side.
   a. If the row is marked as matched, add the joined row to the result.
   b. If the row is not marked, join it with a null stream-side row.
Left outer join with left-side build and right outer join with right-side build need only a subset of this logic: step 1b above is replaced with a no-op.
Codegen is left for a follow-up PR.
### Why are the changes needed?
For these join types, shuffle-hash join can outperform sort-merge join, especially when the larger table is very big, because it skips an expensive sort of that table.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test in JoinSuite.scala
Closes apache#41398 from szehon-ho/same_side_outer_build_join_master.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: huaxingao <[email protected]>
…join build right in shuffled hash join (cherry picked from commit 0effbec)
### What changes were proposed in this pull request?
Codegen of shuffled hash join for build-side outer join (i.e., left outer join with left-side build, or right outer join with right-side build).
### Why are the changes needed?
The implementation in #41398 covered only the non-codegen path, and codegen was disabled in this scenario.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit test in WholeStageCodegenSuite
Closes #41614 from szehon-ho/same_side_outer_join_codegen_master.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: huaxingao <[email protected]>
…without codegen
### What changes were proposed in this pull request?
This is a resubmission of #43938 to fix a join correctness bug caused by #41398. Credit goes to mcdull-zhang.
### Why are the changes needed?
Correctness fix.
### Does this PR introduce _any_ user-facing change?
Yes, the query result will be corrected.
### How was this patch tested?
New test.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47905 from cloud-fan/join.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…without codegen (cherry picked from commit af5e0a2) Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Add support for shuffle-hash join in the following scenarios:
* left outer join with left-side build
* right outer join with right-side build
The algorithm is similar to SPARK-32399, which added shuffle-hash join support for full outer join.
The same methods, fullOuterJoinWithUniqueKey and fullOuterJoinWithNonUniqueKey, are extended to support the new cases. These methods are called after the HashedRelation has been constructed from the build side, and they perform two iterations:
1. Iterate the stream side.
   a. If a match is found on the build side, mark it.
   b. If no match is found on the build side, join with a null build-side row and add it to the result.
2. Iterate the build side.
   a. If the row is marked as matched, add the joined row to the result.
   b. If the row is not marked, join it with a null stream-side row.
Left outer join with left-side build and right outer join with right-side build need only a subset of this logic: step 1b above is replaced with a no-op.
Codegen is left for a follow-up PR.
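The two iterations described above can be sketched with plain Scala collections. This is an illustrative model under stated assumptions, not the code in ShuffledHashJoinExec: the object `BuildSideOuterJoinSketch`, the `Row` alias, and the `emitUnmatchedStream` flag are hypothetical names, a `HashMap` stands in for the HashedRelation, and `None` stands in for a null row. As a simplification, the sketch emits the joined row as soon as a match is found in the first pass, so the second pass only has to add build rows that were never marked.

```scala
import scala.collection.mutable

object BuildSideOuterJoinSketch {
  // Hypothetical row type for illustration: (join key, payload).
  type Row = (Int, String)

  // Two-pass hash join over plain collections (not Spark code).
  //   emitUnmatchedStream = true  -> full outer join (step 1b is kept)
  //   emitUnmatchedStream = false -> build-side outer join (step 1b is a no-op)
  // Each output pair is (buildRow, streamRow); None stands in for a null row.
  def join(
      build: Seq[Row],
      stream: Seq[Row],
      emitUnmatchedStream: Boolean): Seq[(Option[Row], Option[Row])] = {
    // Stand-in for the HashedRelation: key -> indices of build rows with that key.
    val relation = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Int]]
    build.zipWithIndex.foreach { case ((key, _), i) =>
      relation.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Int]) += i
    }
    val matched = mutable.BitSet.empty
    val out = mutable.ArrayBuffer.empty[(Option[Row], Option[Row])]

    // Pass 1: iterate the stream side.
    stream.foreach { s =>
      relation.get(s._1) match {
        case Some(indices) =>
          // 1a: mark every matching build row and emit the joined row.
          indices.foreach { i =>
            matched += i
            out += ((Some(build(i)), Some(s)))
          }
        case None =>
          // 1b: only a full outer join keeps unmatched stream rows.
          if (emitUnmatchedStream) out += ((None, Some(s)))
      }
    }

    // Pass 2: iterate the build side and emit rows that were never marked.
    // Build rows are visited in input order here to keep the sketch deterministic.
    build.indices.foreach { i =>
      if (!matched(i)) out += ((Some(build(i)), None))
    }
    out.toSeq
  }
}
```

For example, with build rows `(1, "a")`, `(2, "b")` and stream rows `(2, "x")`, `(3, "y")`, calling `join` with `emitUnmatchedStream = false` yields the matched pair for key 2 followed by the unmatched build row for key 1, and the stream row for key 3 is dropped. With `emitUnmatchedStream = true` that stream row is kept with an empty build side, which is the full outer result.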
Why are the changes needed?
For these join types, shuffle-hash join can outperform sort-merge join, especially when the larger table is very big, because it skips an expensive sort of that table.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test in JoinSuite.scala