Support non-tuple expression for in-subquery to join#4826
Support non-tuple expression for in-subquery to join#4826alamb merged 14 commits intoapache:masterfrom
Conversation
|
Please take a look, @alamb @jackwener. |
| subquery_filters.push(expr.clone()); | ||
| } else { | ||
| join_filters.push(expr.clone()) | ||
| } |
There was a problem hiding this comment.
If the expression depends on the outer table, we should move it to the join predicate.
There was a problem hiding this comment.
Yeah -- this is the decorrelation logic
| .project(projection)? | ||
| .alias(&subqry_alias)? | ||
| let subquery_expr_name = format!("{:?}", unnormalize_col(subquery_expr.clone())); | ||
| let first_expr = subquery_expr.clone().alias(subquery_expr_name.clone()); |
There was a problem hiding this comment.
For the right side of the in predicate, it looks strange if we show the original qualify name in the plan, so the qualify name is stripped.
|
I plan to review this tomorrow. THank you @ygf11 |
jackwener
left a comment
There was a problem hiding this comment.
I prepare to review this PR more carefully tonight or tomorrow.
| let expected = vec![ | ||
| "+-------+---------+--------+", | ||
| "| t1_id | t1_name | t1_int |", | ||
| "+-------+---------+--------+", | ||
| "| 11 | a | 1 |", | ||
| "| 33 | c | 3 |", | ||
| "| 44 | d | 4 |", | ||
| "+-------+---------+--------+", | ||
| ]; | ||
|
|
||
| let results = execute_to_batches(&ctx, sql).await; | ||
| assert_batches_sorted_eq!(expected, &results); |
There was a problem hiding this comment.
Now we already use sqllogicaltest to support Data Driven Tests #4460.
I suggest add a file join.slt | subquery.slt to cover some case of join and subquery.
In UT and integration-test, we just focus on correctness of Plan, data-test is derived by sqllogicaltest
| let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ | ||
| \n LeftSemi Join: Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ | ||
| \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ | ||
| \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ | ||
| \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ | ||
| \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; | ||
|
|
||
| assert_optimized_plan_eq_display_indent( | ||
| Arc::new(DecorrelateWhereIn::new()), | ||
| &plan, | ||
| "column comparison required", | ||
| expected, |
|
Sorry for the delay in my review. I am traveling this week at a conference so I don't have as much time to devote to reviews and merging as normal |
|
|
||
| let expected = vec![ | ||
| "Explain [plan_type:Utf8, plan:Utf8]", | ||
| " Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", |
| SubqueryAlias: __correlated_sq_1 | ||
| Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey | ||
| TableScan: lineitem projection=[l_orderkey, l_linestatus]"#; | ||
|
|
There was a problem hiding this comment.
This change is just whitespace, right?
There was a problem hiding this comment.
Not only whitespace, the alias except the first expression are removed in the projection.
before:
SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkeyafter:
\n SubqueryAlias: __correlated_sq_1\
\n Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey\The first expression of the projection is the right side of in-equijoin, it may be a non-column expression, so always give it an alias, but others do not(they are alway column).
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn three_projection_exprs_subquery_to_join() -> Result<()> { |
There was a problem hiding this comment.
As a minor point, I am not sure what the extra coverage the multiple predicates in the where clause are adding
I wonder if we need all these tests?
There was a problem hiding this comment.
Added more tests -- multiple predicates in outer and subquery where clause.
| } | ||
| } | ||
|
|
||
| /// Optimize the where in subquery to left-anti/left-semi join. |
| // Query will fail, but we can still transform the plan | ||
| let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ | ||
| \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ | ||
| \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\ |
There was a problem hiding this comment.
This looks like somewhat of a regression to me (the filter has been pulled out of the subquery and into the join). Is that intended?
There was a problem hiding this comment.
No, I do not handle this special case, just treat customer.c_custkey = customer.c_custkey as join filter.
I think it is better to add this to the filter of outer table, then the plan will be like:
"Projection: customer.c_custkey [c_custkey:Int64]\
LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
Filter: customer.c_custkey = customer.c_custkey [..]\
TableScan: customer [c_custkey:Int64, c_name:Utf8]\
SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"
Is this the right direction?
There was a problem hiding this comment.
After more thought it probably makes sense to do this decorrelation in the general way (no special case0 as you have done and then we will rely on subsequent predicate pushdown logic to push the filters back down if possible
There was a problem hiding this comment.
Yes, it is better to do by other rule.
But I find our predicate pushdown logic does not support these predicates(semi-join-on) now.
https://github.com/apache/arrow-datafusion/blob/dee0dd8be6745f6cb798ba56dca6c1b936d90fd6/datafusion/optimizer/src/push_down_filter.rs#L103-L115
Maybe we should support predicate push down for semi-join-on, or we need move these predicates to the outside of join, then predicate push down can work(Like following).
// for sql: select * from t1 where t1.t1_int in(select t2.t2_int from t2 where t1.t1_id > 10);
// the output of this rule:
Projection: t1.t1_id, t1.t1_name, t1.t1_int
Filter: CAST(t1.t1_id AS Int64) > Int64(10)
LeftSemi Join: Filter: t1.t1_int = __correlated_sq_2.t2_int
TableScan: t1
SubqueryAlias: __correlated_sq_2
Projection: t2.t2_int AS t2_int
TableScan: t2 | let using_cols: Vec<Column> = expr | ||
| .to_columns()? | ||
| .into_iter() | ||
| .filter(|col| input_schema.field_from_column(col).is_ok()) |
There was a problem hiding this comment.
doesn't this ignore columns that are not found in the subquery? I wonder if the error should be percolated up rather than ignored? Perhaps test coverage of a query like select ... from t where x in (select non_existent_col from foo) would be good?
Although perhaps if it was an error it signals something is wrong in the plan (as a column reference wouldn't be present in the input schema 🤔 )
If this shouldn't happen, maybe we can collect the errors and return them?
There was a problem hiding this comment.
doesn't this ignore columns that are not found in the subquery
Yes, it will ignore. But it is used to collect columns which will be add to subquery projection.
Suppose there is a query:
select * from t where x in (select c from foo where foo.a > t.a and foo.b > t.a)
foo.a > t.a and foo.b > t.a will move to join, to make the join work, we need add foo.a and foo.b to the projection of subquery, the above code does the filter work.
If the where clause references unknown columns, we need do more work here, but I think it should be done in the planner.
| subquery_filters.push(expr.clone()); | ||
| } else { | ||
| join_filters.push(expr.clone()) | ||
| } |
There was a problem hiding this comment.
Yeah -- this is the decorrelation logic
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
| // Query will fail, but we can still transform the plan | ||
| let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ | ||
| \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ | ||
| \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\ |
There was a problem hiding this comment.
After more thought it probably makes sense to do this decorrelation in the general way (no special case0 as you have done and then we will rely on subsequent predicate pushdown logic to push the filters back down if possible
| let expected = "Projection: test.b [b:UInt32]\ | ||
| \n Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\ | ||
| \n LeftSemi Join: Filter: test.c * UInt32(2) = __correlated_sq_2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\ | ||
| \n Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\ |
There was a problem hiding this comment.
Filter: test.c > UInt32(1) happen twice, it is better to only add once #4914.
I will fix it in the following pr.
| " SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]", | ||
| " Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]", | ||
| " TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]", | ||
| ]; |
There was a problem hiding this comment.
This test case show that the special case t1.t1_int > 0 is not pushed down after optimizing.
There was a problem hiding this comment.
Is that something you would like to fix in this PR or is it something you would like to fix in a follow on?
|
Thanks for sticking with this @ygf11 |
|
Benchmark runs are scheduled for baseline = d49c805 and contender = e2daee9. e2daee9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4724.
Rationale for this change
This sql works in datafusion:
but following sql does not work:
What changes are included in this PR?
optimize_where_inindecorrelate_where_in.rs.Are these changes tested?
Yes.
Are there any user-facing changes?