ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order based on number of rows #8961

Closed
Dandandan wants to merge 19 commits into apache:master from Dandandan:rows_hash

Contributor

@Dandandan Dandandan commented Dec 18, 2020

This PR uses the num_rows statistics to implement a common optimization: using the smaller table for the build phase.
This is a good heuristic, because building on the smaller table means fewer items are inserted into the hash table, which matters most when the table sizes are very imbalanced.
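
To illustrate why the build side matters, here is a minimal sketch of an in-memory inner hash join. It is not DataFusion's implementation; the function name `hash_join` and the `(key, payload)` row shape are assumptions made for this example. The hash table is always built on the first argument, so passing the smaller input first reduces the number of insertions while producing the same matches.

```rust
use std::collections::HashMap;

/// Inner equi-join over (key, payload) rows (hypothetical helper, not DataFusion code).
/// The hash table is built on `build`; `probe` is streamed against it.
/// Returns the joined rows and the number of hash-table insertions.
fn hash_join<'a>(
    build: &'a [(u32, &'a str)],
    probe: &'a [(u32, &'a str)],
) -> (Vec<(u32, &'a str, &'a str)>, usize) {
    // Build phase: one insertion per build-side row.
    let mut table: HashMap<u32, Vec<&'a str>> = HashMap::new();
    let mut inserts = 0;
    for (key, payload) in build {
        table.entry(*key).or_default().push(*payload);
        inserts += 1;
    }

    // Probe phase: look up each probe-side row and emit every match.
    let mut out = Vec::new();
    for (key, payload) in probe {
        if let Some(matches) = table.get(key) {
            for m in matches {
                out.push((*key, *m, *payload));
            }
        }
    }
    (out, inserts)
}
```

Calling `hash_join(&small, &large)` and `hash_join(&large, &small)` yields the same number of joined rows, but the first call performs only `small.len()` insertions.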

Some notes:

  • The optimization works on the LogicalPlan by swapping left and right, the join type, and the key order. This currently seems the easiest place to add it, as there is no cost-based optimizer and no optimizers on the physical plan yet. The optimization rule assumes that the left side of the join is used for the build phase and the right side for the probe phase.
  • It requires the number of rows to be known exactly, so it does not apply whenever a transformation changes the number of rows, except for limit. The idea is that in those other cases it is very hard to estimate the number of resulting rows.
  • The impact is currently measurable on queries where the left side of an (inner) join is the bigger one.
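
The notes above can be sketched roughly as follows. This is a simplified model, not the code in this PR: `Plan`, `JoinType`, and `optimize_join` are hypothetical stand-ins for the real DataFusion types, and treating a limit as `min(n, input rows)` is an assumption about how "except for limit" could be handled. Only the top-level join is rewritten here.

```rust
/// Simplified stand-ins for the logical plan types (hypothetical, for illustration only).
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType { Inner, Left, Right }

#[derive(Clone, Debug, PartialEq)]
enum Plan {
    /// Table scan; `num_rows` is the exact row count if the statistics provide one.
    Scan { name: String, num_rows: Option<usize> },
    /// Row-preserving operator such as a projection or sort.
    Project { input: Box<Plan> },
    /// Row-changing operator: the output row count is not known exactly.
    Filter { input: Box<Plan> },
    /// LIMIT n.
    Limit { n: usize, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan>, on: Vec<(String, String)>, join_type: JoinType },
}

/// Exact number of rows, if it can be derived from the statistics.
fn get_num_rows(plan: &Plan) -> Option<usize> {
    match plan {
        Plan::Scan { num_rows, .. } => *num_rows,
        Plan::Project { input } => get_num_rows(input),
        // Assumption: a limit caps a known input row count at n.
        Plan::Limit { n, input } => get_num_rows(input).map(|rows| rows.min(*n)),
        Plan::Filter { .. } | Plan::Join { .. } => None,
    }
}

/// Swap only when both sides are known and the left (build) side is larger.
fn should_swap_join_order(left: &Plan, right: &Plan) -> bool {
    match (get_num_rows(left), get_num_rows(right)) {
        (Some(l), Some(r)) => l > r,
        _ => false,
    }
}

fn swap_join_type(join_type: JoinType) -> JoinType {
    match join_type {
        JoinType::Inner => JoinType::Inner,
        JoinType::Left => JoinType::Right,
        JoinType::Right => JoinType::Left,
    }
}

/// Swap left and right, the join type, and the (equi-)join key order.
fn optimize_join(plan: Plan) -> Plan {
    match plan {
        Plan::Join { left, right, on, join_type } if should_swap_join_order(&left, &right) => {
            Plan::Join {
                left: right,
                right: left,
                on: on.into_iter().map(|(l, r)| (r, l)).collect(),
                join_type: swap_join_type(join_type),
            }
        }
        other => other,
    }
}
```

With a 6001214-row scan on the left and a 1499999-row scan on the right, `optimize_join` moves the smaller scan to the left; if either side sits under a `Filter`, the plan is returned unchanged.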

FYI @andygrove @jorgecarleitao

codecov-io commented Dec 18, 2020

Codecov Report

Merging #8961 (1430e0d) into master (a054c78) will decrease coverage by 0.03%.
The diff coverage is 61.70%.

@@            Coverage Diff             @@
##           master    #8961      +/-   ##
==========================================
- Coverage   83.20%   83.16%   -0.04%     
==========================================
  Files         199      200       +1     
  Lines       48857    48946      +89     
==========================================
+ Hits        40651    40708      +57     
- Misses       8206     8238      +32     
| Impacted Files | Coverage Δ |
|---|---|
| rust/datafusion/src/logical_plan/plan.rs | 88.12% <ø> (ø) |
| rust/datafusion/src/physical_plan/hash_utils.rs | 97.10% <ø> (ø) |
| rust/datafusion/src/physical_plan/planner.rs | 80.45% <ø> (ø) |
| rust/datafusion/src/sql/parser.rs | 86.87% <ø> (ø) |
| ...datafusion/src/optimizer/hash_build_probe_order.rs | 59.09% <59.09%> (ø) |
| rust/datafusion/src/execution/context.rs | 90.00% <100.00%> (+0.01%) ⬆️ |
| ...t/datafusion/src/optimizer/projection_push_down.rs | 97.70% <100.00%> (ø) |
| rust/datafusion/src/optimizer/utils.rs | 61.75% <100.00%> (ø) |
| rust/datafusion/src/physical_plan/hash_join.rs | 92.77% <100.00%> (ø) |
| rust/datafusion/src/sql/planner.rs | 84.79% <100.00%> (ø) |

... and 4 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update a054c78...1430e0d.

LogicalPlan::Projection { input, .. } => get_num_rows(input),
LogicalPlan::Sort { input, .. } => get_num_rows(input),
LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
LogicalPlan::EmptyRelation {
Contributor Author

Not sure if this is relevant. Where is this used?

Member

This looks good. This is used for projections without an input, such as SELECT 1.

right: left.clone(),
on: on
.iter()
.map(|(l, r)| (r.to_string(), l.to_string()))
Member

In theory, this is unnecessary, since SQL places no restrictions on the order of join conditions. However, it is possible that we do make some assumptions about it, so if you ran into that it would be good to file an issue for it.

Contributor Author

@Dandandan Dandandan Dec 18, 2020

Makes sense; I was surprised that I had to. Currently it fails when you change the table order (e.g. directly in a query) without also changing the key order.

} => {
if should_swap_join_order(left, right) {
// Swap left and right, change join type and (equi-)join key order
Ok(LogicalPlan::Join {
Member

@andygrove andygrove Dec 18, 2020

I'm not sure if it is relevant, but when I have done this in other projects, I have wrapped the swapped join in a projection to preserve the column ordering of the output. This would be less surprising to a user if for some reason there is no final projection.

Contributor Author

Isn't the order explicit in the schema (which is the same) or will it be changed based on swapping left and right?

Member

We should add a unit test for this so that we know the answer to that question, I think.
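
A rough sketch of what such a check could look like, under the assumption that a join's output lists the left input's columns followed by the right input's columns (a common convention, but not confirmed here for DataFusion). The helpers `join_output`, `restoring_projection`, and `project` are hypothetical and model schemas as plain column-name lists.

```rust
/// Output columns of a join, assuming left columns come first (assumption for this sketch).
fn join_output(left: &[&str], right: &[&str]) -> Vec<String> {
    left.iter().chain(right.iter()).map(|c| c.to_string()).collect()
}

/// Column indices that, applied to the swapped join's output, restore the original order.
/// The swapped output is [right..., left...]; the original is [left..., right...].
fn restoring_projection(left_len: usize, right_len: usize) -> Vec<usize> {
    (right_len..right_len + left_len).chain(0..right_len).collect()
}

/// Apply a projection given as a list of column indices.
fn project(columns: &[String], indices: &[usize]) -> Vec<String> {
    indices.iter().map(|&i| columns[i].clone()).collect()
}
```

Under that assumption, swapping the inputs alone changes the column order, and wrapping the swapped join in the restoring projection brings back the original order. A unit test could assert exactly that equality.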

@andygrove
Member

Thanks @Dandandan this is looking great 🚀

pub struct HashBuildProbeOrder {}

// Gets exact number of rows, if known by the statistics of the underlying
fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
Member

I have been thinking about what we can do to estimate the number of rows coming out of joins so that we can extend this optimization to nested joins. We can't do anything accurate with the current statistics in this case but I feel that we should try and do something rather than just pick the left side as the build side.

One idea is to assume that all joins produce a cartesian product (left row count * right row count). This would at least help in the case where two small tables are joined, and then joined with a huge table, or the other way around.
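
As a sketch of that idea only (not part of this PR), the estimate could be computed recursively, treating every join as a cartesian product. The `Plan` enum and `estimate_num_rows` below are hypothetical; saturating multiplication is an added assumption to avoid overflow on large inputs.

```rust
/// Minimal plan model for this sketch (hypothetical, not DataFusion's LogicalPlan).
#[derive(Debug)]
enum Plan {
    Scan { num_rows: Option<usize> },
    Join { left: Box<Plan>, right: Box<Plan> },
}

/// Pessimistic row estimate: each join is assumed to produce
/// left row count * right row count. Returns None if any input is unknown.
fn estimate_num_rows(plan: &Plan) -> Option<usize> {
    match plan {
        Plan::Scan { num_rows } => *num_rows,
        Plan::Join { left, right } => {
            let l = estimate_num_rows(left)?;
            let r = estimate_num_rows(right)?;
            // Saturate instead of overflowing when both sides are large.
            Some(l.saturating_mul(r))
        }
    }
}
```

For two small tables of 10 and 20 rows, the nested join is estimated at 200 rows, which is still well below a 1,000,000-row table, so the nested join would be picked as the build side.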

Contributor Author

@Dandandan Dandandan Dec 18, 2020

Indeed I think that could be very beneficial but estimating it before executing might be really hard / impossible?

Also, if using the left side as the build side is wrong, the user can currently change the order by changing the query itself. You lose that control with a heuristic that can be wrong (unless you provide some other mechanism, e.g. a query hint).

I think ideally you should be able to know more about the table size while the query is executing (à la Spark 3 adaptive query execution) so you don't do the wrong thing. BigQuery also has a nice strategy / explanation for this: https://cloud.google.com/bigquery/query-plan-explanation. This probably requires quite a few changes on the execution / planning side, but it would make many more statistics available at each step during execution, so the plan can be optimized further.

Member

@andygrove andygrove Dec 18, 2020

Some databases use the STRAIGHT_JOIN modifier to force joins to happen in the user-specified order. This is from Impala docs:

If statistics are not available for all the tables in the join query, or if Impala chooses a join order that is not the most efficient, you can override the automatic join order optimization by specifying the STRAIGHT_JOIN keyword immediately after the SELECT and any DISTINCT or ALL keywords. In this case, Impala uses the order the tables appear in the query to guide how the joins are processed.

I think we can merge this PR as is and continue this discussion. Spark's AQE approach would mean that we have the statistics, but only if we load both sides into memory first (or scan them first for row counts) which would possibly defeat the point of this optimization. It would also mean that the next operator in the query plan wouldn't be able to start streaming until the join has completed? This is a tricky area.

@andygrove
Member

I do think that we should start looking at optimizations on the physical plan and eventually move this optimization there. I also do think that an adaptive execution approach makes sense, especially in a distributed context. I think it might not work so well for the current single node / in-process execution approach though.

@Dandandan Dandandan marked this pull request as ready for review December 18, 2020 18:19
@seddonm1
Contributor

This is awesome work @Dandandan 👍

@Dandandan
Contributor Author

Also related to #8965 which stops generating/using an index for the probe side.

@Dandandan
Contributor Author

Dandandan commented Dec 19, 2020

I tested this with the other PR, #8965, merged in; that PR improves the join implementation.

Besides #8965 being much faster regardless of this PR, reordering gives a further ~15% reduction in run time on the following query (6001214 rows on the left vs 1499999 rows on the right):

select
    l_shipmode,
    sum(case
        when o_orderpriority = '1-URGENT'
            or o_orderpriority = '2-HIGH'
            then 1
        else 0
    end) as high_line_count,
    sum(case
        when o_orderpriority <> '1-URGENT'
            and o_orderpriority <> '2-HIGH'
            then 1
        else 0
    end) as low_line_count
from
    lineitem
join
    orders
on
    l_orderkey = o_orderkey
group by
    l_shipmode
order by
    l_shipmode;

@andygrove
Member

@Dandandan This needs rebasing - I tried merging into master locally before merging this and got some compilation errors.

error[E0050]: method `scan` has 3 parameters but the declaration in trait `datasource::datasource::TableProvider::scan` has 4
   --> datafusion/src/optimizer/hash_build_probe_order.rs:179:13
    |
179 | /             &self,
180 | |             _projection: &Option<Vec<usize>>,
181 | |             _batch_size: usize,
    | |______________________________^ expected 4 parameters, found 3
    | 
   ::: datafusion/src/datasource/datasource.rs:66:9
    |
66  | /         &self,
67  | |         projection: &Option<Vec<usize>>,
68  | |         batch_size: usize,
69  | |         filters: &[Expr],
    | |________________________- trait requires 4 parameters

error: cannot construct `plan::LogicalPlan` with struct literal syntax due to inaccessible fields
   --> datafusion/src/optimizer/hash_build_probe_order.rs:204:23
    |
204 |         let lp_left = LogicalPlan::TableScan {
    |                       ^^^^^^^^^^^^^^^^^^^^^^

error: cannot construct `plan::LogicalPlan` with struct literal syntax due to inaccessible fields
   --> datafusion/src/optimizer/hash_build_probe_order.rs:211:24
    |
211 |         let lp_right = LogicalPlan::TableScan {
    |                        ^^^^^^^^^^^^^^^^^^^^^^

@Dandandan
Contributor Author

@andygrove thanks, will do. Will also enable it now that #8965 is merged.

@Dandandan
Contributor Author

@andygrove updated & enabled the optimization now.

@andygrove
Member

I merged with master locally and tested it and see a speedup. I will merge when CI is green. Thanks @Dandandan
