feat: support lambda function for scalar udf#17220
chenkovsky wants to merge 10 commits into apache:main
Conversation
```rust
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
    let [arg_type] = take_function_args(self.name(), arg_types)?;
    match arg_type {
        List(_) | LargeList(_) => Ok(arg_type.clone()),
        _ => plan_err!("{} does not support type {}", self.name(), arg_type),
    }
}
```
Can you please implement `return_field_from_args` instead, so the result won't be nullable when the input is not nullable?
```rust
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
    datafusion_common::not_impl_err!(
        "Function {} does not implement coerce_types",
        self.name()
    )
}
```
This is the default implementation; can you please remove it?
```rust
    lambda: &dyn PhysicalLambda,
    field: &Arc<Field>,
) -> Result<ArrayRef> {
    let mut offsets = vec![OffsetSize::zero()];
```
Can you use `OffsetBufferBuilder` instead, so we don't have to manage the offsets ourselves?
```rust
/// Implementation of the `array_filter` scalar user-defined function.
///
/// This function filters array elements using a lambda function, returning a new array
/// containing only the elements for which the lambda function returns true.
```
Please also note that nulls will count as `false`.
```rust
),
argument(
    name = "lambda",
    description = "Lambda function with one argument that returns a boolean. The lambda is applied to each element of the array."
```
Please add that returning null will be treated the same as `false`.
rluvaton left a comment:

Good job, left some comments.
```rust
let values = list_array.values();
let value_offsets = list_array.value_offsets();
let nulls = list_array.nulls();

let batch = RecordBatch::try_new(
    Schema::new(vec![field
        .as_ref()
        .clone()
        .with_name(lambda.params()[0].clone())])
    .into(),
    vec![Arc::clone(values)],
)?;
```
I can do it in a separate PR.

This will lead to unnecessary computation, as it will include values that are not part of the list's "visible" values in either of the following cases:

- The list is sliced, making the evaluation work on more data than is needed. This is how to create that:

  ```rust
  let data = vec![
      Some(vec![Some(0), Some(1), Some(2)]),
      Some(vec![Some(3), Some(4), Some(5)]),
      Some(vec![Some(6), Some(7)]),
      Some(vec![Some(8)]),
  ];
  let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
  let list_sliced_values = list_array.slice(1, 2);
  ```

- There are nulls in the list that are not backed by an empty list. This is how to create that:

  ```rust
  let data = vec![
      Some(vec![Some(0), Some(1), Some(2)]),
      Some(vec![Some(3), Some(4), Some(5)]),
      Some(vec![Some(6), Some(7)]),
  ];
  let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
  let (field, offsets, values, nulls) = list_array.into_parts();
  let list_array_with_null_pointing_to_non_empty_list = ListArray::try_new(
      field,
      offsets,
      values,
      Some(NullBuffer::from(&[true, false, true])),
  )?;
  ```
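The sliced-list case can be modeled with the standard library alone (a list array reduced to a flat values buffer plus offsets; all names here are illustrative): slicing only narrows the offsets window, it does not shrink the values buffer, so a lambda evaluated over the whole values buffer processes elements that are not visible through the slice.

```rust
// Count how many elements are actually visible through an offsets window.
fn visible_count(offsets: &[usize]) -> usize {
    // Sum of (end - start) over each visible list.
    offsets.windows(2).map(|w| w[1] - w[0]).sum()
}

fn main() {
    // Lists [0,1,2], [3,4,5], [6,7], [8] flattened into one buffer.
    let values: Vec<i32> = (0..9).collect();
    let offsets: Vec<usize> = vec![0, 3, 6, 8, 9];

    // "Slice" rows 1..3: the offsets window becomes [3, 6, 8]...
    let sliced_offsets = &offsets[1..4];

    // ...but a lambda run over the whole values buffer would still see
    // 9 elements, although only 5 are visible through the slice.
    assert_eq!(values.len(), 9);
    assert_eq!(visible_count(sliced_offsets), 5);
}
```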
Please view it again; the array will now be compacted first. I think this solves both the unnecessary computation and the bug described above. @rluvaton
It looks OK, but can you please add a unit test (as it's tricky to simulate with SQL) to make sure the bug won't return?
```rust
let ColumnarValue::Array(filter_array) = filter_array else {
    return exec_err!(
        "array_filter requires a lambda that returns an array of booleans"
    );
};
```
You can add an optimization for the scalar case if you want, or I can do it in a different PR.
```rust
// Handle null arrays by keeping the offset unchanged
offsets.push(offsets[row_index]);
```
This has a bug when a null value points to a non-empty list and none of the underlying values were filtered.
```
# array_filter with multiple array columns
statement ok
CREATE TABLE test_arrays (arr1 ARRAY<INTEGER>, arr2 ARRAY<INTEGER>) AS VALUES ([1, 2, 3], [4, 5, 6]);
```
Can you please also add a null list here, as well as null items?
```rust
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
    not_impl_err!("{} does not implement return_type", self.name())
```
Suggested change:

```diff
-    not_impl_err!("{} does not implement return_type", self.name())
+    not_impl_err!("{} does not implement return_type, call return_field_from_args instead", self.name())
```
Because this is the first implementation of a lambda function, could you please add a lot of comments explaining how it works, so future lambda implementations will have a reference point?
```rust
///
/// # Returns
/// An optional optimized expression, or None if no optimization is available
fn try_call(&self, _args: &[Expr]) -> Result<Option<Expr>> {
```
The `try_call` function name is confusing, as it sounds like it will invoke the UDF. Also, isn't it the same as `simplify`?
It's the same as `simplify`, except for the invocation timing. I tried `simplify` before, but an error is thrown because `simplify` is called in the optimize stage, while we need this function to be called at the very beginning. Maybe we can reuse `simplify`, but I'm not sure whether that would break other functions. Maybe call this method `try_currying`?
Nitpick: agreed, a more descriptive name would be nice.
```rust
/// Plans the scalar UDF implementation with lambda function support.
///
/// This method enables UDF implementations to work with lambda functions
/// by allowing them to plan and prepare lambda expressions for execution.
/// Returns a new implementation instance if lambda planning is needed.
///
/// # Arguments
/// * `_planner` - The lambda planner for converting logical lambdas to physical
/// * `_args` - The function arguments that may include lambda expressions
/// * `_input_dfschema` - The input schema context for lambda planning
///
/// # Returns
/// An optional new UDF implementation with planned lambdas, or None if no planning is needed
fn plan(
    &self,
    _planner: &dyn LambdaPlanner,
    _args: &[Expr],
    _input_dfschema: &DFSchema,
) -> Result<Option<Arc<dyn ScalarUDFImpl>>> {
    Ok(None)
}
```
I find this approach confusing both to implement and to understand. It requires users to call this function beforehand for the higher-order function to actually work; I had to read through it several times to grasp the concept.

This function is now a prerequisite for the lambda function UDF to operate. Previously, there was only one simple entry point (`invoke_with_args`) that was straightforward to implement. Adding another entry point increases complexity unnecessarily.

I suggest considering an alternative approach: create a separate trait specifically for higher-order functions with a dedicated wrapper (similar to `ScalarUDF`) that provides a better API suited for higher-order functions. This wrapper could handle the "compilation" of lambda expressions upfront, and the invoke call would include the pre-compiled lambda function.

Alternatively, we could add physical expressions of children to `ScalarFunctionArgs`, though I'm not particularly fond of that solution either.

For context on precompilation (which is meant for optimization and is not required for the expression to work):

The current implementation creates confusion and adds an unnecessary prerequisite step that users must remember to perform.
I considered your solutions before, but they all require significant changes. This solution is not the perfect one, but I think it is the one with the least modification.

Lambda functions are just a seasoning; although they are indispensable, most UDFs do not require them. For example, in Databricks, only the following functions use lambda functions: `aggregate`, `array_sort`, `exists`, `filter`, `forall`, `map_filter`, `map_zip_with`, `transform`, `transform_keys`, `transform_values`, `zip_with`. Therefore, I feel there is no need for us to make huge changes just because of this. That's why I selected the least invasive option.

> It requires users to call this function beforehand for the higher-order function to actually work.

It's currying. From my side, this is not hard to understand.
```rust
_planner: &dyn LambdaPlanner,
_args: &[Expr],
```
What if I don't work with logical expressions and only physical ones, like in Comet?
@shehabgamin / @andygrove is this functionality that might be needed for Spark integration?

Thanks for the call out @rluvaton, and for the PR @chenkovsky. Unfortunately, I am not likely to have the time to review this PR / feature in the near term -- as @rluvaton says, it would take some time to understand the implications of this new API for the system and other users.

@chenkovsky can you explain more of the rationale / need for this functionality? If there are other users who need this feature, perhaps we can find some others in the community to help drive it forward.
I'm all in on supporting lambda functions
Yes, as @chenkovsky wrote:
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment, or this will be closed in 7 days.
@alamb @chenkovsky Sorry I missed this. I think supporting lambda functions would be super valuable. I'll take a look through this PR today/tomorrow, and hopefully we can reopen it?
```diff
 if let Some(fm) = self.context_provider.get_function_meta(&name) {
     let args = self.function_args_to_expr(args, schema, planner_context)?;
-    return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, args)));
+    return fm.try_call(args);
```
The naming here is a bit confusing, IMO.
```rust
///
/// # Returns
/// An optional new UDF implementation with planned lambdas, or None if no planning is needed
fn plan(
```
Adding a `plan` function to the trait feels a bit odd. Are there any potential use cases for this outside of lambdas?
@chenkovsky @rluvaton @shehabgamin @alamb I happened to also open a PR in #18921 to add lambda support, and even if this one here (the first to be opened) is decided to move forward, I believe the different approach and the alternatives listed there can help make progress here. Thanks
Which issue does this PR close?
Rationale for this change
Some array-related UDFs need to support passing in lambda functions.
What changes are included in this PR?
Support lambda functions in scalar UDFs, and implement `array_filter` as an example.
Are these changes tested?
Yes, unit tests.
Are there any user-facing changes?
No