Conversation
|
Thank you @jayzhan211 -- I plan to review this tomorrow |
alamb
left a comment
There was a problem hiding this comment.
Thank you so much for working on this @jayzhan211 . I had some comments about the API. Sorry for the delay in reviews
| WindowFunctionDefinition::AggregateUDF(fun) => { | ||
| let aggregate = | ||
| udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?; | ||
| // TODO: Ordering not supported for Window UDFs yet |
There was a problem hiding this comment.
We should probably file a ticket for this as well -- I can do so as part of this PR's review
alamb
left a comment
There was a problem hiding this comment.
Thank you @jayzhan211 -- this pr is looking close. I still think the API needs a little tweaking as I mentioned in the comments
I apologize for the delay in review and feedback and I feel back about the back and forth. I am still catching up from last week. If you don't have a chance I hope to find some time to help push this PR through myself later this week
| &self, | ||
| _arg: &DataType, | ||
| _sort_exprs: Vec<Expr>, | ||
| _schmea: Option<Schema>, |
There was a problem hiding this comment.
| _schmea: Option<Schema>, | |
| _schema: Option<Schema>, |
| /// Return a new [`Accumulator`] that aggregates values for a specific | ||
| /// group during query execution. | ||
| fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>; | ||
| fn accumulator( |
There was a problem hiding this comment.
I think this API:
- Doesn't need an owned Schema (passing in a
Schemarequires cloning which I think we can avoid - I am not sure why it needs an
Option(why is it not always passed in) - Should have the documentation updated to explain what sort_exprs are and what schema is
There was a problem hiding this comment.
Option is because Schema is only used for ordering, so if we don't care about ordering we can just pass None.
There was a problem hiding this comment.
Or maybe we should return the schema in case we need it in the future not only for ordering? 🤔
There was a problem hiding this comment.
Unless there is some reason not to always pass in schema I think we should always pass it in to make for an easier to use API. For example, if it is an Option the user would have to call unwrap() or check / error when they got sort exprs
alamb
left a comment
There was a problem hiding this comment.
Thanks @jayzhan211 -- I had a play around with the APIs and the code in this PR and I came up with this: jayzhan211#1
I see the challenge now that the create_accumulator function is invoked by the physical plan when there are no more Exprs / Schemas.
I think the next thing to do is figure out how ARRAY_AGG or some other built in aggregate function handles ordering and see if we can emulate the same thing
| /// Return a new [`Accumulator`] that aggregates values for a specific | ||
| /// group during query execution. | ||
| fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>; | ||
| fn accumulator( |
There was a problem hiding this comment.
Unless there is some reason not to always pass in schema I think we should always pass it in to make for an easier to use API. For example, if it is an Option the user would have to call unwrap() or check / error when they got sort exprs
datafusion/expr/src/udaf.rs
Outdated
| /// group during query execution. sort_exprs is a list of ordering expressions, | ||
| /// and schema is used while ordering. |
There was a problem hiding this comment.
| /// group during query execution. sort_exprs is a list of ordering expressions, | |
| /// and schema is used while ordering. | |
| /// group during query execution. | |
| /// | |
| /// `arg`: the type of the argument to this accumulator | |
| /// | |
| /// `sort_exprs`: contains a list of `Expr::SortExpr`s if the | |
| /// aggregate is called with an explicit `ORDER BY`. For example, | |
| /// `ARRAY_AGG(x ORDER BY y ASC)`. In this case, `sort_exprs` would contain `[y ASC]` | |
| /// | |
| /// `schema` is the input schema to the aggregate |
datafusion/expr/src/udaf.rs
Outdated
| } | ||
|
|
||
| /// Return the ordering expressions for the accumulator | ||
| fn sort_exprs(&self) -> Vec<Expr> { |
There was a problem hiding this comment.
I don't understand how the sort_exprs could be supplied by the UDAF. I would expect them to be provided to the UDAF based on what was in the query
|
Edit: I think we can just pass the logical ordering expr and not move the conversion downstream. I think the problem here is that we need logical expr (ordering and schema) for accumulator. But the arguments pass into I'm thinking about changing the function here to let us done the converting of logical to physical lately, so we can keep the logical expr for accumulator. Other builtin accumulator don't have the problem because they are all defined in physical-expr unlike udaf. @alamb Does the move of physical expression conversion make senses? |
0dbfbad to
80eae25
Compare
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
This comment was marked as outdated.
This comment was marked as outdated.
Signed-off-by: jayzhan211 <[email protected]>
| data_type, | ||
| &ordering_dtypes, | ||
| ordering_req, | ||
| false, |
There was a problem hiding this comment.
ignore_nulls or other arguments are given in the accumulator provided by the user.
| let agg_udf = registry.udaf(udaf_name)?; | ||
| udaf::create_aggregate_expr(agg_udf.as_ref(), &input_phy_expr, &physical_schema, name) | ||
| // TODO: `order by` is not supported for UDAF yet | ||
| let sort_exprs = &[]; |
There was a problem hiding this comment.
We need to convert Vec<PhysicalSortExprNode> to Vec<Expr>, not sure if we should include it in this PR or not.
|
@alamb I think it is ready for review |
|
Thanks @jayzhan211 -- I will check it out, hopefully tomorrow |
|
I plan to change |
Which issue does this PR close?
Closes #8984.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?