feat: Support sort merge join with a join condition#553
Conversation
|
Some tests doesn't pass, its good to check them in DF, SMJ joined filter is still in progress |
|
Yea, that's why I marked this as a draft PR now. |
|
just checked DF on one of the failed queries it passes |
|
looks like Comet produces duplicates |
|
Right join fails in DF |
|
Filed apache/datafusion#10882 |
native/Cargo.toml
Outdated
| datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } | ||
| datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } | ||
| datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } | ||
| datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e" } |
There was a problem hiding this comment.
Use the commit of the PR apache/datafusion#12090. When the PR is merged, we can change back to DataFusion repo.
| AggregateExprBuilder::new(sum_udaf(), vec![child]) | ||
| .schema(schema) | ||
| .alias("count") | ||
| .with_ignore_nulls(false) | ||
| .with_distinct(false) | ||
| .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) |
There was a problem hiding this comment.
Again, DataFusion API changes.
| if !check_support(predicate, &schema) { | ||
| let selectivity = default_selectivity as f64 / 100.0; | ||
| let mut stats = input_stats.into_inexact(); | ||
| let mut stats = input_stats.to_inexact(); |
c27f1f6 to
bb7586f
Compare
| val left = sql("SELECT * FROM tbl_a") | ||
| val right = sql("SELECT * FROM tbl_b") | ||
|
|
||
| val df8 = |
There was a problem hiding this comment.
I feel we can also use SQL for Anti, Semi joins?
| case FullOuter => JoinType.FullOuter | ||
| case LeftSemi => JoinType.LeftSemi | ||
| case LeftAnti => JoinType.LeftAnti | ||
| // TODO: DF SMJ with join condition fails TPCH q21 |
There was a problem hiding this comment.
let me have a look on q21. I remember Anti join had issues with TPCH in DF but it was fixed
There was a problem hiding this comment.
But it might be also related to apache/datafusion#11555
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #553 +/- ##
=============================================
- Coverage 55.16% 34.20% -20.96%
- Complexity 857 888 +31
=============================================
Files 109 112 +3
Lines 10542 43071 +32529
Branches 2010 9509 +7499
=============================================
+ Hits 5815 14733 +8918
- Misses 3714 25352 +21638
- Partials 1013 2986 +1973 ☔ View full report in Codecov by Sentry. |
| } | ||
| } | ||
|
|
||
| test("full outer join") { |
There was a problem hiding this comment.
This test fails currently. It needs the fix at DataFusion apache/datafusion#12159
There was a problem hiding this comment.
The fix was merged at DataFusion. I updated Comet to use the latest commit.
| @@ -75,7 +75,6 @@ abstract class CometTestBase | |||
| conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") | |||
| conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") | |||
| conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") | |||
| conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false") | |||
There was a problem hiding this comment.
We don't need to set SQLConf.COALESCE_PARTITIONS_ENABLED.key false now.
We also need to remove this to trigger test failure https://github.com/apache/datafusion-comet/pull/553/files#r1730694210.
| datafusion-physical-plan = { version = "41.0.0", default-features = false } | ||
| datafusion-physical-expr-common = { version = "41.0.0", default-features = false } | ||
| datafusion-physical-expr = { version = "41.0.0", default-features = false } | ||
| datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "dff590b" } |
There was a problem hiding this comment.
Updated to latest DataFusion to use the two commits including bug fixes.
| } | ||
| } | ||
|
|
||
| fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> { |
There was a problem hiding this comment.
New DataFusion API required for AggregateExpr trait.
| withInfo(join, cond) | ||
| return None | ||
| } | ||
| condProto.get |
There was a problem hiding this comment.
is there any scenario of None.get?
There was a problem hiding this comment.
if (condProto.isEmpty) {
withInfo(join, cond)
return None
}If it is None, it will return None earlier.
| checkSparkAnswerAndOperator(df7) | ||
|
|
||
| val df8 = sql( | ||
| "SELECT * FROM tbl_a LEFT SEMI JOIN tbl_b ON tbl_a._2 = tbl_b._1 " + |
There was a problem hiding this comment.
is there a RIGHT SEMI in Spark? afair there is still no proper support in DF for RightSemi
| val left = UnresolvedRelation(TableIdentifier("left")) | ||
| val right = UnresolvedRelation(TableIdentifier("right")) | ||
|
|
||
| checkSparkAnswer(left.join(right, $"left.N" === $"right.N", "full")) |
There was a problem hiding this comment.
is it possible to rephrase it in SQL as well?
There was a problem hiding this comment.
This test is copied from Spark. I think it is good to keep it as the same.
| checkSparkAnswerAndOperator(df9) | ||
|
|
||
| // TODO: Enable these tests after fixing the issue: | ||
| // https://github.com/apache/datafusion-comet/issues/861 |
There was a problem hiding this comment.
we probably can create a separate github ticket on this to not forget enabling tests
|
Thanks @comphead |
|
I need to update plan stability results... |
* Init * test * test * test * Use specified commit to test * Fix format * fix clippy * fix * fix * Fix * Change to SQL syntax * Disable SMJ LeftAnti with join filter * Fix * Add test * Add test * Update to last DataFusion commit * fix format * fix * Update diffs (cherry picked from commit e57ead4)
Which issue does this PR close?
Closes #398.
Rationale for this change
What changes are included in this PR?
How are these changes tested?