
feat: support lambda function for scalar udf #17220

Closed

chenkovsky wants to merge 10 commits into apache:main from chenkovsky:feat/high_order

Conversation

@chenkovsky (Contributor) commented Aug 17, 2025:

Which issue does this PR close?

  • Closes #.

Rationale for this change

Some array-related UDFs need to accept a lambda function as an argument.

What changes are included in this PR?

Support lambda functions in scalar UDFs, and implement array_filter as an example.

Are these changes tested?

Yes, unit tests.

Are there any user-facing changes?

No

@github-actions bot added the following labels on Aug 17, 2025: sql (SQL Planner), logical-expr (Logical plan and expressions), physical-expr (Changes to the physical-expr crates), optimizer (Optimizer rules), sqllogictest (SQL Logic Tests (.slt)), substrait (Changes to the substrait crate), catalog (Related to the catalog crate), proto (Related to proto crate)
@chenkovsky chenkovsky marked this pull request as draft August 17, 2025 14:59
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Aug 17, 2025
@chenkovsky chenkovsky marked this pull request as ready for review August 18, 2025 01:19
Comment on lines 179 to 185
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
let [arg_type] = take_function_args(self.name(), arg_types)?;
match arg_type {
List(_) | LargeList(_) => Ok(arg_type.clone()),
_ => plan_err!("{} does not support type {}", self.name(), arg_type),
}
}
Member:
Can you please implement return_field_from_args instead so it won't be nullable in case the input is not nullable
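A minimal sketch of what that could look like. The types below are simplified, self-contained stand-ins for arrow's Field/DataType; the real implementation would go through DataFusion's ScalarUDFImpl::return_field_from_args and its ReturnFieldArgs struct, so the signature here is illustrative only. The key idea is to return the input field rather than just a DataType, so the output inherits the input's nullability:

```rust
use std::sync::Arc;

// Simplified stand-ins for arrow_schema::DataType / Field, so this sketch is
// self-contained; the real code would use DataFusion's ReturnFieldArgs.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int32,
    List(Box<DataType>),
}

#[derive(Clone, Debug, PartialEq)]
struct Field {
    data_type: DataType,
    nullable: bool,
}

struct ArrayFilter;

impl ArrayFilter {
    fn name(&self) -> &str {
        "array_filter"
    }

    // Returning the input *field* (not just its DataType) lets the output
    // carry the input's nullability: a non-nullable list in gives a
    // non-nullable list out.
    fn return_field_from_args(&self, arg_fields: &[Arc<Field>]) -> Result<Arc<Field>, String> {
        match arg_fields {
            [arg_field] => match &arg_field.data_type {
                DataType::List(_) => Ok(Arc::clone(arg_field)),
                other => Err(format!("{} does not support type {:?}", self.name(), other)),
            },
            _ => Err(format!("{} expects exactly one array argument", self.name())),
        }
    }
}
```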

Comment on lines 281 to 286
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
datafusion_common::not_impl_err!(
"Function {} does not implement coerce_types",
self.name()
)
}
Member:
This is the default implementation, can you please remove it?

lambda: &dyn PhysicalLambda,
field: &Arc<Field>,
) -> Result<ArrayRef> {
let mut offsets = vec![OffsetSize::zero()];
Member:
can you use OffsetBufferBuilder instead? so we don't have to manage the offsets ourselves
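For illustration, a std-only behavioral sketch of what the builder provides (the real type is arrow_buffer::OffsetBufferBuilder; OffsetsSketch here is a stand-in mirroring its push_length/finish idea): callers push lengths and the builder maintains the running offsets, so a null or empty row is just push_length(0):

```rust
// Behavioral sketch of arrow_buffer::OffsetBufferBuilder, std-only so it runs
// anywhere. The real builder exposes the same idea: push *lengths* and it
// accumulates the running offsets, so callers never manage the offsets vector
// by hand.
struct OffsetsSketch {
    offsets: Vec<i32>, // always starts with 0, like the real builder
}

impl OffsetsSketch {
    fn new() -> Self {
        Self { offsets: vec![0] }
    }

    // push_length(n): the next list element covers n values.
    // A null or empty list is simply push_length(0).
    fn push_length(&mut self, len: usize) {
        let last = *self.offsets.last().unwrap();
        self.offsets.push(last + len as i32);
    }

    fn finish(self) -> Vec<i32> {
        self.offsets
    }
}
```

With the real builder, the manual offsets vector and per-row offset arithmetic collapse into one push_length call per row.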

/// Implementation of the `array_filter` scalar user-defined function.
///
/// This function filters array elements using a lambda function, returning a new array
/// containing only the elements for which the lambda function returns true.
Member:
Please also note that nulls will count as false

),
argument(
name = "lambda",
description = "Lambda function with one argument that returns a boolean. The lambda is applied to each element of the array."
Member:
please add that returning null will be the same as false

@rluvaton (Member) left a comment:
Good job, left some comments

Comment on lines +320 to +331
let values = list_array.values();
let value_offsets = list_array.value_offsets();
let nulls = list_array.nulls();

let batch = RecordBatch::try_new(
Schema::new(vec![field
.as_ref()
.clone()
.with_name(lambda.params()[0].clone())])
.into(),
vec![Arc::clone(values)],
)?;
Member:
I can do it in a separate PR.

this will lead to unnecessary computation, as it will include values that are not part of the list's "visible" values in either of the following cases:

1. the list is sliced, making the evaluation work on more data than is needed.
   This is how to create that:

let data = vec![
    Some(vec![Some(0), Some(1), Some(2)]),
    Some(vec![Some(3), Some(4), Some(5)]),
    Some(vec![Some(6), Some(7)]),
    Some(vec![Some(8)]),
];
let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
let list_sliced_values = list_array.slice(1, 2);

2. there are nulls in the list that are not backed by an empty list.
   This is how to create that:

let data = vec![
    Some(vec![Some(0), Some(1), Some(2)]),
    Some(vec![Some(3), Some(4), Some(5)]),
    Some(vec![Some(6), Some(7)]),
];
let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
let (field, offsets, values, nulls) = list_array.into_parts();
let list_array_with_null_pointing_to_non_empty_list = ListArray::try_new(
    field,
    offsets,
    values,
    Some(NullBuffer::from(&[true, false, true])),
)?;

Contributor (author):
Please view it again; the array is now compacted first. I think this solves both the unnecessary computation and the bug above. @rluvaton

Member:
it looks ok, but can you please add a unit test (as it's tricky to simulate with sql) to make sure the bug won't return?

Comment on lines +334 to +338
let ColumnarValue::Array(filter_array) = filter_array else {
return exec_err!(
"array_filter requires a lambda that returns an array of booleans"
);
};
Member:
You can add optimization for scalar if you want or I can do it in a different PR

Comment on lines 344 to 345
// Handle null arrays by keeping the offset unchanged
offsets.push(offsets[row_index]);
Member:
This has a bug in the case of a null value pointing to a non-empty list where none of the underlying values were filtered.


# array_filter with multiple array columns
statement ok
CREATE TABLE test_arrays (arr1 ARRAY<INTEGER>, arr2 ARRAY<INTEGER>) AS VALUES ([1, 2, 3], [4, 5, 6]);
Member:
Can you please add a null list here as well, and null items?

}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
not_impl_err!("{} does not implement return_type", self.name())
Member:
Suggested change
not_impl_err!("{} does not implement return_type", self.name())
not_impl_err!("{} does not implement return_type, call return_field_from_args instead", self.name())

@rluvaton (Member) left a comment:
Great job, but I think this lambda API adds a lot of complexity for the user.

@alamb, your take?

Member:
Because this is the first implementation of lambda functions, could you please add a lot of comments explaining how it works, so future lambda implementations will have a reference point?

///
/// # Returns
/// An optional optimized expression, or None if no optimization is available
fn try_call(&self, _args: &[Expr]) -> Result<Option<Expr>> {
Member:
the try_call function name is confusing as it sounds like it will invoke the udf.

also isn't it the same as simplify?

@chenkovsky (Contributor, author) commented Aug 25, 2025:
It's the same as simplify, except for when it is invoked. I tried simplify before, but an error was thrown, because simplify is called during the optimize stage, while we need this function to be called at the very beginning. Maybe we can reuse simplify, but I'm not sure whether that would break other functions. Maybe call this method try_currying?

Contributor:
Nitpick: agreed, a more descriptive name would be nice.

Comment on lines +793 to +813
/// Plans the scalar UDF implementation with lambda function support.
///
/// This method enables UDF implementations to work with lambda functions
/// by allowing them to plan and prepare lambda expressions for execution.
/// Returns a new implementation instance if lambda planning is needed.
///
/// # Arguments
/// * `_planner` - The lambda planner for converting logical lambdas to physical
/// * `_args` - The function arguments that may include lambda expressions
/// * `_input_dfschema` - The input schema context for lambda planning
///
/// # Returns
/// An optional new UDF implementation with planned lambdas, or None if no planning is needed
fn plan(
&self,
_planner: &dyn LambdaPlanner,
_args: &[Expr],
_input_dfschema: &DFSchema,
) -> Result<Option<Arc<dyn ScalarUDFImpl>>> {
Ok(None)
}
Member:
I find this approach confusing both to implement and understand. It requires users to call this function beforehand for the higher-order function to actually work. I had to read through it several times to grasp the concept.

This function is now a prerequisite for the lambda function UDF to operate. Previously, there was only one simple entry point (invoke_with_args) that was straightforward to implement. Adding another entry point increases complexity unnecessarily.

I suggest considering an alternative approach: create a separate trait specifically for higher-order functions with a dedicated wrapper (similar to ScalarUDF) that provides a better API suited for higher-order functions. This wrapper could handle the "compilation" of lambda expressions upfront, and the invoke call would include the pre-compiled lambda function.

Alternatively, we could add physical expressions of children to ScalarFunctionArgs, though I'm not particularly fond of that solution either.

For context on precompilation (which is meant for optimization and not required for the expression to work):

The current implementation creates confusion and adds an unnecessary prerequisite step that users must remember to perform.
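For concreteness, a toy sketch of the separate-trait alternative described above. Every name here (HigherOrderUdfImpl, CompiledLambda, ArrayFilterSketch) is hypothetical and not part of DataFusion; plain Vec and closure types stand in for ArrayRef and a planned PhysicalExpr:

```rust
use std::sync::Arc;

// Hypothetical sketch: a dedicated trait for higher-order functions whose
// lambdas are compiled up front by a wrapper (analogous to ScalarUDF), so
// invoke is again the single entry point.
struct CompiledLambda {
    // In the real system this would be a planned physical expression;
    // here, a plain closure over one element.
    body: Arc<dyn Fn(i32) -> bool>,
}

trait HigherOrderUdfImpl {
    fn name(&self) -> &str;
    // The wrapper has already compiled the lambda, so implementations never
    // deal with logical expressions or a separate planning step.
    fn invoke(&self, arg: &[Vec<i32>], lambda: &CompiledLambda) -> Vec<Vec<i32>>;
}

struct ArrayFilterSketch;

impl HigherOrderUdfImpl for ArrayFilterSketch {
    fn name(&self) -> &str {
        "array_filter"
    }

    // Keep only the elements for which the pre-compiled lambda returns true.
    fn invoke(&self, arg: &[Vec<i32>], lambda: &CompiledLambda) -> Vec<Vec<i32>> {
        arg.iter()
            .map(|list| list.iter().copied().filter(|v| (lambda.body)(*v)).collect())
            .collect()
    }
}
```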

Contributor (author):
I considered your solutions before, but they all require significant changes. This solution is not the perfect one, but I think it is the one with the least modification.

Lambda functions are just a seasoning; although they are indispensable, most UDFs do not require them. For example, in Databricks, only the following functions use lambda functions:
aggregate, array_sort, exists, filter, forall, map_filter, map_zip_with, transform, transform_keys, transform_values, zip_with.
Therefore, I feel there is no need for us to make huge changes just because of this. That's why I selected the approach with the least modification.

It requires users to call this function beforehand for the higher-order function to actually work.

It's currying; from my side, this is not hard to understand.

Comment on lines +808 to +809
_planner: &dyn LambdaPlanner,
_args: &[Expr],
Member:
What if I don't work with logical expressions and only physical ones, like in Comet?

@alamb (Contributor) commented Aug 26, 2025:

@shehabgamin / @andygrove is this functionality that might be needed for spark integration?

Thanks for the call out @rluvaton and for the PR @chenkovsky

Unfortunately, I am not likely to have the time to review this PR / feature in the near term -- as @rluvaton says, it would take some time to understand the implications of this new API for the system and other users.

@chenkovsky can you explain more of the rationale / need for this functionality? If there are other users who need this feature, perhaps we can find some others in the community to help drive it forward.

@alamb alamb added the api change Changes the API exposed to users of the crate label Aug 26, 2025
@rluvaton (Member) commented Aug 26, 2025:

I'm all in on supporting lambda functions

is this functionality that might be needed for spark integration?

yes, as @chenkovsky wrote:

in Databricks, only the following functions use lambda functions:
aggregate, array_sort, exists, filter, forall, map_filter, map_zip_with, transform, transform_keys, transform_values, zip_with.

@github-actions bot:

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label Oct 28, 2025
@github-actions github-actions bot closed this Nov 4, 2025
@shehabgamin (Contributor) commented:

@shehabgamin / @andygrove is this functionality that might be needed for spark integration?

Thanks for the call out @rluvaton and for the PR @chenkovsky

Unfortunately, I am not likely to have the time to review this PR / feature in the near term -- as @rluvaton says, it would take some time to understand the implications of this new API for the system and other users.

@chenkovsky can you explain more of the rationale / need for this functionality? If there are other users who need this feature, perhaps we can find some others in the community to help drive it forward.

@alamb @chenkovsky Sorry I missed this, I think supporting lambda functions would be super valuable. I'll take a look through this PR today/tomorrow and hopefully we can reopen it?

if let Some(fm) = self.context_provider.get_function_meta(&name) {
    let args = self.function_args_to_expr(args, schema, planner_context)?;
-   return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, args)));
+   return fm.try_call(args);
Contributor:
The naming here is a bit confusing imo

///
/// # Returns
/// An optional new UDF implementation with planned lambdas, or None if no planning is needed
fn plan(
Contributor:
Adding a plan function to the trait feels a bit odd. Are there any potential use cases for this outside of lambdas?

@gstvg (Contributor) commented Dec 9, 2025:

@chenkovsky @rluvaton @shehabgamin @alamb I happened to also open a PR, #18921, to add lambda support. Even if this one (the first to be opened) is the approach chosen to move forward, I believe the different approach and the alternatives listed there can help make progress here. Thanks
