feat: Add projection to HashJoinExec. #9236
The results on my PC are unstable; sometimes it gets slower 😅.
| physical_plan | ||
| GlobalLimitExec: skip=0, fetch=5 | ||
| --SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5 | ||
| ----ProjectionExec: expr=[a@1 as a] |
A typical example here.
Nice, could you run/post the |
|
These are my results for tpch_mem; it seems to be a small but reasonable speedup 🚀:
Thanks, @Dandandan. Currently, I don't project `equivalence_properties` and `output_ordering`, so some optimizer rules don't work after embedding the projection into HashJoinExec. I am trying to handle it.
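For readers following along, here is a minimal sketch of why an ordering has to be rewritten when a projection is embedded. It is not the code from this PR: `project_ordering` is a hypothetical helper, and an ordering is simplified to a list of column indices (no sort options). The assumption is that a sort key survives only if its column is kept, and that everything after the first dropped key is lost.

```rust
/// Rewrite an ordering (sort-key column indices over the unprojected schema)
/// so that it refers to positions in the projected schema.
///
/// Hypothetical helper for illustration only. The ordering is cut at the
/// first sort key whose column is not part of the projection, because later
/// keys only order rows that tie on the dropped key.
fn project_ordering(ordering: &[usize], projection: &[usize]) -> Vec<usize> {
    let mut projected = Vec::new();
    for &col in ordering {
        match projection.iter().position(|&p| p == col) {
            // The column survives: record its new position.
            Some(new_idx) => projected.push(new_idx),
            // The column was dropped: the remaining keys are no longer valid.
            None => break,
        }
    }
    projected
}
```

With an ordering over columns `[0, 2, 1]` and a projection `[2, 0]`, the sketch keeps the first two keys and remaps them to `[1, 0]`; the third key is dropped because column 1 is not projected.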
Done! I will add more docs tomorrow.
@metesynnada PTAL
| on.clone(), | ||
| None, | ||
| join_type, | ||
| // TODO: add a projectionexec for projection in the join |
Please ignore this comment, I will remove it later.
| SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000; | ||
| ---- | ||
|
|
||
| query |
This was changed automatically by `cargo test --test sqllogictests -- --complete`.
| let right_stream = self.right.execute(partition, context)?; | ||
|
|
||
| // update column indices to reflect the projection | ||
| let column_indices_after_projection = match &self.projection { |
Project `column_indices` so that `build_batch_from_indices` can skip unused columns.
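A minimal sketch of that step, under stated assumptions: `ColumnIndex` and `JoinSide` below are simplified stand-ins that only mirror the names used in the join code, and `project_column_indices` is a hypothetical helper, not the function added by this PR.

```rust
/// Which join input an output column is read from (simplified stand-in).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinSide {
    Left,
    Right,
}

/// Maps one output column of the join to a column of one input
/// (simplified stand-in).
#[derive(Debug, Clone, PartialEq, Eq)]
struct ColumnIndex {
    /// Position of the column within its input's schema.
    index: usize,
    /// The input the column comes from.
    side: JoinSide,
}

/// Keep only the entries selected by `projection`, in projection order.
/// With no projection, every joined column is kept.
/// Panics if a projection index is out of range.
fn project_column_indices(
    column_indices: &[ColumnIndex],
    projection: Option<&[usize]>,
) -> Vec<ColumnIndex> {
    match projection {
        Some(projection) => projection
            .iter()
            .map(|&i| column_indices[i].clone())
            .collect(),
        None => column_indices.to_vec(),
    }
}
```

Because the output batch is then built from the shorter list, columns that the parent never reads are not materialized at all.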
| /// How the join is performed (`OUTER`, `INNER`, etc) | ||
| pub join_type: JoinType, | ||
| /// The output schema for the join | ||
| /// The schema after join |
- Add projection
- Update `equivalence_properties` and `output_ordering` after projection
- Update `column_indices`
- Keep `HashJoinExec.schema` as the result of `build_join_schema`; the final schema after projection is available through the `schema()` function, so we need to be careful about which of the two we use.
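The distinction between the stored join schema and the schema returned by `schema()` can be sketched as follows. This is an illustration only: `Schema` here is a hypothetical list of column names, and `JoinNode` is a toy stand-in for the operator, not the real `HashJoinExec`.

```rust
/// Toy schema: an ordered list of column names (hypothetical stand-in).
#[derive(Debug, Clone, PartialEq, Eq)]
struct Schema {
    fields: Vec<String>,
}

impl Schema {
    /// Select fields by position; returns `None` if any index is out of range.
    fn project(&self, indices: &[usize]) -> Option<Schema> {
        let fields = indices
            .iter()
            .map(|&i| self.fields.get(i).cloned())
            .collect::<Option<Vec<_>>>()?;
        Some(Schema { fields })
    }
}

/// Toy join operator that keeps the full join schema and an optional
/// embedded projection over it.
struct JoinNode {
    /// Left columns followed by right columns, before any projection.
    join_schema: Schema,
    /// Positions into `join_schema` that the operator actually outputs.
    projection: Option<Vec<usize>>,
}

impl JoinNode {
    /// Schema seen by parent operators: projected if a projection is set.
    fn schema(&self) -> Schema {
        match &self.projection {
            Some(p) => self
                .join_schema
                .project(p)
                .expect("projection indices must be in range"),
            None => self.join_schema.clone(),
        }
    }
}
```

Code that resolves column positions inside the join (for example the filter or the `on` keys) would read `join_schema`, while anything above the join would read `schema()`.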
Same here -- planning to take a closer look tomorrow; the idea in general looks good, though. Thank you @my-vegetable-has-exploded
| // as the latter may break local sorting requirements. | ||
| Arc::new(EnforceSorting::new()), | ||
| // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future. | ||
| Arc::new(ProjectionPushdown::new()), |
Do we need two `ProjectionPushdown` rules? Can the original be removed?
AFAICT the original will be modified to account for the new built-in projection capability and this one will be removed
the original will be modified to account for th
Does this refer to #9111?
viirya left a comment
It looks good overall. I only have a few comments.
Thank you all for the review. @Dandandan @metesynnada @korowa @viirya
Thank you @my-vegetable-has-exploded!
Since datafusion's apache/datafusion#9236, HashJoinExec can also project.
* Upgrading Ballista to datafusion 37.0.0.
* Better test debugging information in planner.rs
* Updated test logic in planner. Since datafusion's apache/datafusion#9236, HashJoinExec can also project.
* cargo fmt
* cargo fix
* Removed leftover comment
* Make cargo clippy happy
* lint
* Cargo fmt
* Fix tpch build
* Fix comment spelling
* cargo fmt
Which issue does this PR close?
ref #6768
Rationale for this change
Some projections can't be pushed down to the left or right input of a hash join because `filter` or `on` may need columns that won't be used later.
By embedding those projections into the hash join, we can reduce the cost of `build_batch_from_indices` (which needs to call `compute::take()` for each column) and avoid unnecessary output creation.
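To make the cost argument concrete, here is a small sketch of building join output with one gather per output column. It is an assumption-laden illustration: `take` is a plain-Rust stand-in for a columnar take kernel, columns are `Vec<i64>`, and `build_output` is a hypothetical function, not `build_batch_from_indices` itself.

```rust
/// Gather `values[i]` for every `i` in `indices`
/// (plain-Rust stand-in for a columnar take kernel).
fn take(values: &[i64], indices: &[usize]) -> Vec<i64> {
    indices.iter().map(|&i| values[i]).collect()
}

/// Which input an output column is read from (hypothetical stand-in).
#[derive(Debug, Clone, Copy)]
enum Side {
    Left,
    Right,
}

/// Build the output columns for matched row pairs.
/// `left_rows[n]` and `right_rows[n]` are the n-th matching pair, and
/// `output` lists, per output column, the input side and column position.
/// One `take` is performed per entry in `output`.
fn build_output(
    left: &[Vec<i64>],
    right: &[Vec<i64>],
    left_rows: &[usize],
    right_rows: &[usize],
    output: &[(Side, usize)],
) -> Vec<Vec<i64>> {
    output
        .iter()
        .map(|&(side, col)| match side {
            Side::Left => take(&left[col], left_rows),
            Side::Right => take(&right[col], right_rows),
        })
        .collect()
}
```

If the join key `k` is needed only for matching, an `output` list of `[(Side::Left, 1), (Side::Right, 1)]` performs two gathers instead of four, and the key columns never appear in the result.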
What changes are included in this PR?
Add a rule `try_embed_to_hash_join` in `physical_optimizer/projection_pushdown.rs`. More related changes are noted in the comments.
Are these changes tested?
Are there any user-facing changes?
None