[SPARK-46037][SQL] When Left Join build Left and codegen is turned off, ShuffledHashJoinExec may result in incorrect results#43938
Conversation
|
ping @cloud-fan |
| // Put join left side before right side. This is to be consistent with | ||
| // `ShuffledHashJoinExec.fullOuterJoin`. | ||
| if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) { | ||
| // Put join left side before right side. This is to be consistent with ShuffledHashJoinExec. |
There was a problem hiding this comment.
I wonder whether this whole boundCondition block can be simplified to
Predicate.create(condition.get, left.output ++ right.output).eval _
since the left side is always on the left and the right side is always on the right.
(Edit: the proposed simplification in ⬆️ is not correct (the bound condition input schema is not necessarily the join's output schema); see later comments for discussion of an alternative simplification).
Further down in this file, we have
showing that buildPlan and streamPlan are just re-mappings of left and right depending on the build side.
Several years ago, it looks like we used to do something similar to my proposal:
but we switched to the current use of buildPlan and streamPlan in a refactoring in #12102 (I'm not fully clear on why).
There was a problem hiding this comment.
+1, It's fragile to keep 2 pieces of code in sync.
There was a problem hiding this comment.
Oh I guess it's wrong for inner join + build left?
There was a problem hiding this comment.
I found this
protected def createResultProjection(): (InternalRow) => InternalRow = joinType match {
case LeftExistence(_) =>
UnsafeProjection.create(output, output)
case _ =>
// Always put the stream side on left to simplify implementation
// both of left and right side could be null
UnsafeProjection.create(
output, (streamedPlan.output ++ buildPlan.output).map(_.withNullability(true)))
}
I think stream side left and build side right is the common case, but we have some special cases for outer joins.
There was a problem hiding this comment.
Actually I don't know why outer join needs to be a special case. Can't we always put streaming side left and build side right?
There was a problem hiding this comment.
HashJoin.scala
override def output: Seq[Attribute] = {
joinType match {
case _: InnerLike =>
left.output ++ right.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
case RightOuter =>
left.output.map(_.withNullability(true)) ++ right.output
case j: ExistenceJoin =>
left.output :+ j.exists
case LeftExistence(_) =>
left.output
case x =>
throw new IllegalArgumentException(s"HashJoin should not take $x as the JoinType")
}
}ShuffledJoin.scala
override def output: Seq[Attribute] = {
joinType match {
case _: InnerLike =>
left.output ++ right.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
case RightOuter =>
left.output.map(_.withNullability(true)) ++ right.output
case FullOuter =>
(left.output ++ right.output).map(_.withNullability(true))
case j: ExistenceJoin =>
left.output :+ j.exists
case LeftExistence(_) =>
left.output
case x =>
throw new IllegalArgumentException(
s"${getClass.getSimpleName} not take $x as the JoinType")
}
}@cloud-fan Output determines that we cannot simply put streaming side left and build side right.
Am I understanding correctly?
There was a problem hiding this comment.
Yea that's why we have a createResultProjection to reorder the columns.
There was a problem hiding this comment.
Ah, I think I understand now:
In all of the *Join methods in this file (which doesn't include full outer join), the streamed side is always the left side of the joinRow that is passed to boundCondition: in HashJoin.scala, the input to boundCondition is not necessarily the same as the output of the join operator itself (for that, we have resultProj which comes from createResultProjection which remains in sync with boundCondition (source).
However, boundCondition is inherited in ShuffledHashJoinExec and there we have a buildSideOrFullOuterJoin method (source) and there the resultProjection operates over the same input schema as the join output schema (rather than assuming that the joined row always has the streamed side on the left). In that code, boundCondition is evaluated over an input that matches the join output schema and there the streamed side could be on either side rather than only the left.
I think this inheritance is confusing and hard to reason about.
It seems like HashJoin.scala has an invariant of "streamed side always on left" which gets violated in ShuffleHashJoin.scala's separate implementation of outer joins.
I wonder whether we can address this bug by modifying ShuffledHashJoinExec.buildSideOrFullOuterJoin so that it always unconditionally places the streamed side on the left (the same as HashJoin.scala's default). (Edit: I'm basically agreeing with @cloud-fan's suggestion above).
| withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> "false") { | ||
| val df1 = sql( | ||
| """ | ||
| |SELECT /*+ SHUFFLE_HASH(t1) */ * |
There was a problem hiding this comment.
In the golden file test outer-join.sql, we run it multiple times to test different code paths: broadcast join, shuffle join, sort merge join. code on/off etc., can we move the test there?
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
…without codegen ### What changes were proposed in this pull request? This is a re-submitting of #43938 to fix a join correctness bug caused by #41398 . Credits go to mcdull-zhang ### Why are the changes needed? correctness fix ### Does this PR introduce _any_ user-facing change? Yes, the query result will be corrected. ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes #47905 from cloud-fan/join. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…without codegen This is a re-submitting of #43938 to fix a join correctness bug caused by #41398 . Credits go to mcdull-zhang correctness fix Yes, the query result will be corrected. new test no Closes #47905 from cloud-fan/join. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit af5e0a2) Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
When Left Join build Left and codegen is turned off, ShuffledHashJoinExec may have incorrect results.
The cause of the problem is: the left side (buildPlan) of LeftJoinBuildLeft is placed in the first half of joinRow
Why are the changes needed?
Make the query results correct.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
unit test.
Was this patch authored or co-authored using generative AI tooling?
No.