ARROW-4589: [Rust] Projection push down query optimizer rule #3664
andygrove wants to merge 13 commits into apache:master
Conversation
@sunchao @paddyhoran @nevi-me This is ready for review
use arrow::error::Result;
use std::rc::Rc;

/// An optimizer rule performs a transformation on a logical plan to produce an optimized logical plan.
nit: can we also restrict the comments to be 90 characters?
This still seems to be over 90 characters.
I pushed a second commit to fix this
// sort the projection otherwise we get non-deterministic behavior
projection.sort();

// now that the table scan is returning a different schema we need to create a
Why is it "returning a different schema"? It seems the same schema is still returned.
Also, to help me understand: does each plan operator have its own schema, which could differ from the global schema (e.g., the schema of the input source)? If so, is a column index an index into the current operator's schema (e.g., a column index in an expr of Sort points to the schema of the Sort plan)?
Looks like you just found a bug. Good catch. It should be returning the schema after the projection has been applied.
Yes each plan operator has its own schema (for its output). In some cases (filter, limit, sort) the schema does not change so they can just delegate to their input relation.
Column indexes are always for the schema of the input relation.
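To illustrate that last point, here is a minimal sketch (not the PR's actual code; `build_index_mapping` is a hypothetical helper) of how references into the original table schema could be remapped once the projection has been pushed down to the scan:

```rust
use std::collections::HashMap;

/// Hypothetical helper: given the (sorted) set of original column indexes
/// that the pushed-down scan will produce, build a map from the old index
/// (relative to the full table schema) to the new index (relative to the
/// projected schema that downstream operators now see).
fn build_index_mapping(projection: &[usize]) -> HashMap<usize, usize> {
    projection
        .iter()
        .enumerate()
        .map(|(new_idx, &old_idx)| (old_idx, new_idx))
        .collect()
}

fn main() {
    // columns referenced by the query, sorted (as in the PR) for determinism
    let projection = vec![2, 7, 9];
    let mapping = build_index_mapping(&projection);
    // a reference to original column 7 becomes index 1 in the projected schema
    assert_eq!(mapping[&2], 0);
    assert_eq!(mapping[&7], 1);
    assert_eq!(mapping[&9], 2);
    println!("{:?}", mapping);
}
```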
I pushed a fix for the bug and added a test
> Column indexes are always for the schema of the input relation.
Does the schema of the input relation change? I'm still not sure why we need to rewrite the column indexes - can they always point to the complete schema of the input source?
The logical plan created via the SQL query planner (and the DataFrame API when we have one) does just refer to the original table schema.
The query optimizer transforms the plan and pushes the projection down to the TableScan so that we basically pretend the table only contains the columns we care about. The rest of the plan is then rewritten to be relative to that.
Each operator in the plan is relative to its input and doesn't know about the underlying table schema which could be many levels down, especially once we have joins and subqueries.
I guess one of the reasons for doing this, besides keeping the plan concise and simple to comprehend, is that ultimately these are indexes into RecordBatch instances.
Let's say we have a csv/parquet file with 300 columns and the query only references 12 of them... If we don't do this rewriting then we would have to load a RecordBatch with 300 columns where 288 of them are empty arrays or empty options, or we would need some special implementation of RecordBatch which does a mapping.
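A rough sketch of that point (hypothetical `Field` struct and `project_schema` function, not the actual Arrow types): projecting the schema at the scan means downstream operators never see the unused columns at all:

```rust
/// Hypothetical stand-in for an Arrow schema field.
#[derive(Debug, Clone)]
struct Field {
    name: String,
}

/// Hypothetical helper: build the narrow schema that the pushed-down
/// TableScan exposes, keeping only the referenced columns in projection order.
fn project_schema(fields: &[Field], projection: &[usize]) -> Vec<Field> {
    projection.iter().map(|&i| fields[i].clone()).collect()
}

fn main() {
    // a wide table: 300 columns, of which the query references only a few
    let full: Vec<Field> = (0..300)
        .map(|i| Field { name: format!("c{}", i) })
        .collect();
    let projection = vec![0, 42, 299];
    let projected = project_schema(&full, &projection);
    // downstream operators only ever see 3 columns, not 300
    assert_eq!(projected.len(), 3);
    assert_eq!(projected[1].name, "c42");
    println!("{:?}", projected);
}
```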
}

fn optimize(plan: &LogicalPlan) -> Rc<LogicalPlan> {
    let rule: Rc<RefCell<OptimizerRule>> =
Why do we need Rc and RefCell here? Can we do:
let mut rule = ProjectionPushDown::new();
rule.optimize(plan).unwrap()
You are correct. I was just trying to cast from ProjectionPushDown to OptimizerRule since eventually there will be a list of rules to apply. I will simplify this for now though.
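For the eventual list of rules, one common way to hold them without `Rc<RefCell<...>>` is boxed trait objects. A hypothetical sketch (the plan type is simplified to a string here; the real `OptimizerRule` trait operates on `LogicalPlan`):

```rust
/// Simplified stand-in for the optimizer rule trait; the real trait
/// would take and return a LogicalPlan rather than a string.
trait OptimizerRule {
    fn optimize(&mut self, plan: &str) -> String;
}

struct ProjectionPushDown;

impl OptimizerRule for ProjectionPushDown {
    fn optimize(&mut self, plan: &str) -> String {
        // placeholder transformation standing in for the real rewrite
        format!("projected({})", plan)
    }
}

/// Apply each rule in turn, threading the plan through the list.
fn run_rules(plan: &str, rules: &mut [Box<dyn OptimizerRule>]) -> String {
    rules
        .iter_mut()
        .fold(plan.to_string(), |p, rule| rule.optimize(&p))
}

fn main() {
    let mut rules: Vec<Box<dyn OptimizerRule>> = vec![Box::new(ProjectionPushDown)];
    let optimized = run_rules("scan", &mut rules);
    assert_eq!(optimized, "projected(scan)");
    println!("{}", optimized);
}
```

Boxing the rules keeps single ownership in the optimizer, so no interior mutability is needed to call `optimize` with `&mut self`.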
Hi @andygrove, I'll be able to take a look on Monday GMT morning
This PR adds the first query optimizer rule, which rewrites a logical plan to push the projection down to the TableScan.
Once this is merged, I will create a follow up PR to integrate this into the query engine so that only the necessary columns are loaded from disk.